You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bo...@apache.org on 2015/04/01 20:26:05 UTC

camel git commit: CAMEL-7905 added failIfNoConsumers option to the direct component

Repository: camel
Updated Branches:
  refs/heads/master 79f60617f -> 4a533134d


CAMEL-7905 added failIfNoConsumers option to the direct component


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4a533134
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4a533134
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4a533134

Branch: refs/heads/master
Commit: 4a533134de44be2f2a88d9a84dfc584958dae204
Parents: 79f6061
Author: boday <bo...@apache.org>
Authored: Wed Apr 1 11:19:19 2015 -0700
Committer: boday <bo...@apache.org>
Committed: Wed Apr 1 11:19:19 2015 -0700

----------------------------------------------------------------------
 .../camel/component/direct/DirectEndpoint.java  |  13 ++
 .../camel/component/direct/DirectProducer.java  |  18 +-
 .../component/direct/DirectNoConsumerTest.java  | 168 +++++++++++++++++++
 3 files changed, 195 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4a533134/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
index 645224c..52925e7 100644
--- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
@@ -48,6 +48,8 @@ public class DirectEndpoint extends DefaultEndpoint {
     private boolean block;
     @UriParam(label = "producer", defaultValue = "30000")
     private long timeout = 30000L;
+    @UriParam(label = "producer")
+    private boolean failIfNoConsumers = true;
 
     public DirectEndpoint() {
         this.consumers = new HashMap<String, DirectConsumer>();
@@ -125,6 +127,17 @@ public class DirectEndpoint extends DefaultEndpoint {
         this.timeout = timeout;
     }
 
+    public boolean isFailIfNoConsumers() {
+        return failIfNoConsumers;
+    }
+
+    /**
+     * Whether the producer should fail by throwing an exception, when sending to a DIRECT endpoint with no active consumers.
+     */
+    public void setFailIfNoConsumers(boolean failIfNoConsumers) {
+        this.failIfNoConsumers = failIfNoConsumers;
+    }
+
     protected String getKey() {
         String uri = getEndpointUri();
         if (uri.indexOf('?') != -1) {

http://git-wip-us.apache.org/repos/asf/camel/blob/4a533134/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
index 8a60d30..234268e 100644
--- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
@@ -19,13 +19,15 @@ package org.apache.camel.component.direct;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultAsyncProducer;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 /**
  * The direct producer.
  *
  * @version 
  */
 public class DirectProducer extends DefaultAsyncProducer {
+    private static final transient Logger LOG = LoggerFactory.getLogger(DirectProducer.class);
     private final DirectEndpoint endpoint;
 
     public DirectProducer(DirectEndpoint endpoint) {
@@ -35,7 +37,11 @@ public class DirectProducer extends DefaultAsyncProducer {
 
     public void process(Exchange exchange) throws Exception {
         if (endpoint.getConsumer() == null) {
-            throw new DirectConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
+            if (endpoint.isFailIfNoConsumers()) {
+                throw new DirectConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
+            } else {
+                log.debug("message ignored, no consumers available on endpoint: " + endpoint);
+            }
         } else {
             endpoint.getConsumer().getProcessor().process(exchange);
         }
@@ -43,8 +49,12 @@ public class DirectProducer extends DefaultAsyncProducer {
 
     public boolean process(Exchange exchange, AsyncCallback callback) {
         if (endpoint.getConsumer() == null) {
-            // indicate its done synchronously
-            exchange.setException(new DirectConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange));
+            if (endpoint.isFailIfNoConsumers()) {
+                // indicate its done synchronously
+                exchange.setException(new DirectConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange));
+            } else {
+                log.debug("message ignored, no consumers available on endpoint: " + endpoint);
+            }
             callback.done(true);
             return true;
         } else {

http://git-wip-us.apache.org/repos/asf/camel/blob/4a533134/camel-core/src/test/java/org/apache/camel/component/direct/DirectNoConsumerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/direct/DirectNoConsumerTest.java b/camel-core/src/test/java/org/apache/camel/component/direct/DirectNoConsumerTest.java
new file mode 100644
index 0000000..a756ea6
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/direct/DirectNoConsumerTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.direct;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+/**
+ * @version 
+ */
+public class DirectNoConsumerTest extends ContextTestSupport {
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    public void testInOnly() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("direct:foo");
+            }
+        });
+
+        context.start();
+
+        try {
+            template.sendBody("direct:start", "Hello World");
+            fail("Should throw an exception");
+        } catch (CamelExecutionException e) {
+            assertIsInstanceOf(DirectConsumerNotAvailableException.class, e.getCause());
+        }
+    }
+
+    public void testInOut() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("direct:foo");
+            }
+        });
+
+        context.start();
+
+        try {
+            template.requestBody("direct:start", "Hello World");
+            fail("Should throw an exception");
+        } catch (CamelExecutionException e) {
+            assertIsInstanceOf(DirectConsumerNotAvailableException.class, e.getCause());
+        }
+    }
+
+    @Test
+    public void testFailIfNoConsumerFalse() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("direct:foo?failIfNoConsumers=false");
+            }
+        });
+
+        context.start();
+
+        try {
+            template.sendBody("direct:start", "Hello World");
+        } catch (CamelExecutionException e) {
+            fail("Should not throw an exception");
+        }
+    }
+
+    @Test
+    public void testFailIfNoConsumersAfterConsumersLeave() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:foo").routeId("stopThisRoute").to("mock:foo");
+            }
+        });
+
+        context.start();
+
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct:foo", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        context.stopRoute("stopThisRoute");
+        TimeUnit.MILLISECONDS.sleep(100);
+        try {
+            template.sendBody("direct:foo", "Hello World");
+            fail("Should throw an exception");
+        } catch (CamelExecutionException e) {
+            assertIsInstanceOf(DirectConsumerNotAvailableException.class, e.getCause());
+        }
+    }
+
+    @Test
+    public void testFailIfNoConsumersWithValidConsumer() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:in").to("direct:foo");
+                from("direct:foo").to("mock:foo");
+            }
+        });
+
+        context.start();
+
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct:in", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+    }
+
+    @Test
+    public void testFailIfNoConsumersFalseWithPipeline() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:in").to("direct:foo?failIfNoConsumers=false").to("direct:bar");
+                from("direct:bar").to("mock:foo");
+            }
+        });
+
+        context.start();
+
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct:in", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+    }
+
+    @Test
+    public void testConfigOnAConsumer() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:foo?failIfNoConsumers=false").to("log:test");
+            }
+        });
+
+        context.start();
+    }
+
+}