You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/01/23 09:20:56 UTC
[camel] branch camel-3.7.x updated: CAMEL-16018: HazelcastReplicatedConsumer not receiving events (#4868) (#4912)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.7.x by this push:
new 5ef8f79 CAMEL-16018: HazelcastReplicatedConsumer not receiving events (#4868) (#4912)
5ef8f79 is described below
commit 5ef8f79475db97e517f2d9bc41622a8e80a53164
Author: Zineb BENDHIBA <be...@gmail.com>
AuthorDate: Sat Jan 23 10:20:23 2021 +0100
CAMEL-16018: HazelcastReplicatedConsumer not receiving events (#4868) (#4912)
---
.../hazelcast/HazelcastComponentHelper.java | 4 +-
.../HazelcastAtomicnumberProducer.java | 10 ++--
.../hazelcast/list/HazelcastListProducer.java | 2 +-
.../hazelcast/map/HazelcastMapProducer.java | 16 +++---
.../multimap/HazelcastMultimapProducer.java | 8 +--
.../hazelcast/queue/HazelcastQueueProducer.java | 14 ++---
.../HazelcastReplicatedmapConsumer.java | 2 +-
.../ringbuffer/HazelcastRingbufferProducer.java | 14 ++---
.../hazelcast/HazelcastCamelTestSupport.java | 2 +-
.../HazelcastReplicatedmapConsumerTest.java | 64 +++++++++-------------
10 files changed, 63 insertions(+), 73 deletions(-)
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponentHelper.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponentHelper.java
index e3139a9..4683c61 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponentHelper.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponentHelper.java
@@ -41,8 +41,8 @@ public final class HazelcastComponentHelper {
}
// propagate headers if OUT message created
- if (ex.hasOut()) {
- ex.getOut().setHeaders(headers);
+ if (ex.getMessage() != null) {
+ ex.getMessage().setHeaders(headers);
}
}
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/atomicnumber/HazelcastAtomicnumberProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/atomicnumber/HazelcastAtomicnumberProducer.java
index bea9f87..5761f39 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/atomicnumber/HazelcastAtomicnumberProducer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/atomicnumber/HazelcastAtomicnumberProducer.java
@@ -92,15 +92,15 @@ public class HazelcastAtomicnumberProducer extends HazelcastDefaultProducer {
}
private void get(Exchange exchange) {
- exchange.getOut().setBody(this.atomicnumber.get());
+ exchange.getMessage().setBody(this.atomicnumber.get());
}
private void increment(Exchange exchange) {
- exchange.getOut().setBody(this.atomicnumber.incrementAndGet());
+ exchange.getMessage().setBody(this.atomicnumber.incrementAndGet());
}
private void decrement(Exchange exchange) {
- exchange.getOut().setBody(this.atomicnumber.decrementAndGet());
+ exchange.getMessage().setBody(this.atomicnumber.decrementAndGet());
}
private void compare(long expected, Exchange exchange) {
@@ -108,12 +108,12 @@ public class HazelcastAtomicnumberProducer extends HazelcastDefaultProducer {
if (ObjectHelper.isEmpty(expected)) {
throw new IllegalArgumentException("Expected value must be specified");
}
- exchange.getOut().setBody(this.atomicnumber.compareAndSet(expected, update));
+ exchange.getMessage().setBody(this.atomicnumber.compareAndSet(expected, update));
}
private void getAndAdd(Exchange exchange) {
long delta = exchange.getIn().getBody(Long.class);
- exchange.getOut().setBody(this.atomicnumber.getAndAdd(delta));
+ exchange.getMessage().setBody(this.atomicnumber.getAndAdd(delta));
}
private void set(Exchange exchange) {
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListProducer.java
index db57ab6..8044083 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListProducer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListProducer.java
@@ -114,7 +114,7 @@ public class HazelcastListProducer extends HazelcastDefaultProducer {
}
private void get(Integer pos, Exchange exchange) {
- exchange.getOut().setBody(this.list.get(pos));
+ exchange.getMessage().setBody(this.list.get(pos));
}
private void set(Integer pos, Exchange exchange) {
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapProducer.java
index 6ad84f9..a54246a 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapProducer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapProducer.java
@@ -128,7 +128,7 @@ public class HazelcastMapProducer extends HazelcastDefaultProducer {
break;
case CLEAR:
- this.clear(exchange);
+ this.clear();
break;
case EVICT:
@@ -159,7 +159,7 @@ public class HazelcastMapProducer extends HazelcastDefaultProducer {
} else {
result = this.cache.values();
}
- exchange.getOut().setBody(result);
+ exchange.getMessage().setBody(result);
}
/**
@@ -193,14 +193,14 @@ public class HazelcastMapProducer extends HazelcastDefaultProducer {
* find an object by the given id and give it back
*/
private void get(Object oid, Exchange exchange) {
- exchange.getOut().setBody(this.cache.get(oid));
+ exchange.getMessage().setBody(this.cache.get(oid));
}
/**
* GET All objects and give it back
*/
private void getAll(Object oid, Exchange exchange) {
- exchange.getOut().setBody(this.cache.getAll((Set<Object>) oid));
+ exchange.getMessage().setBody(this.cache.getAll((Set<Object>) oid));
}
/**
@@ -239,7 +239,7 @@ public class HazelcastMapProducer extends HazelcastDefaultProducer {
/**
* Clear all the entries
*/
- private void clear(Exchange exchange) {
+ private void clear() {
this.cache.clear();
}
@@ -261,7 +261,7 @@ public class HazelcastMapProducer extends HazelcastDefaultProducer {
* Check for a specific key in the cache and return true if it exists or false otherwise
*/
private void containsKey(Object oid, Exchange exchange) {
- exchange.getOut().setBody(this.cache.containsKey(oid));
+ exchange.getMessage().setBody(this.cache.containsKey(oid));
}
/**
@@ -269,13 +269,13 @@ public class HazelcastMapProducer extends HazelcastDefaultProducer {
*/
private void containsValue(Exchange exchange) {
Object body = exchange.getIn().getBody();
- exchange.getOut().setBody(this.cache.containsValue(body));
+ exchange.getMessage().setBody(this.cache.containsValue(body));
}
/**
* GET keys set of objects and give it back
*/
private void getKeys(Exchange exchange) {
- exchange.getOut().setBody(this.cache.keySet());
+ exchange.getMessage().setBody(this.cache.keySet());
}
}
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapProducer.java
index f736591..386dc51 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapProducer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapProducer.java
@@ -99,7 +99,7 @@ public class HazelcastMultimapProducer extends HazelcastDefaultProducer {
}
private void get(Object oid, Exchange exchange) {
- exchange.getOut().setBody(this.cache.get(oid));
+ exchange.getMessage().setBody(this.cache.get(oid));
}
private void delete(Object oid) {
@@ -111,7 +111,7 @@ public class HazelcastMultimapProducer extends HazelcastDefaultProducer {
}
private void valuecount(Object oid, Exchange exchange) {
- exchange.getOut().setBody(this.cache.valueCount(oid));
+ exchange.getMessage().setBody(this.cache.valueCount(oid));
}
private void clear(Exchange exchange) {
@@ -119,11 +119,11 @@ public class HazelcastMultimapProducer extends HazelcastDefaultProducer {
}
private void containsKey(Object oid, Exchange exchange) {
- exchange.getOut().setBody(this.cache.containsKey(oid));
+ exchange.getMessage().setBody(this.cache.containsKey(oid));
}
private void containsValue(Exchange exchange) {
Object body = exchange.getIn().getBody();
- exchange.getOut().setBody(this.cache.containsValue(body));
+ exchange.getMessage().setBody(this.cache.containsValue(body));
}
}
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java
index 2e86417..3113acf 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java
@@ -127,11 +127,11 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer {
}
private void poll(Exchange exchange) {
- exchange.getOut().setBody(this.queue.poll());
+ exchange.getMessage().setBody(this.queue.poll());
}
private void peek(Exchange exchange) {
- exchange.getOut().setBody(this.queue.peek());
+ exchange.getMessage().setBody(this.queue.peek());
}
private void offer(Exchange exchange) {
@@ -149,12 +149,12 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer {
}
private void remainingCapacity(Exchange exchange) {
- exchange.getOut().setBody(this.queue.remainingCapacity());
+ exchange.getMessage().setBody(this.queue.remainingCapacity());
}
private void drainTo(Collection c, Exchange exchange) {
- exchange.getOut().setBody(this.queue.drainTo(c));
- exchange.getOut().setHeader(HazelcastConstants.DRAIN_TO_COLLECTION, c);
+ exchange.getMessage().setBody(this.queue.drainTo(c));
+ exchange.getMessage().setHeader(HazelcastConstants.DRAIN_TO_COLLECTION, c);
}
private void removeAll(Exchange exchange) {
@@ -164,11 +164,11 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer {
private void removeIf(Exchange exchange) {
Predicate filter = exchange.getIn().getBody(Predicate.class);
- exchange.getOut().setBody(this.queue.removeIf(filter));
+ exchange.getMessage().setBody(this.queue.removeIf(filter));
}
private void take(Exchange exchange) throws InterruptedException {
- exchange.getOut().setBody(this.queue.take());
+ exchange.getMessage().setBody(this.queue.take());
}
private void retainAll(Exchange exchange) {
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
index 796d2a5..2a215fd 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
@@ -45,7 +45,7 @@ public class HazelcastReplicatedmapConsumer extends HazelcastDefaultConsumer {
protected void doStart() throws Exception {
super.doStart();
- listener = cache.addEntryListener(new CamelEntryListener(this, cacheName), true);
+ listener = cache.addEntryListener(new CamelEntryListener(this, cacheName));
}
/**
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/ringbuffer/HazelcastRingbufferProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/ringbuffer/HazelcastRingbufferProducer.java
index e649f31..528d873 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/ringbuffer/HazelcastRingbufferProducer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/ringbuffer/HazelcastRingbufferProducer.java
@@ -73,23 +73,23 @@ public class HazelcastRingbufferProducer extends HazelcastDefaultProducer {
}
private void readOnceHead(Exchange exchange) throws InterruptedException {
- exchange.getOut().setBody(this.ringbuffer.readOne(ringbuffer.headSequence()));
+ exchange.getMessage().setBody(this.ringbuffer.readOne(ringbuffer.headSequence()));
}
private void readOnceTail(Exchange exchange) throws InterruptedException {
- exchange.getOut().setBody(this.ringbuffer.readOne(ringbuffer.tailSequence()));
+ exchange.getMessage().setBody(this.ringbuffer.readOne(ringbuffer.tailSequence()));
}
- private void getCapacity(Exchange exchange) throws InterruptedException {
- exchange.getOut().setBody(this.ringbuffer.capacity());
+ private void getCapacity(Exchange exchange) {
+ exchange.getMessage().setBody(this.ringbuffer.capacity());
}
- private void getRemainingCapacity(Exchange exchange) throws InterruptedException {
- exchange.getOut().setBody(this.ringbuffer.remainingCapacity());
+ private void getRemainingCapacity(Exchange exchange) {
+ exchange.getMessage().setBody(this.ringbuffer.remainingCapacity());
}
private void add(Exchange exchange) {
final Object body = exchange.getIn().getBody();
- exchange.getOut().setBody(ringbuffer.add(body));
+ exchange.getMessage().setBody(ringbuffer.add(body));
}
}
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelTestSupport.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelTestSupport.java
index aed1630..c3b1df7 100644
--- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelTestSupport.java
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelTestSupport.java
@@ -32,7 +32,7 @@ public class HazelcastCamelTestSupport extends CamelTestSupport {
@Override
protected CamelContext createCamelContext() throws Exception {
- MockitoAnnotations.initMocks(this);
+ MockitoAnnotations.openMocks(this);
CamelContext context = super.createCamelContext();
HazelcastCamelTestHelper.registerHazelcastComponents(context, hazelcastInstance);
trainHazelcastInstance(hazelcastInstance);
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java
index 74c854d..6a699ab 100644
--- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java
@@ -17,47 +17,45 @@
package org.apache.camel.component.hazelcast;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import com.hazelcast.core.EntryEvent;
-import com.hazelcast.core.EntryEventType;
-import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.replicatedmap.ReplicatedMap;
+import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-public class HazelcastReplicatedmapConsumerTest extends HazelcastCamelTestSupport {
+public class HazelcastReplicatedmapConsumerTest extends CamelTestSupport {
- @Mock
+ private HazelcastInstance hazelcastInstance;
private ReplicatedMap<Object, Object> map;
- @Captor
- private ArgumentCaptor<EntryListener<Object, Object>> argument;
+ @BeforeEach
+ public void beforeEach() {
+ hazelcastInstance = Hazelcast.newHazelcastInstance();
+ map = hazelcastInstance.getReplicatedMap("rm");
+ }
- @Override
- protected void trainHazelcastInstance(HazelcastInstance hazelcastInstance) {
- when(hazelcastInstance.getReplicatedMap("rm")).thenReturn(map);
- when(map.addEntryListener(any(), eq(true))).thenReturn(UUID.randomUUID());
+ @AfterEach
+ public void afterEach() {
+ if (hazelcastInstance != null) {
+ hazelcastInstance.shutdown();
+ }
}
@Override
- @SuppressWarnings("unchecked")
- protected void verifyHazelcastInstance(HazelcastInstance hazelcastInstance) {
- verify(hazelcastInstance).getReplicatedMap("rm");
- verify(map).addEntryListener(any(EntryListener.class), eq(true));
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
+ HazelcastCamelTestHelper.registerHazelcastComponents(context, hazelcastInstance);
+ return context;
}
@Test
@@ -65,10 +63,7 @@ public class HazelcastReplicatedmapConsumerTest extends HazelcastCamelTestSuppor
MockEndpoint out = getMockEndpoint("mock:added");
out.expectedMessageCount(1);
- verify(map).addEntryListener(argument.capture(), eq(true));
- EntryEvent<Object, Object> event = new EntryEvent<>("foo", null, EntryEventType.ADDED.getType(), "4711", "my-foo");
- argument.getValue().entryAdded(event);
-
+ map.put("4711", "my-foo");
assertMockEndpointsSatisfied(5000, TimeUnit.MILLISECONDS);
this.checkHeaders(out.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.ADDED);
@@ -81,11 +76,8 @@ public class HazelcastReplicatedmapConsumerTest extends HazelcastCamelTestSuppor
public void testEvict() throws InterruptedException {
MockEndpoint out = getMockEndpoint("mock:evicted");
out.expectedMessageCount(1);
-
- verify(map).addEntryListener(argument.capture(), eq(true));
- EntryEvent<Object, Object> event = new EntryEvent<>("foo", null, EntryEventType.EVICTED.getType(), "4711", "my-foo");
- argument.getValue().entryEvicted(event);
-
+ map.put("4711", "my-foo", 100, TimeUnit.MILLISECONDS);
+ Thread.sleep(150);
assertMockEndpointsSatisfied(30000, TimeUnit.MILLISECONDS);
}
@@ -93,11 +85,8 @@ public class HazelcastReplicatedmapConsumerTest extends HazelcastCamelTestSuppor
public void testRemove() throws InterruptedException {
MockEndpoint out = getMockEndpoint("mock:removed");
out.expectedMessageCount(1);
-
- verify(map).addEntryListener(argument.capture(), eq(true));
- EntryEvent<Object, Object> event = new EntryEvent<>("foo", null, EntryEventType.REMOVED.getType(), "4711", "my-foo");
- argument.getValue().entryRemoved(event);
-
+ map.put("4711", "my-foo");
+ map.remove("4711");
assertMockEndpointsSatisfied(5000, TimeUnit.MILLISECONDS);
this.checkHeaders(out.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.REMOVED);
}
@@ -124,4 +113,5 @@ public class HazelcastReplicatedmapConsumerTest extends HazelcastCamelTestSuppor
assertEquals("4711", headers.get(HazelcastConstants.OBJECT_ID));
assertNotNull(headers.get(HazelcastConstants.LISTENER_TIME));
}
+
}