You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bo...@apache.org on 2015/04/01 21:48:02 UTC
camel git commit: CAMEL-7905 added failIfNoConsumers option to the
direct-vm component
Repository: camel
Updated Branches:
refs/heads/master 4a533134d -> b3ebc297f
CAMEL-7905 added failIfNoConsumers option to the direct-vm component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b3ebc297
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b3ebc297
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b3ebc297
Branch: refs/heads/master
Commit: b3ebc297fb94e1791ba72213d9ebea10c37477d3
Parents: 4a53313
Author: boday <bo...@apache.org>
Authored: Wed Apr 1 12:41:59 2015 -0700
Committer: boday <bo...@apache.org>
Committed: Wed Apr 1 12:46:53 2015 -0700
----------------------------------------------------------------------
.../component/directvm/DirectVmEndpoint.java | 14 ++
.../component/directvm/DirectVmProducer.java | 12 +-
.../directvm/DirectVmNoConsumerTest.java | 168 +++++++++++++++++++
3 files changed, 192 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/b3ebc297/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java
index b616f67..8489c6c 100644
--- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java
@@ -39,6 +39,8 @@ public class DirectVmEndpoint extends DefaultEndpoint {
private boolean block;
@UriParam(label = "producer", defaultValue = "30000")
private long timeout = 30000L;
+ @UriParam(label = "producer", defaultValue = "true")
+ private boolean failIfNoConsumers = true;
public DirectVmEndpoint(String endpointUri, DirectVmComponent component) {
super(endpointUri, component);
@@ -96,4 +98,16 @@ public class DirectVmEndpoint extends DefaultEndpoint {
public void setTimeout(long timeout) {
this.timeout = timeout;
}
+
+ /**
+ * Returns the current value of the failIfNoConsumers option of this endpoint.
+ */
+ public boolean isFailIfNoConsumers() {
+ return this.failIfNoConsumers;
+ }
+
+ /**
+ * Whether the producer should fail by throwing an exception when sending to a direct-vm endpoint with no active consumers.
+ * <p/>
+ * The option is enabled by default. When it is disabled, a message sent to an endpoint
+ * without a consumer is ignored and only a debug-level log entry is written.
+ *
+ * @param failIfNoConsumers <tt>true</tt> to fail with an exception, <tt>false</tt> to ignore the message
+ */
+ public void setFailIfNoConsumers(boolean failIfNoConsumers) {
+ this.failIfNoConsumers = failIfNoConsumers;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/b3ebc297/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
index 14a2138..32fb395 100644
--- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
@@ -37,7 +37,11 @@ public class DirectVmProducer extends DefaultAsyncProducer {
// send to consumer
DirectVmConsumer consumer = endpoint.getComponent().getConsumer(endpoint);
if (consumer == null) {
- throw new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
+ if (endpoint.isFailIfNoConsumers()) {
+ throw new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
+ } else {
+ log.debug("message ignored, no consumers available on endpoint: " + endpoint);
+ }
} else {
consumer.getProcessor().process(exchange);
}
@@ -48,7 +52,11 @@ public class DirectVmProducer extends DefaultAsyncProducer {
// send to consumer
DirectVmConsumer consumer = endpoint.getComponent().getConsumer(endpoint);
if (consumer == null) {
- exchange.setException(new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange));
+ if (endpoint.isFailIfNoConsumers()) {
+ exchange.setException(new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange));
+ } else {
+ log.debug("message ignored, no consumers available on endpoint: " + endpoint);
+ }
callback.done(true);
return true;
} else {
http://git-wip-us.apache.org/repos/asf/camel/blob/b3ebc297/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoConsumerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoConsumerTest.java b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoConsumerTest.java
new file mode 100644
index 0000000..ae97899
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoConsumerTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.directvm;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+/**
+ * Tests how a direct-vm producer behaves when the endpoint it sends to has no
+ * consumer, both with the failIfNoConsumers option left untouched and with it
+ * explicitly set to <tt>false</tt>.
+ */
+public class DirectVmNoConsumerTest extends ContextTestSupport {
+
+    /**
+     * Every test in this class registers its own routes and starts the context
+     * explicitly, so the route-builder based setup is switched off.
+     */
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    /**
+     * Sending (InOnly) to a direct-vm endpoint without a consumer must fail
+     * with a {@link DirectVmConsumerNotAvailableException} when the option is
+     * not configured on the URI.
+     */
+    @Test
+    public void testInOnly() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct-vm:start").to("direct-vm:foo");
+            }
+        });
+
+        context.start();
+
+        try {
+            template.sendBody("direct-vm:start", "Hello World");
+            fail("Should throw an exception");
+        } catch (CamelExecutionException e) {
+            assertIsInstanceOf(DirectVmConsumerNotAvailableException.class, e.getCause());
+        }
+    }
+
+    /**
+     * Same as {@link #testInOnly()} but using a request (InOut) instead of a send.
+     */
+    @Test
+    public void testInOut() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct-vm:start").to("direct-vm:foo");
+            }
+        });
+
+        context.start();
+
+        try {
+            template.requestBody("direct-vm:start", "Hello World");
+            fail("Should throw an exception");
+        } catch (CamelExecutionException e) {
+            assertIsInstanceOf(DirectVmConsumerNotAvailableException.class, e.getCause());
+        }
+    }
+
+    /**
+     * With failIfNoConsumers=false the send must complete without an exception
+     * even though nothing consumes from direct-vm:foo.
+     */
+    @Test
+    public void testFailIfNoConsumerFalse() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct-vm:start").to("direct-vm:foo?failIfNoConsumers=false");
+            }
+        });
+
+        context.start();
+
+        // must complete normally; an unexpected exception propagates and fails
+        // the test with its full cause instead of a fixed message
+        template.sendBody("direct-vm:start", "Hello World");
+    }
+
+    /**
+     * A message is delivered while the consuming route is running; once that
+     * route has been stopped, sending to the same endpoint must fail.
+     */
+    @Test
+    public void testFailIfNoConsumersAfterConsumersLeave() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct-vm:foo").routeId("stopThisRoute").to("mock:foo");
+            }
+        });
+
+        context.start();
+
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct-vm:foo", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        context.stopRoute("stopThisRoute");
+        // short pause before sending again; presumably gives the stopped
+        // consumer time to be removed from the endpoint
+        TimeUnit.MILLISECONDS.sleep(100);
+        try {
+            template.sendBody("direct-vm:foo", "Hello World");
+            fail("Should throw an exception");
+        } catch (CamelExecutionException e) {
+            assertIsInstanceOf(DirectVmConsumerNotAvailableException.class, e.getCause());
+        }
+    }
+
+    /**
+     * When a consumer exists on the target endpoint the message is routed
+     * through to mock:foo as usual.
+     */
+    @Test
+    public void testFailIfNoConsumersWithValidConsumer() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct-vm:in").to("direct-vm:foo");
+                from("direct-vm:foo").to("mock:foo");
+            }
+        });
+
+        context.start();
+
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct-vm:in", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /**
+     * A consumer-less endpoint configured with failIfNoConsumers=false in the
+     * middle of a pipeline must not stop the message from reaching the next
+     * endpoint in that pipeline.
+     */
+    @Test
+    public void testFailIfNoConsumersFalseWithPipeline() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct-vm:in").to("direct-vm:foo?failIfNoConsumers=false").to("direct-vm:bar");
+                from("direct-vm:bar").to("mock:foo");
+            }
+        });
+
+        context.start();
+
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct-vm:in", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /**
+     * The option may also appear on a consumer URI; the test passes as long as
+     * the route can be added and the context starts without an exception.
+     */
+    @Test
+    public void testConfigOnAConsumer() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct-vm:foo?failIfNoConsumers=false").to("log:test");
+            }
+        });
+
+        context.start();
+    }
+
+}