You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/04/25 16:12:37 UTC

svn commit: r1475787 - in /camel/branches/camel-2.11.x: ./ camel-core/src/main/java/org/apache/camel/component/browse/ camel-core/src/main/java/org/apache/camel/component/dataset/ camel-core/src/main/java/org/apache/camel/component/direct/ camel-core/s...

Author: davsclaus
Date: Thu Apr 25 14:12:36 2013
New Revision: 1475787

URL: http://svn.apache.org/r1475787
Log:
CAMEL-6312: When endpoints create Consumer make sure to configure consume as well so we can use the consumer. prefix from uris.

Added:
    camel/branches/camel-2.11.x/camel-core/src/test/java/org/apache/camel/processor/CustomConsumerExceptionHandlerTest.java
      - copied unchanged from r1475742, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomConsumerExceptionHandlerTest.java
Modified:
    camel/branches/camel-2.11.x/   (props changed)
    camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java
    camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
    camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
    camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java
    camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
    camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
    camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
    camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingEndpoint.java
    camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/impl/ProcessorEndpoint.java
    camel/branches/camel-2.11.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java

Propchange: camel/branches/camel-2.11.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1475742

Propchange: camel/branches/camel-2.11.x/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Thu Apr 25 14:12:36 2013
@@ -1 +1 @@
-/camel/trunk:1-1468763,1469704,1469819,1470420,1470426-1470427,1470429,1470508,1471293,1471330,1471407-1471408,1471543,1475657,1475677
+/camel/trunk:1-1468763,1469704,1469819,1470420,1470426-1470427,1470429,1470508,1471293,1471330,1471407-1471408,1471543,1475657,1475677,1475742

Modified: camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java?rev=1475787&r1=1475786&r2=1475787&view=diff
==============================================================================
--- camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java (original)
+++ camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java Thu Apr 25 14:12:36 2013
@@ -69,7 +69,9 @@ public class BrowseEndpoint extends Defa
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
-        return new LoadBalancerConsumer(this, processor, loadBalancer);
+        Consumer answer = new LoadBalancerConsumer(this, processor, loadBalancer);
+        configureConsumer(answer);
+        return answer;
     }
 
     protected List<Exchange> createExchangeList() {

Modified: camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java?rev=1475787&r1=1475786&r2=1475787&view=diff
==============================================================================
--- camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java (original)
+++ camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java Thu Apr 25 14:12:36 2013
@@ -71,7 +71,9 @@ public class DataSetEndpoint extends Moc
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        return new DataSetConsumer(this, processor);
+        Consumer answer = new DataSetConsumer(this, processor);
+        configureConsumer(answer);
+        return answer;
     }
 
     @Override

Modified: camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java?rev=1475787&r1=1475786&r2=1475787&view=diff
==============================================================================
--- camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java (original)
+++ camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java Thu Apr 25 14:12:36 2013
@@ -53,7 +53,9 @@ public class DirectEndpoint extends Defa
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
-        return new DirectConsumer(this, processor);
+        Consumer answer = new DirectConsumer(this, processor);
+        configureConsumer(answer);
+        return answer;
     }
 
     public boolean isSingleton() {

Modified: camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java?rev=1475787&r1=1475786&r2=1475787&view=diff
==============================================================================
--- camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java (original)
+++ camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java Thu Apr 25 14:12:36 2013
@@ -42,7 +42,9 @@ public class DirectVmEndpoint extends De
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        return new DirectVmConsumer(this, new DirectVmProcessor(processor, this));
+        Consumer answer = new DirectVmConsumer(this, new DirectVmProcessor(processor, this));
+        configureConsumer(answer);
+        return answer;
     }
 
     @Override

Modified: camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1475787&r1=1475786&r2=1475787&view=diff
==============================================================================
--- camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Thu Apr 25 14:12:36 2013
@@ -94,7 +94,9 @@ public class SedaEndpoint extends Defaul
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
-        return new SedaConsumer(this, processor);
+        Consumer answer = new SedaConsumer(this, processor);
+        configureConsumer(answer);
+        return answer;
     }
 
     public synchronized BlockingQueue<Exchange> getQueue() {

Modified: camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java?rev=1475787&r1=1475786&r2=1475787&view=diff
==============================================================================
--- camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java (original)
+++ camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java Thu Apr 25 14:12:36 2013
@@ -58,7 +58,9 @@ public class TimerEndpoint extends Defau
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
-        return new TimerConsumer(this, processor);
+        Consumer answer = new TimerConsumer(this, processor);
+        configureConsumer(answer);
+        return answer;
     }
 
     @Override

Modified: camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?rev=1475787&r1=1475786&r2=1475787&view=diff
==============================================================================
--- camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Thu Apr 25 14:12:36 2013
@@ -206,6 +206,7 @@ public abstract class DefaultEndpoint ex
     }
 
     public PollingConsumer createPollingConsumer() throws Exception {
+        // should not configure consumer
         return new EventDrivenPollingConsumer(this);
     }
 
@@ -259,7 +260,7 @@ public abstract class DefaultEndpoint ex
 
     public void configureProperties(Map<String, Object> options) {
         Map<String, Object> consumerProperties = IntrospectionSupport.extractProperties(options, "consumer.");
-        if (consumerProperties != null) {
+        if (consumerProperties != null && !consumerProperties.isEmpty()) {
             setConsumerProperties(consumerProperties);
         }
     }
@@ -317,11 +318,22 @@ public abstract class DefaultEndpoint ex
     }
 
     public Map<String, Object> getConsumerProperties() {
+        if (consumerProperties == null) {
+            // must create empty if none exists
+            consumerProperties = new HashMap<String, Object>();
+        }
         return consumerProperties;
     }
 
     public void setConsumerProperties(Map<String, Object> consumerProperties) {
-        this.consumerProperties = consumerProperties;
+        // append consumer properties
+        if (consumerProperties != null && !consumerProperties.isEmpty()) {
+            if (this.consumerProperties == null) {
+                this.consumerProperties = new HashMap<String, Object>(consumerProperties);
+            } else {
+                this.consumerProperties.putAll(consumerProperties);
+            }
+        }
     }
 
     protected void configureConsumer(Consumer consumer) throws Exception {

Modified: camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingEndpoint.java?rev=1475787&r1=1475786&r2=1475787&view=diff
==============================================================================
--- camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingEndpoint.java (original)
+++ camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingEndpoint.java Thu Apr 25 14:12:36 2013
@@ -46,7 +46,7 @@ public abstract class DefaultPollingEndp
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
-        DefaultScheduledPollConsumer result = new DefaultScheduledPollConsumer(this, processor);
+        Consumer result = new DefaultScheduledPollConsumer(this, processor);
         configureConsumer(result);
         return result;
     }

Modified: camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/impl/ProcessorEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/impl/ProcessorEndpoint.java?rev=1475787&r1=1475786&r2=1475787&view=diff
==============================================================================
--- camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/impl/ProcessorEndpoint.java (original)
+++ camel/branches/camel-2.11.x/camel-core/src/main/java/org/apache/camel/impl/ProcessorEndpoint.java Thu Apr 25 14:12:36 2013
@@ -75,7 +75,9 @@ public class ProcessorEndpoint extends D
 
     @Override
     public PollingConsumer createPollingConsumer() throws Exception {
-        return new ProcessorPollingConsumer(this, getProcessor());
+        PollingConsumer answer = new ProcessorPollingConsumer(this, getProcessor());
+        configureConsumer(answer);
+        return answer;
     }
 
     public Processor getProcessor() throws Exception {

Modified: camel/branches/camel-2.11.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.11.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java?rev=1475787&r1=1475786&r2=1475787&view=diff
==============================================================================
--- camel/branches/camel-2.11.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java (original)
+++ camel/branches/camel-2.11.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java Thu Apr 25 14:12:36 2013
@@ -21,18 +21,21 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.seda.SedaConsumer;
+import org.apache.camel.impl.JndiRegistry;
 import org.apache.camel.spi.ExceptionHandler;
 
 public class DeadLetterChannelAlwaysHandledTest extends ContextTestSupport {
 
-    private static final AtomicBoolean called = new AtomicBoolean();
+    private static final AtomicBoolean CALLLED = new AtomicBoolean();
 
-    public void testDeadLetterChannelAlwaysHandled() throws Exception {
-        // need to set exception handler manually to work around an issue configuring from uri
-        SedaConsumer seda = (SedaConsumer) context.getRoute("foo").getConsumer();
-        seda.setExceptionHandler(new MyExceptionHandler());
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myHandler", new MyExceptionHandler());
+        return jndi;
+    }
 
+    public void testDeadLetterChannelAlwaysHandled() throws Exception {
         getMockEndpoint("mock:foo").expectedMessageCount(1);
         getMockEndpoint("mock:bar").expectedMessageCount(1);
         getMockEndpoint("mock:dead").expectedMessageCount(1);
@@ -42,7 +45,7 @@ public class DeadLetterChannelAlwaysHand
 
         assertMockEndpointsSatisfied();
 
-        assertFalse("Should not have called", called.get());
+        assertFalse("Should not have called", CALLLED.get());
     }
 
     @Override
@@ -52,7 +55,7 @@ public class DeadLetterChannelAlwaysHand
             public void configure() throws Exception {
                 errorHandler(deadLetterChannel("mock:dead"));
 
-                from("seda:foo?synchronous=true").routeId("foo")
+                from("seda:foo?synchronous=true&consumer.exceptionHandler=#myHandler").routeId("foo")
                     .to("mock:foo")
                     .to("direct:bar")
                     .to("mock:result");
@@ -69,17 +72,17 @@ public class DeadLetterChannelAlwaysHand
 
         @Override
         public void handleException(Throwable exception) {
-            called.set(true);
+            CALLLED.set(true);
         }
 
         @Override
         public void handleException(String message, Throwable exception) {
-            called.set(true);
+            CALLLED.set(true);
         }
 
         @Override
         public void handleException(String message, Exchange exchange, Throwable exception) {
-            called.set(true);
+            CALLLED.set(true);
         }
     }
 }