You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/09/02 05:55:22 UTC
[camel] 01/02: ENTESB-11304 - failIfNoConsumers option does not
work with enabled block option
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch camel-2.24.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 88c54774bd3dc8fa64c8b7f12c7d9b4bd1bd8102
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Sep 2 07:06:46 2019 +0200
ENTESB-11304 - failIfNoConsumers option does not work with enabled block option
---
.../directvm/DirectVmBlockingProducer.java | 24 ++++++++++++++--------
.../directvm/DirectVmProducerBlockingTest.java | 19 ++++++++++++++---
2 files changed, 31 insertions(+), 12 deletions(-)
diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmBlockingProducer.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmBlockingProducer.java
index 22d3f63..3f07cf9 100644
--- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmBlockingProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmBlockingProducer.java
@@ -26,14 +26,16 @@ import org.slf4j.LoggerFactory;
/**
* The direct producer.
* <p/>
- * If blocking is enabled ({@code DirectEndpoint#isBlock}) then the DirectEndpoint will create an instance
- * of this class instead of {@code DirectProducer}.
- * This producers {@code process} method will block for the configured duration ({@code DirectEndpoint#getTimeout},
- * default to 30 seconds). After which if a consumer is still unavailable a DirectConsumerNotAvailableException
- * will be thrown.
+ * If blocking is enabled ({@code DirectEndpoint#isBlock}) then the
+ * DirectEndpoint will create an instance of this class instead of
+ * {@code DirectProducer}. This producers {@code process} method will block for
+ * the configured duration ({@code DirectEndpoint#getTimeout}, default to 30
+ * seconds). After which if a consumer is still unavailable a
+ * DirectConsumerNotAvailableException will be thrown.
* <p/>
- * Implementation note: Concurrent Producers will block for the duration it takes to determine if a
- * consumer is available, but actual consumer execution will happen concurrently.
+ * Implementation note: Concurrent Producers will block for the duration it
+ * takes to determine if a consumer is available, but actual consumer execution
+ * will happen concurrently.
*/
public class DirectVmBlockingProducer extends DefaultAsyncProducer {
private static final Logger LOG = LoggerFactory.getLogger(DirectVmBlockingProducer.class);
@@ -62,9 +64,13 @@ public class DirectVmBlockingProducer extends DefaultAsyncProducer {
DirectVmConsumer answer = endpoint.getConsumer();
if (answer == null) {
// okay then await until we have a consumer or we timed out
- answer = awaitConsumer();
- if (answer == null) {
+ if (endpoint.isFailIfNoConsumers()) {
throw new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
+ } else {
+ answer = awaitConsumer();
+ if (answer == null) {
+ throw new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
+ }
}
}
diff --git a/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmProducerBlockingTest.java b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmProducerBlockingTest.java
index de37563..c18de85 100644
--- a/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmProducerBlockingTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmProducerBlockingTest.java
@@ -35,7 +35,7 @@ public class DirectVmProducerBlockingTest extends ContextTestSupport {
StopWatch watch = new StopWatch();
try {
- template.sendBody("direct-vm:suspended?block=true&timeout=500", "hello world");
+ template.sendBody("direct-vm:suspended?block=true&timeout=500&failIfNoConsumers=false", "hello world");
fail("Expected CamelExecutionException");
} catch (CamelExecutionException e) {
DirectVmConsumerNotAvailableException cause = assertIsInstanceOf(DirectVmConsumerNotAvailableException.class, e.getCause());
@@ -51,7 +51,7 @@ public class DirectVmProducerBlockingTest extends ContextTestSupport {
StopWatch watch = new StopWatch();
try {
- template.sendBody("direct-vm:start?block=true&timeout=500", "hello world");
+ template.sendBody("direct-vm:start?block=true&timeout=500&failIfNoConsumers=false", "hello world");
fail("Expected CamelExecutionException");
} catch (CamelExecutionException e) {
DirectVmConsumerNotAvailableException cause = assertIsInstanceOf(DirectVmConsumerNotAvailableException.class, e.getCause());
@@ -60,6 +60,19 @@ public class DirectVmProducerBlockingTest extends ContextTestSupport {
assertTrue(watch.taken() > 490);
}
}
+
+ public void testProducerBlocksFailIfNoConsumerFalse() throws Exception {
+ DirectVmEndpoint endpoint = getMandatoryEndpoint("direct-vm:suspended", DirectVmEndpoint.class);
+ endpoint.getConsumer().suspend();
+
+ try {
+ template.sendBody("direct-vm:start?block=true&timeout=500&failIfNoConsumers=true", "hello world");
+ fail("Expected CamelExecutionException");
+ } catch (CamelExecutionException e) {
+ DirectVmConsumerNotAvailableException cause = assertIsInstanceOf(DirectVmConsumerNotAvailableException.class, e.getCause());
+ assertIsInstanceOf(CamelExchangeException.class, cause);
+ }
+ }
@Test
public void testProducerBlocksResumeTest() throws Exception {
@@ -81,7 +94,7 @@ public class DirectVmProducerBlockingTest extends ContextTestSupport {
getMockEndpoint("mock:result").expectedMessageCount(1);
- template.sendBody("direct-vm:suspended?block=true&timeout=1000", "hello world");
+ template.sendBody("direct-vm:suspended?block=true&timeout=1000&failIfNoConsumers=false", "hello world");
assertMockEndpointsSatisfied();