You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/04/25 15:03:36 UTC
svn commit: r1475742 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/component/browse/
main/java/org/apache/camel/component/dataset/
main/java/org/apache/camel/component/direct/
main/java/org/apache/camel/component/directvm/ main/java/org/...
Author: davsclaus
Date: Thu Apr 25 13:03:35 2013
New Revision: 1475742
URL: http://svn.apache.org/r1475742
Log:
CAMEL-6312: When endpoints create a Consumer, make sure to configure the consumer as well, so we can use the consumer. prefix from URIs.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomConsumerExceptionHandlerTest.java
- copied, changed from r1475700, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorEndpoint.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java Thu Apr 25 13:03:35 2013
@@ -69,7 +69,9 @@ public class BrowseEndpoint extends Defa
}
public Consumer createConsumer(Processor processor) throws Exception {
- return new LoadBalancerConsumer(this, processor, loadBalancer);
+ Consumer answer = new LoadBalancerConsumer(this, processor, loadBalancer);
+ configureConsumer(answer);
+ return answer;
}
protected List<Exchange> createExchangeList() {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java Thu Apr 25 13:03:35 2013
@@ -71,7 +71,9 @@ public class DataSetEndpoint extends Moc
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- return new DataSetConsumer(this, processor);
+ Consumer answer = new DataSetConsumer(this, processor);
+ configureConsumer(answer);
+ return answer;
}
@Override
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java Thu Apr 25 13:03:35 2013
@@ -53,7 +53,9 @@ public class DirectEndpoint extends Defa
}
public Consumer createConsumer(Processor processor) throws Exception {
- return new DirectConsumer(this, processor);
+ Consumer answer = new DirectConsumer(this, processor);
+ configureConsumer(answer);
+ return answer;
}
public boolean isSingleton() {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java Thu Apr 25 13:03:35 2013
@@ -42,7 +42,9 @@ public class DirectVmEndpoint extends De
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- return new DirectVmConsumer(this, new DirectVmProcessor(processor, this));
+ Consumer answer = new DirectVmConsumer(this, new DirectVmProcessor(processor, this));
+ configureConsumer(answer);
+ return answer;
}
@Override
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Thu Apr 25 13:03:35 2013
@@ -101,7 +101,9 @@ public class SedaEndpoint extends Defaul
}
public Consumer createConsumer(Processor processor) throws Exception {
- return new SedaConsumer(this, processor);
+ Consumer answer = new SedaConsumer(this, processor);
+ configureConsumer(answer);
+ return answer;
}
public synchronized BlockingQueue<Exchange> getQueue() {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java Thu Apr 25 13:03:35 2013
@@ -69,7 +69,9 @@ public class TimerEndpoint extends Defau
}
public Consumer createConsumer(Processor processor) throws Exception {
- return new TimerConsumer(this, processor);
+ Consumer answer = new TimerConsumer(this, processor);
+ configureConsumer(answer);
+ return answer;
}
@Override
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Thu Apr 25 13:03:35 2013
@@ -206,6 +206,7 @@ public abstract class DefaultEndpoint ex
}
public PollingConsumer createPollingConsumer() throws Exception {
+ // should not configure consumer
return new EventDrivenPollingConsumer(this);
}
@@ -259,7 +260,7 @@ public abstract class DefaultEndpoint ex
public void configureProperties(Map<String, Object> options) {
Map<String, Object> consumerProperties = IntrospectionSupport.extractProperties(options, "consumer.");
- if (consumerProperties != null) {
+ if (consumerProperties != null && !consumerProperties.isEmpty()) {
setConsumerProperties(consumerProperties);
}
}
@@ -317,11 +318,22 @@ public abstract class DefaultEndpoint ex
}
public Map<String, Object> getConsumerProperties() {
+ if (consumerProperties == null) {
+ // must create empty if none exists
+ consumerProperties = new HashMap<String, Object>();
+ }
return consumerProperties;
}
public void setConsumerProperties(Map<String, Object> consumerProperties) {
- this.consumerProperties = consumerProperties;
+ // append consumer properties
+ if (consumerProperties != null && !consumerProperties.isEmpty()) {
+ if (this.consumerProperties == null) {
+ this.consumerProperties = new HashMap<String, Object>(consumerProperties);
+ } else {
+ this.consumerProperties.putAll(consumerProperties);
+ }
+ }
}
protected void configureConsumer(Consumer consumer) throws Exception {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingEndpoint.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingEndpoint.java Thu Apr 25 13:03:35 2013
@@ -46,7 +46,7 @@ public abstract class DefaultPollingEndp
}
public Consumer createConsumer(Processor processor) throws Exception {
- DefaultScheduledPollConsumer result = new DefaultScheduledPollConsumer(this, processor);
+ Consumer result = new DefaultScheduledPollConsumer(this, processor);
configureConsumer(result);
return result;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorEndpoint.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorEndpoint.java Thu Apr 25 13:03:35 2013
@@ -75,7 +75,9 @@ public class ProcessorEndpoint extends D
@Override
public PollingConsumer createPollingConsumer() throws Exception {
- return new ProcessorPollingConsumer(this, getProcessor());
+ PollingConsumer answer = new ProcessorPollingConsumer(this, getProcessor());
+ configureConsumer(answer);
+ return answer;
}
public Processor getProcessor() throws Exception {
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomConsumerExceptionHandlerTest.java (from r1475700, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomConsumerExceptionHandlerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomConsumerExceptionHandlerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java&r1=1475700&r2=1475742&rev=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomConsumerExceptionHandlerTest.java Thu Apr 25 13:03:35 2013
@@ -16,33 +16,36 @@
*/
package org.apache.camel.processor;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.seda.SedaConsumer;
+import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.spi.ExceptionHandler;
-public class DeadLetterChannelAlwaysHandledTest extends ContextTestSupport {
+public class CustomConsumerExceptionHandlerTest extends ContextTestSupport {
- private static final AtomicBoolean called = new AtomicBoolean();
+ private static final CountDownLatch LATCH = new CountDownLatch(1);
- public void testDeadLetterChannelAlwaysHandled() throws Exception {
- // need to set exception handler manually to work around an issue configuring from uri
- SedaConsumer seda = (SedaConsumer) context.getRoute("foo").getConsumer();
- seda.setExceptionHandler(new MyExceptionHandler());
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+ jndi.bind("myHandler", new MyExceptionHandler());
+ return jndi;
+ }
+ public void testDeadLetterChannelAlwaysHandled() throws Exception {
getMockEndpoint("mock:foo").expectedMessageCount(1);
getMockEndpoint("mock:bar").expectedMessageCount(1);
- getMockEndpoint("mock:dead").expectedMessageCount(1);
getMockEndpoint("mock:result").expectedMessageCount(0);
template.sendBody("seda:foo", "Hello World");
assertMockEndpointsSatisfied();
- assertFalse("Should not have called", called.get());
+ assertTrue("Should have been called", LATCH.await(5, TimeUnit.SECONDS));
}
@Override
@@ -50,9 +53,7 @@ public class DeadLetterChannelAlwaysHand
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- errorHandler(deadLetterChannel("mock:dead"));
-
- from("seda:foo?synchronous=true").routeId("foo")
+ from("seda:foo?synchronous=true&consumer.exceptionHandler=#myHandler").routeId("foo")
.to("mock:foo")
.to("direct:bar")
.to("mock:result");
@@ -69,17 +70,17 @@ public class DeadLetterChannelAlwaysHand
@Override
public void handleException(Throwable exception) {
- called.set(true);
+ LATCH.countDown();
}
@Override
public void handleException(String message, Throwable exception) {
- called.set(true);
+ LATCH.countDown();
}
@Override
public void handleException(String message, Exchange exchange, Throwable exception) {
- called.set(true);
+ LATCH.countDown();
}
}
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java Thu Apr 25 13:03:35 2013
@@ -21,18 +21,21 @@ import java.util.concurrent.atomic.Atomi
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.seda.SedaConsumer;
+import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.spi.ExceptionHandler;
public class DeadLetterChannelAlwaysHandledTest extends ContextTestSupport {
- private static final AtomicBoolean called = new AtomicBoolean();
+ private static final AtomicBoolean CALLLED = new AtomicBoolean();
- public void testDeadLetterChannelAlwaysHandled() throws Exception {
- // need to set exception handler manually to work around an issue configuring from uri
- SedaConsumer seda = (SedaConsumer) context.getRoute("foo").getConsumer();
- seda.setExceptionHandler(new MyExceptionHandler());
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+ jndi.bind("myHandler", new MyExceptionHandler());
+ return jndi;
+ }
+ public void testDeadLetterChannelAlwaysHandled() throws Exception {
getMockEndpoint("mock:foo").expectedMessageCount(1);
getMockEndpoint("mock:bar").expectedMessageCount(1);
getMockEndpoint("mock:dead").expectedMessageCount(1);
@@ -42,7 +45,7 @@ public class DeadLetterChannelAlwaysHand
assertMockEndpointsSatisfied();
- assertFalse("Should not have called", called.get());
+ assertFalse("Should not have called", CALLLED.get());
}
@Override
@@ -52,7 +55,7 @@ public class DeadLetterChannelAlwaysHand
public void configure() throws Exception {
errorHandler(deadLetterChannel("mock:dead"));
- from("seda:foo?synchronous=true").routeId("foo")
+ from("seda:foo?synchronous=true&consumer.exceptionHandler=#myHandler").routeId("foo")
.to("mock:foo")
.to("direct:bar")
.to("mock:result");
@@ -69,17 +72,17 @@ public class DeadLetterChannelAlwaysHand
@Override
public void handleException(Throwable exception) {
- called.set(true);
+ CALLLED.set(true);
}
@Override
public void handleException(String message, Throwable exception) {
- called.set(true);
+ CALLLED.set(true);
}
@Override
public void handleException(String message, Exchange exchange, Throwable exception) {
- called.set(true);
+ CALLLED.set(true);
}
}
}