You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/05/01 15:17:27 UTC
svn commit: r534063 - in /activemq/camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/builder/
camel-core/src/main/java/org/apache/camel/component/direct/
camel-core/src/main/java/org/apache/camel/componen...
Author: chirino
Date: Tue May 1 06:17:24 2007
New Revision: 534063
URL: http://svn.apache.org/viewvc?view=rev&rev=534063
Log:
Added a throws Exception to Processor.process
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Processor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/InterceptorBuilderTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/MyInterceptorProcessor.java
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfInvokeConsumer.java
activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java
activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/FromJbiProcessor.java
activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/MessageListenerProcessor.java
activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java
activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java
activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Processor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Processor.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Processor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Processor.java Tue May 1 06:17:24 2007
@@ -29,6 +29,8 @@
/**
* Processes the message exchange
+ *
+ * @throws Exception if an internal processing error has occurred.
*/
- void process(E exchange);
+ void process(E exchange) throws Exception;
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java Tue May 1 06:17:24 2007
@@ -266,7 +266,7 @@
final Log log = LogFactory.getLog(category);
return intercept(new DelegateProcessor<Exchange>(){
@Override
- public void process(Exchange exchange) {
+ public void process(Exchange exchange) throws Exception {
log.trace(exchange);
processNext(exchange);
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java Tue May 1 06:17:24 2007
@@ -46,13 +46,13 @@
public Producer<E> createProducer() throws Exception {
return startService(new DefaultProducer<E>(this) {
- public void process(E exchange) {
+ public void process(E exchange) throws Exception {
DirectEndpoint.this.process(exchange);
}
});
}
- protected void process(E exchange) {
+ protected void process(E exchange) throws Exception {
for (DefaultConsumer<E> consumer : consumers) {
consumer.getProcessor().process(exchange);
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java Tue May 1 06:17:24 2007
@@ -73,7 +73,11 @@
}
protected void processFile(File file) {
- getProcessor().process(endpoint.createExchange(file));
+ try {
+ getProcessor().process(endpoint.createExchange(file));
+ } catch (Throwable e) {
+ handleException(e);
+ }
}
protected boolean isValidFile(File file) {
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java Tue May 1 06:17:24 2007
@@ -62,7 +62,7 @@
* This causes us to invoke the endpoint Pojo using reflection.
* @param pojo
*/
- public void invoke(Object pojo, PojoExchange exchange) {
+ static public void invoke(Object pojo, PojoExchange exchange) {
PojoInvocation invocation = exchange.getInvocation();
try {
Object response = invocation.getMethod().invoke(pojo, invocation.getArgs());
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpoint.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpoint.java Tue May 1 06:17:24 2007
@@ -49,7 +49,7 @@
public Producer<Exchange> createProducer() throws Exception {
return startService(new DefaultProducer<Exchange>(this) {
- public void process(Exchange exchange) {
+ public void process(Exchange exchange) throws Exception {
onExchange(exchange);
}
});
@@ -67,7 +67,7 @@
return loadBalancer;
}
- protected void onExchange(Exchange exchange) {
+ protected void onExchange(Exchange exchange) throws Exception {
processor.process(exchange);
// now lets output to the load balancer
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Tue May 1 06:17:24 2007
@@ -287,6 +287,12 @@
//-----------------------------------------------------------------------
protected void doStart() throws Exception {
+ if (components != null) {
+ for (Component component : components.values()) {
+ startServices(component);
+ }
+ }
+
if (routes != null) {
for (Route<Exchange> route : routes) {
Processor<Exchange> processor = route.getProcessor();
@@ -306,6 +312,11 @@
protected void doStop() throws Exception {
stopServices(servicesToClose);
+ if (components != null) {
+ for (Component component : components.values()) {
+ stopServices(component);
+ }
+ }
}
/**
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java Tue May 1 06:17:24 2007
@@ -17,11 +17,9 @@
*/
package org.apache.camel.impl;
-import org.apache.camel.Service;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.Service;
/**
* A useful base class which ensures that a service is only initialized once and provides some helper methods for
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java Tue May 1 06:17:24 2007
@@ -40,7 +40,7 @@
this.otherwise = otherwise;
}
- public void process(E exchange) {
+ public void process(E exchange) throws Exception {
for (FilterProcessor<E> filterProcessor : filters) {
Predicate<E> predicate = filterProcessor.getPredicate();
if (predicate != null && predicate.matches(exchange)) {
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java Tue May 1 06:17:24 2007
@@ -35,7 +35,7 @@
this.processors = processors;
}
- public void process(E exchange) {
+ public void process(E exchange) throws Exception {
for (Processor<E> processor : processors) {
processor.process(exchange);
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Tue May 1 06:17:24 2007
@@ -62,7 +62,7 @@
return "DeadLetterChannel[" + output + ", " + deadLetter + ", " + redeliveryPolicy + "]";
}
- public void process(E exchange) {
+ public void process(E exchange) throws Exception {
int redeliveryCounter = 0;
long redeliveryDelay = 0;
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateProcessor.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateProcessor.java Tue May 1 06:17:24 2007
@@ -38,11 +38,11 @@
this.next = next;
}
- public void process(E exchange) {
+ public void process(E exchange) throws Exception {
processNext(exchange);
}
- protected void processNext(E exchange) {
+ protected void processNext(E exchange) throws Exception {
if (next != null) {
next.process(exchange);
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java Tue May 1 06:17:24 2007
@@ -34,7 +34,7 @@
this.processor = processor;
}
- public void process(E exchange) {
+ public void process(E exchange) throws Exception {
if (predicate.matches(exchange)) {
processor.process(exchange);
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java Tue May 1 06:17:24 2007
@@ -49,7 +49,7 @@
return "LoggingErrorHandler[" + output + "]";
}
- public void process(E exchange) {
+ public void process(E exchange) throws Exception {
try {
output.process(exchange);
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Tue May 1 06:17:24 2007
@@ -55,7 +55,7 @@
return "Multicast" + getEndpoints();
}
- public void process(E exchange) {
+ public void process(E exchange) throws Exception {
for (Producer<E> producer : producers) {
E copy = copyExchangeStrategy(producer, exchange);
producer.process(copy);
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Tue May 1 06:17:24 2007
@@ -35,7 +35,7 @@
super(endpoints);
}
- public void process(E exchange) {
+ public void process(E exchange) throws Exception {
E nextExchange = exchange;
boolean first = true;
for (Producer<E> producer : getProducers()) {
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Tue May 1 06:17:24 2007
@@ -49,7 +49,7 @@
return "RecipientList[" + expression + "]";
}
- public void process(E exchange) {
+ public void process(E exchange) throws Exception {
Object receipientList = expression.evaluate(exchange);
Iterator iter = ObjectConverter.iterator(receipientList);
while (iter.hasNext()) {
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java Tue May 1 06:17:24 2007
@@ -50,7 +50,7 @@
this.producer = destination.createProducer();
}
- public void process(E exchange) {
+ public void process(E exchange) throws Exception {
if (producer == null) {
throw new IllegalStateException("No producer, this processor has not been started!");
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Tue May 1 06:17:24 2007
@@ -49,7 +49,7 @@
return "Splitter[on: " + expression + " to: " + processor + "]";
}
- public void process(E exchange) {
+ public void process(E exchange) throws Exception {
Object value = expression.evaluate(exchange);
Iterator iter = ObjectConverter.iterator(value);
while (iter.hasNext()) {
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java Tue May 1 06:17:24 2007
@@ -49,7 +49,7 @@
return "IdempotentConsumer[expression=" + messageIdExpression + ", repository=" + messageIdRepository + ", processor=" + nextProcessor + "]";
}
- public void process(E exchange) {
+ public void process(E exchange) throws Exception {
String messageId = ExpressionHelper.evaluateAsString(messageIdExpression, exchange);
if (messageId == null) {
throw new NoMessageIdException(exchange, messageIdExpression);
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java Tue May 1 06:17:24 2007
@@ -30,7 +30,7 @@
*/
public abstract class QueueLoadBalancer<E extends Exchange> extends LoadBalancerSupport<E> {
- public void process(E exchange) {
+ public void process(E exchange) throws Exception {
List<Processor<E>> list = getProcessors();
if (list.isEmpty()) {
throw new IllegalStateException("No processors available to process " + exchange);
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java Tue May 1 06:17:24 2007
@@ -29,7 +29,7 @@
* @version $Revision: 1.1 $
*/
public class TopicLoadBalancer<E extends Exchange> extends LoadBalancerSupport<E> {
- public void process(E exchange) {
+ public void process(E exchange) throws Exception {
List<Processor<E>> list = getProcessors();
for (Processor<E> processor : list) {
E copy = copyExchangeStrategy(processor, exchange);
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java Tue May 1 06:17:24 2007
@@ -22,6 +22,7 @@
import org.apache.camel.Endpoint;
import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.ServiceSupport;
import java.util.Map;
@@ -57,8 +58,12 @@
* @param exchange the exchange to send
*/
public void send(Endpoint<E> endpoint, E exchange) {
- Producer<E> producer = getProducer(endpoint);
- producer.process(exchange);
+ try {
+ Producer<E> producer = getProducer(endpoint);
+ producer.process(exchange);
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
+ }
}
/**
@@ -68,15 +73,19 @@
* @param processor the transformer used to populate the new exchange
*/
public E send(Endpoint<E> endpoint, Processor<E> processor) {
- Producer<E> producer = getProducer(endpoint);
- E exchange = producer.createExchange();
-
- // lets populate using the processor callback
- processor.process(exchange);
-
- // now lets dispatch
- producer.process(exchange);
- return exchange;
+ try {
+ Producer<E> producer = getProducer(endpoint);
+ E exchange = producer.createExchange();
+
+ // lets populate using the processor callback
+ processor.process(exchange);
+
+ // now lets dispatch
+ producer.process(exchange);
+ return exchange;
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
+ }
}
protected void doStop() throws Exception {
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/InterceptorBuilderTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/InterceptorBuilderTest.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/InterceptorBuilderTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/InterceptorBuilderTest.java Tue May 1 06:17:24 2007
@@ -43,7 +43,7 @@
final DelegateProcessor<Exchange> interceptor1 = new DelegateProcessor<Exchange>() {
@Override
- public void process(Exchange exchange) {
+ public void process(Exchange exchange) throws Exception {
order.add("START:1");
super.process(exchange);
order.add("END:1");
@@ -51,7 +51,7 @@
};
final DelegateProcessor<Exchange> interceptor2 = new DelegateProcessor<Exchange>() {
@Override
- public void process(Exchange exchange) {
+ public void process(Exchange exchange) throws Exception {
order.add("START:2");
super.process(exchange);
order.add("END:2");
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/MyInterceptorProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/MyInterceptorProcessor.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/MyInterceptorProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/MyInterceptorProcessor.java Tue May 1 06:17:24 2007
@@ -7,7 +7,7 @@
import org.apache.camel.processor.DelegateProcessor;
public class MyInterceptorProcessor extends DelegateProcessor<Exchange> {
- public void process(Exchange exchange) {
+ public void process(Exchange exchange) throws Exception {
System.out.println("START of onExchange: "+exchange);
next.process(exchange);
System.out.println("END of onExchange: "+exchange);
Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java Tue May 1 06:17:24 2007
@@ -61,7 +61,12 @@
}
protected void incomingCxfMessage(Message message) {
- CxfExchange exchange = endpoint.createExchange(message);
- getProcessor().process(exchange);
+ try {
+ CxfExchange exchange = endpoint.createExchange(message);
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ // TODO: what do do if we are getting processing errors from camel? Shutdown?
+ e.printStackTrace();
+ }
}
}
Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfInvokeConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfInvokeConsumer.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfInvokeConsumer.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfInvokeConsumer.java Tue May 1 06:17:24 2007
@@ -67,7 +67,12 @@
// TODO this method currently is not being called.
protected void incomingCxfMessage(Message message) {
- CxfExchange exchange = cxfEndpoint.createExchange(message);
- getProcessor().process(exchange);
+ try {
+ CxfExchange exchange = cxfEndpoint.createExchange(message);
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ // TODO: what do do if we are getting processing errors from camel? Shutdown?
+ e.printStackTrace();
+ }
}
}
Modified: activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java (original)
+++ activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/CamelServlet.java Tue May 1 06:17:24 2007
@@ -17,14 +17,14 @@
*/
package org.apache.camel.component.http;
-import org.apache.camel.Producer;
-import org.apache.camel.util.ProducerCache;
+import java.io.IOException;
+import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import javax.servlet.ServletException;
-import java.io.IOException;
+
+import org.apache.camel.util.ProducerCache;
/**
* @version $Revision$
@@ -47,14 +47,20 @@
throw new ServletException("No endpoint found for request: " + request.getRequestURI());
}
- HttpExchange exchange = endpoint.createExchange(request, response);
- producerCache.send(endpoint, exchange);
-
- // HC: The getBinding() interesting because it illustrates the impedance miss-match between
- // HTTP's stream oriented protocol, and Camels more message oriented protocol exchanges.
-
- // now lets output to the response
- endpoint.getBinding().writeResponse(exchange);
+ try {
+
+ HttpExchange exchange = endpoint.createExchange(request, response);
+ producerCache.send(endpoint, exchange);
+
+ // HC: The getBinding() interesting because it illustrates the impedance miss-match between
+ // HTTP's stream oriented protocol, and Camels more message oriented protocol exchanges.
+
+ // now lets output to the response
+ endpoint.getBinding().writeResponse(exchange);
+
+ } catch (Exception e) {
+ throw new ServletException(e);
+ }
}
protected HttpEndpoint resolveEndpoint(HttpServletRequest request, HttpServletResponse response) {
Modified: activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/FromJbiProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/FromJbiProcessor.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/FromJbiProcessor.java (original)
+++ activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/FromJbiProcessor.java Tue May 1 06:17:24 2007
@@ -39,7 +39,11 @@
}
public void onMessageExchange(MessageExchange messageExchange) throws MessagingException {
- JbiExchange exchange = new JbiExchange(context, binding, messageExchange);
- processor.process(exchange);
+ try {
+ JbiExchange exchange = new JbiExchange(context, binding, messageExchange);
+ processor.process(exchange);
+ } catch (Exception e) {
+ throw new MessagingException(e);
+ }
}
}
Modified: activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java (original)
+++ activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java Tue May 1 06:17:24 2007
@@ -43,7 +43,7 @@
public Producer<Exchange> createProducer() throws Exception {
return startService(new DefaultProducer<Exchange>(this) {
- public void process(Exchange exchange) {
+ public void process(Exchange exchange) throws Exception {
toJbiProcessor.process(exchange);
}
});
Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java Tue May 1 06:17:24 2007
@@ -17,15 +17,16 @@
*/
package org.apache.camel.component.jms;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-
/**
* A JMS {@link MessageListener} which can be used to delegate processing to a Camel endpoint.
*
@@ -43,11 +44,17 @@
}
public void onMessage(Message message) {
- if (log.isDebugEnabled()) {
- log.debug(endpoint + " receiving JMS message: " + message);
- }
- JmsExchange exchange = createExchange(message);
- processor.process((E) exchange);
+ try {
+
+ if (log.isDebugEnabled()) {
+ log.debug(endpoint + " receiving JMS message: " + message);
+ }
+ JmsExchange exchange = createExchange(message);
+ processor.process((E) exchange);
+
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
+ }
}
public JmsExchange createExchange(Message message) {
Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/MessageListenerProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/MessageListenerProcessor.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/MessageListenerProcessor.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/MessageListenerProcessor.java Tue May 1 06:17:24 2007
@@ -19,6 +19,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -39,7 +40,11 @@
}
public void onMessage(Message message) {
- Exchange exchange = endpoint.createExchange(message);
- processor.process(exchange);
+ try {
+ Exchange exchange = endpoint.createExchange(message);
+ processor.process(exchange);
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
+ }
}
}
Modified: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java (original)
+++ activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java Tue May 1 06:17:24 2007
@@ -66,7 +66,7 @@
public Processor wrap(Processor processor) {
return new DelegateProcessor(processor) {
@Override
- public void process(Object exchange) {
+ public void process(Object exchange) throws Exception {
processNext(exchange);
throw new RuntimeException("rollback");
}
Modified: activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original)
+++ activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Tue May 1 06:17:24 2007
@@ -67,7 +67,11 @@
if (lockEntity(result, entityManager)) {
// lets turn the result into an exchange and fire it into the processor
Exchange exchange = createExchange(result);
- getProcessor().process(exchange);
+ try {
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ throw new PersistenceException(e);
+ }
getDeleteHandler().deleteObject(entityManager, result);
}
}
Modified: activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java (original)
+++ activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java Tue May 1 06:17:24 2007
@@ -17,12 +17,6 @@
*/
package org.apache.camel.component.mail;
-import org.apache.camel.Consumer;
-import org.apache.camel.Processor;
-import org.apache.camel.impl.PollingConsumer;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
import javax.mail.Flags;
import javax.mail.Folder;
import javax.mail.Message;
@@ -30,7 +24,12 @@
import javax.mail.Transport;
import javax.mail.event.MessageCountEvent;
import javax.mail.event.MessageCountListener;
-import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.PollingConsumer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* A {@link Consumer} which consumes messages from JavaMail using a {@link Transport} and dispatches them
@@ -112,8 +111,12 @@
}
protected void processMessage(Message message) {
- MailExchange exchange = endpoint.createExchange(message);
- getProcessor().process(exchange);
+ try {
+ MailExchange exchange = endpoint.createExchange(message);
+ getProcessor().process(exchange);
+ } catch (Throwable e) {
+ handleException(e);
+ }
}
protected void ensureFolderIsOpen() throws MessagingException {
Modified: activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java (original)
+++ activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java Tue May 1 06:17:24 2007
@@ -18,6 +18,7 @@
package org.apache.camel.spring.spi;
import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.processor.DelegateProcessor;
import org.apache.camel.spi.Policy;
import org.apache.commons.logging.Log;
@@ -56,7 +57,11 @@
public void process(final E exchange) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
protected void doInTransactionWithoutResult(TransactionStatus status) {
- processNext(exchange);
+ try {
+ processNext(exchange);
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
+ }
}
});
}
Modified: activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java?view=diff&rev=534063&r1=534062&r2=534063
==============================================================================
--- activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java (original)
+++ activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java Tue May 1 06:17:24 2007
@@ -63,7 +63,12 @@
log.debug("<<<< message: " + message.getBody());
}
XmppExchange exchange = endpoint.createExchange(message);
- getProcessor().process(exchange);
+ try {
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ // TODO: what should we do when a processing failure occurs??
+ e.printStackTrace();
+ }
}
else if (packet instanceof RosterPacket) {
RosterPacket rosterPacket = (RosterPacket) packet;