You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/01/20 15:25:01 UTC
[camel] branch camel-3.7.x updated: CAMEL-16035
HazelcastQueueConsumer : do not send NULL when nothing is polled from queue
(#4901)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.7.x by this push:
new 779f554 CAMEL-16035 HazelcastQueueConsumer : do not send NULL when nothing is polled from queue (#4901)
779f554 is described below
commit 779f554e5b33eff016331f798229bb4c3879fbab
Author: Zineb BENDHIBA <be...@gmail.com>
AuthorDate: Wed Jan 20 16:23:54 2021 +0100
CAMEL-16035 HazelcastQueueConsumer : do not send NULL when nothing is polled from queue (#4901)
---
.../component/hazelcast/queue/HazelcastQueueConsumer.java | 15 +++++++++------
.../hazelcast/HazelcastCamelSpringTestSupport.java | 2 +-
.../hazelcast/HazelcastQueueConsumerPollTest.java | 11 +++++++++++
3 files changed, 21 insertions(+), 7 deletions(-)
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
index 4fa6948..7794bda 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
@@ -84,12 +84,15 @@ public class HazelcastQueueConsumer extends HazelcastDefaultConsumer {
while (isRunAllowed()) {
try {
final Object body = queue.poll(config.getPollingTimeout(), TimeUnit.MILLISECONDS);
- Exchange exchange = getEndpoint().createExchange();
- exchange.getIn().setBody(body);
- try {
- processor.process(exchange);
- } catch (Exception e) {
- getExceptionHandler().handleException("Error during processing", exchange, e);
+ // CAMEL-16035 - If the polling timeout is exceeded with nothing to poll from the queue, the queue.poll() method return NULL
+ if (body != null) {
+ Exchange exchange = getEndpoint().createExchange();
+ exchange.getIn().setBody(body);
+ try {
+ processor.process(exchange);
+ } catch (Exception e) {
+ getExceptionHandler().handleException("Error during processing", exchange, e);
+ }
}
} catch (InterruptedException e) {
// ignore
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelSpringTestSupport.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelSpringTestSupport.java
index 00d4ef3..faa9a90 100644
--- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelSpringTestSupport.java
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelSpringTestSupport.java
@@ -32,7 +32,7 @@ public abstract class HazelcastCamelSpringTestSupport extends CamelSpringTestSup
@Override
protected CamelContext createCamelContext() throws Exception {
- MockitoAnnotations.initMocks(this);
+ MockitoAnnotations.openMocks(this);
CamelContext context = super.createCamelContext();
HazelcastCamelTestHelper.registerHazelcastComponents(context, hazelcastInstance);
trainHazelcastInstance(hazelcastInstance);
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueConsumerPollTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueConsumerPollTest.java
index 558d59b..d7f3040 100644
--- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueConsumerPollTest.java
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastQueueConsumerPollTest.java
@@ -67,6 +67,17 @@ public class HazelcastQueueConsumerPollTest extends HazelcastCamelTestSupport {
this.checkHeadersAbsence(out.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.ADDED);
}
+ @Test
+ public void pollTimeout() throws InterruptedException {
+ // if nothing to poll after timeout the queue.poll returns NULL, the consumer shouldn't send this NULL message
+ when(queue.poll(10000, TimeUnit.MILLISECONDS)).thenReturn(null);
+
+ MockEndpoint out = getMockEndpoint("mock:result");
+ out.expectedMessageCount(0);
+
+ assertMockEndpointsSatisfied(2000, TimeUnit.MILLISECONDS);
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {