You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2015/05/04 17:24:03 UTC

camel git commit: CAMEL-8739 Camel-Hazelcast: Add Replicated Map support

Repository: camel
Updated Branches:
  refs/heads/master df47d5e98 -> b893fc2c7


CAMEL-8739 Camel-Hazelcast: Add Replicated Map support


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b893fc2c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b893fc2c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b893fc2c

Branch: refs/heads/master
Commit: b893fc2c7511ecb0ae54af9404013b698b5124ca
Parents: df47d5e
Author: Andrea Cosentino <an...@gmail.com>
Authored: Mon May 4 17:20:27 2015 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Mon May 4 17:20:27 2015 +0200

----------------------------------------------------------------------
 .../component/hazelcast/HazelcastCommand.java   |   2 +-
 .../component/hazelcast/HazelcastComponent.java |  13 +-
 .../component/hazelcast/HazelcastConstants.java |   1 +
 .../HazelcastReplicatedmapConsumer.java         |  37 ++++++
 .../HazelcastReplicatedmapEndpoint.java         |  42 +++++++
 .../HazelcastReplicatedmapProducer.java         |  94 ++++++++++++++
 .../hazelcast/HazelcastErrorMessagesTest.java   |   2 +-
 .../HazelcastReplicatedmapConsumerTest.java     | 123 +++++++++++++++++++
 ...lcastReplicatedmapProducerForSpringTest.java |  83 +++++++++++++
 .../HazelcastReplicatedmapProducerTest.java     | 123 +++++++++++++++++++
 .../spring/test-camel-context-replicatedmap.xml |  53 ++++++++
 11 files changed, 569 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b893fc2c/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastCommand.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastCommand.java
index 2c328d7..f97d017 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastCommand.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastCommand.java
@@ -18,6 +18,6 @@ package org.apache.camel.component.hazelcast;
 
 public enum HazelcastCommand {
 
-    map, multimap, queue, topic, seda, set, atomicvalue, instance, list
+    map, multimap, queue, topic, seda, set, atomicvalue, instance, list, replicatedmap
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b893fc2c/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
index 1b51ba8..0f29082 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
@@ -22,6 +22,7 @@ import com.hazelcast.config.Config;
 import com.hazelcast.config.XmlConfigBuilder;
 import com.hazelcast.core.Hazelcast;
 import com.hazelcast.core.HazelcastInstance;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.component.hazelcast.atomicnumber.HazelcastAtomicnumberEndpoint;
@@ -30,6 +31,7 @@ import org.apache.camel.component.hazelcast.list.HazelcastListEndpoint;
 import org.apache.camel.component.hazelcast.map.HazelcastMapEndpoint;
 import org.apache.camel.component.hazelcast.multimap.HazelcastMultimapEndpoint;
 import org.apache.camel.component.hazelcast.queue.HazelcastQueueEndpoint;
+import org.apache.camel.component.hazelcast.replicatedmap.HazelcastReplicatedmapEndpoint;
 import org.apache.camel.component.hazelcast.seda.HazelcastSedaConfiguration;
 import org.apache.camel.component.hazelcast.seda.HazelcastSedaEndpoint;
 import org.apache.camel.component.hazelcast.topic.HazelcastTopicEndpoint;
@@ -130,10 +132,17 @@ public class HazelcastComponent extends UriEndpointComponent {
             endpoint.setCommand(HazelcastCommand.list);
         }
 
+        if (remaining.startsWith(HazelcastConstants.REPLICATEDMAP_PREFIX)) {
+            // remaining is anything (name it foo ;)
+            remaining = removeStartingCharacters(remaining.substring(HazelcastConstants.REPLICATEDMAP_PREFIX.length()), '/');
+            endpoint = new HazelcastReplicatedmapEndpoint(hzInstance, uri, remaining, this);
+            endpoint.setCommand(HazelcastCommand.replicatedmap);
+        }        
+        
         if (endpoint == null) {
-            throw new IllegalArgumentException(String.format("Your URI does not provide a correct 'type' prefix. It should be anything like 'hazelcast:[%s|%s|%s|%s|%s|%s|%s]name' but is '%s'.",
+            throw new IllegalArgumentException(String.format("Your URI does not provide a correct 'type' prefix. It should be anything like 'hazelcast:[%s|%s|%s|%s|%s|%s|%s|%s]name' but is '%s'.",
                     HazelcastConstants.MAP_PREFIX, HazelcastConstants.MULTIMAP_PREFIX, HazelcastConstants.ATOMICNUMBER_PREFIX, HazelcastConstants.INSTANCE_PREFIX, HazelcastConstants.QUEUE_PREFIX,
-                    HazelcastConstants.SEDA_PREFIX, HazelcastConstants.LIST_PREFIX, uri));
+                    HazelcastConstants.SEDA_PREFIX, HazelcastConstants.LIST_PREFIX, HazelcastConstants.REPLICATEDMAP_PREFIX, uri));
         }
 
         if (defaultOperation != -1) {

http://git-wip-us.apache.org/repos/asf/camel/blob/b893fc2c/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java
index 056caa5..46567b1 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java
@@ -23,6 +23,7 @@ public final class HazelcastConstants {
     */
     public static final String MAP_PREFIX = "map:";
     public static final String MULTIMAP_PREFIX = "multimap:";
+    public static final String REPLICATEDMAP_PREFIX = "replicatedmap:";
     public static final String ATOMICNUMBER_PREFIX = "atomicvalue:";
     public static final String INSTANCE_PREFIX = "instance:";
     public static final String QUEUE_PREFIX = "queue:";

http://git-wip-us.apache.org/repos/asf/camel/blob/b893fc2c/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
new file mode 100644
index 0000000..b5934dc
--- /dev/null
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast.replicatedmap;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.MultiMap;
+import com.hazelcast.core.ReplicatedMap;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.component.hazelcast.HazelcastDefaultConsumer;
+import org.apache.camel.component.hazelcast.listener.CamelEntryListener;
+
+public class HazelcastReplicatedmapConsumer extends HazelcastDefaultConsumer {
+
+    public HazelcastReplicatedmapConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor, String cacheName) {
+        super(hazelcastInstance, endpoint, processor, cacheName);
+
+        ReplicatedMap<Object, Object> cache = hazelcastInstance.getReplicatedMap(cacheName);
+        cache.addEntryListener(new CamelEntryListener(this, cacheName), true);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b893fc2c/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapEndpoint.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapEndpoint.java
new file mode 100644
index 0000000..79f36d6
--- /dev/null
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapEndpoint.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast.replicatedmap;
+
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.hazelcast.HazelcastComponent;
+import org.apache.camel.component.hazelcast.HazelcastDefaultEndpoint;
+
+public class HazelcastReplicatedmapEndpoint extends HazelcastDefaultEndpoint {
+
+    public HazelcastReplicatedmapEndpoint(HazelcastInstance hazelcastInstance, String uri, String cacheName, HazelcastComponent component) {
+        super(hazelcastInstance, uri, component, cacheName);
+    }
+
+    public Consumer createConsumer(Processor processor) throws Exception {
+        HazelcastReplicatedmapConsumer answer = new HazelcastReplicatedmapConsumer(hazelcastInstance, this, processor, cacheName);
+        configureConsumer(answer);
+        return answer;
+    }
+
+    public Producer createProducer() throws Exception {
+        return new HazelcastReplicatedmapProducer(hazelcastInstance, this, cacheName);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b893fc2c/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapProducer.java
new file mode 100644
index 0000000..7fc8a2b
--- /dev/null
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapProducer.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast.replicatedmap;
+
+import java.util.Map;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.MultiMap;
+import com.hazelcast.core.ReplicatedMap;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.hazelcast.HazelcastComponentHelper;
+import org.apache.camel.component.hazelcast.HazelcastConstants;
+import org.apache.camel.component.hazelcast.HazelcastDefaultEndpoint;
+import org.apache.camel.component.hazelcast.HazelcastDefaultProducer;
+
+public class HazelcastReplicatedmapProducer extends HazelcastDefaultProducer {
+
+    private final ReplicatedMap<Object, Object> cache;
+
+    public HazelcastReplicatedmapProducer(HazelcastInstance hazelcastInstance, HazelcastDefaultEndpoint endpoint, String cacheName) {
+        super(endpoint);
+        this.cache = hazelcastInstance.getReplicatedMap(cacheName);
+    }
+
+    public void process(Exchange exchange) throws Exception {
+
+        Map<String, Object> headers = exchange.getIn().getHeaders();
+
+        // get header parameters
+        Object oid = null;
+
+        if (headers.containsKey(HazelcastConstants.OBJECT_ID)) {
+            oid = headers.get(HazelcastConstants.OBJECT_ID);
+        }
+
+        final int operation = lookupOperationNumber(exchange);
+
+        switch (operation) {
+        case HazelcastConstants.PUT_OPERATION:
+            this.put(oid, exchange);
+            break;
+
+        case HazelcastConstants.GET_OPERATION:
+            this.get(oid, exchange);
+            break;
+
+        case HazelcastConstants.DELETE_OPERATION:
+            this.delete(oid);
+            break;
+
+        case HazelcastConstants.CLEAR_OPERATION:
+            this.clear(exchange);
+            break;
+            
+        default:
+            throw new IllegalArgumentException(String.format("The value '%s' is not allowed for parameter '%s' on the MULTIMAP cache.", operation, HazelcastConstants.OPERATION));
+        }
+
+        // finally copy headers
+        HazelcastComponentHelper.copyHeaders(exchange);
+    }
+
+    private void put(Object oid, Exchange exchange) {
+        Object body = exchange.getIn().getBody();
+        this.cache.put(oid, body);
+    }
+
+    private void get(Object oid, Exchange exchange) {
+        exchange.getOut().setBody(this.cache.get(oid));
+    }
+
+    private void delete(Object oid) {
+        this.cache.remove(oid);
+    }
+
+    private void clear(Exchange exchange) {
+        this.cache.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b893fc2c/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastErrorMessagesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastErrorMessagesTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastErrorMessagesTest.java
index 74b4b78..31dbc64 100644
--- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastErrorMessagesTest.java
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastErrorMessagesTest.java
@@ -36,7 +36,7 @@ public class HazelcastErrorMessagesTest extends HazelcastCamelTestSupport {
         } catch (Exception e) {
             assertTrue(e.getMessage().contains(
                     "Your URI does not provide a correct 'type' prefix. It should be anything like "
-                            + "'hazelcast:[map:|multimap:|atomicvalue:|instance:|queue:|seda:|list:]name' but is 'hazelcast://error:foo"));
+                            + "'hazelcast:[map:|multimap:|atomicvalue:|instance:|queue:|seda:|list:|replicatedmap:]name' but is 'hazelcast://error:foo"));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/b893fc2c/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java
new file mode 100644
index 0000000..26d8df0
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryEventType;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.MultiMap;
+import com.hazelcast.core.ReplicatedMap;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HazelcastReplicatedmapConsumerTest extends HazelcastCamelTestSupport {
+
+    @Mock
+    private ReplicatedMap<Object, Object> map;
+
+    private ArgumentCaptor<EntryListener> argument;
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected void trainHazelcastInstance(HazelcastInstance hazelcastInstance) {
+        when(hazelcastInstance.getReplicatedMap("rm")).thenReturn(map);
+        argument = ArgumentCaptor.forClass(EntryListener.class);
+        when(map.addEntryListener(argument.capture(), eq(true))).thenReturn("foo");
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected void verifyHazelcastInstance(HazelcastInstance hazelcastInstance) {
+        verify(hazelcastInstance).getReplicatedMap("rm");
+        verify(map).addEntryListener(any(EntryListener.class), eq(true));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testAdd() throws InterruptedException {
+        MockEndpoint out = getMockEndpoint("mock:added");
+        out.expectedMessageCount(1);
+
+        EntryEvent<Object, Object> event = new EntryEvent<Object, Object>("foo", null, EntryEventType.ADDED.getType(), "4711", "my-foo");
+        argument.getValue().entryAdded(event);
+
+        assertMockEndpointsSatisfied(5000, TimeUnit.MILLISECONDS);
+
+        this.checkHeaders(out.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.ADDED);
+    }
+
+    /*
+     * mail from talip (hazelcast) on 21.02.2011: MultiMap doesn't support eviction yet. We can and should add this feature.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testEvict() throws InterruptedException {
+        MockEndpoint out = getMockEndpoint("mock:evicted");
+        out.expectedMessageCount(1);
+
+        EntryEvent<Object, Object> event = new EntryEvent<Object, Object>("foo", null, EntryEventType.EVICTED.getType(), "4711", "my-foo");
+        argument.getValue().entryEvicted(event);
+
+        assertMockEndpointsSatisfied(30000, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testRemove() throws InterruptedException {
+        MockEndpoint out = getMockEndpoint("mock:removed");
+        out.expectedMessageCount(1);
+
+        EntryEvent<Object, Object> event = new EntryEvent<Object, Object>("foo", null, EntryEventType.REMOVED.getType(), "4711", "my-foo");
+        argument.getValue().entryRemoved(event);
+
+        assertMockEndpointsSatisfied(5000, TimeUnit.MILLISECONDS);
+        this.checkHeaders(out.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.REMOVED);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(String.format("hazelcast:%srm", HazelcastConstants.REPLICATEDMAP_PREFIX)).log("object...").choice()
+                        .when(header(HazelcastConstants.LISTENER_ACTION).isEqualTo(HazelcastConstants.ADDED)).log("...added").to("mock:added")
+                        .when(header(HazelcastConstants.LISTENER_ACTION).isEqualTo(HazelcastConstants.EVICTED)).log("...evicted").to("mock:evicted")
+                        .when(header(HazelcastConstants.LISTENER_ACTION).isEqualTo(HazelcastConstants.REMOVED)).log("...removed").to("mock:removed").otherwise().log("fail!");
+            }
+        };
+    }
+
+    private void checkHeaders(Map<String, Object> headers, String action) {
+        assertEquals(action, headers.get(HazelcastConstants.LISTENER_ACTION));
+        assertEquals(HazelcastConstants.CACHE_LISTENER, headers.get(HazelcastConstants.LISTENER_TYPE));
+        assertEquals("4711", headers.get(HazelcastConstants.OBJECT_ID));
+        assertNotNull(headers.get(HazelcastConstants.LISTENER_TIME));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b893fc2c/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapProducerForSpringTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapProducerForSpringTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapProducerForSpringTest.java
new file mode 100644
index 0000000..c108a08
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapProducerForSpringTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.MultiMap;
+import com.hazelcast.core.ReplicatedMap;
+
+import org.junit.After;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class HazelcastReplicatedmapProducerForSpringTest extends HazelcastCamelSpringTestSupport {
+
+    @Mock
+    private ReplicatedMap<Object, Object> map;
+
+    @Override
+    protected void trainHazelcastInstance(HazelcastInstance hazelcastInstance) {
+        when(hazelcastInstance.getReplicatedMap("bar")).thenReturn(map);
+    }
+
+    @Override
+    protected void verifyHazelcastInstance(HazelcastInstance hazelcastInstance) {
+        verify(hazelcastInstance, atLeastOnce()).getReplicatedMap("bar");
+    }
+
+    @After
+    public void verifyMapMock() {
+        verifyNoMoreInteractions(map);
+    }
+
+    @Override
+    protected AbstractApplicationContext createApplicationContext() {
+        return new ClassPathXmlApplicationContext("/META-INF/spring/test-camel-context-replicatedmap.xml");
+    }
+
+    @Test
+    public void testPut() throws InterruptedException {
+        template.sendBodyAndHeader("direct:put", "my-foo", HazelcastConstants.OBJECT_ID, "4711");
+        verify(map).put("4711", "my-foo");
+    }
+
+    @Test
+    public void testGet() {
+        when(map.get("4711")).thenReturn(Arrays.<Object>asList("my-foo"));
+        template.sendBodyAndHeader("direct:get", null, HazelcastConstants.OBJECT_ID, "4711");
+        verify(map).get("4711");
+        Collection<?> body = consumer.receiveBody("seda:out", 5000, Collection.class);
+        assertTrue(body.contains("my-foo"));
+    }
+
+    @Test
+    public void testDelete() {
+        template.sendBodyAndHeader("direct:delete", null, HazelcastConstants.OBJECT_ID, 4711);
+        verify(map).remove(4711);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b893fc2c/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapProducerTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapProducerTest.java
new file mode 100644
index 0000000..82e2621
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapProducerTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.MultiMap;
+import com.hazelcast.core.ReplicatedMap;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.After;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import static org.mockito.Mockito.*;
+
+public class HazelcastReplicatedmapProducerTest extends HazelcastCamelTestSupport {
+
+    @Mock
+    private ReplicatedMap<Object, Object> map;
+
+    @Override
+    protected void trainHazelcastInstance(HazelcastInstance hazelcastInstance) {
+        when(hazelcastInstance.getReplicatedMap("bar")).thenReturn(map);
+    }
+
+    @Override
+    protected void verifyHazelcastInstance(HazelcastInstance hazelcastInstance) {
+        verify(hazelcastInstance, atLeastOnce()).getReplicatedMap("bar");
+    }
+
+    @After
+    public void verifyMapMock() {
+        verifyNoMoreInteractions(map);
+    }
+
+    @Test(expected = CamelExecutionException.class)
+    public void testWithInvalidOperation() {
+        template.sendBodyAndHeader("direct:putInvalid", "my-foo", HazelcastConstants.OBJECT_ID, "4711");
+    }
+
+    @Test
+    public void testPut() throws InterruptedException {
+        template.sendBodyAndHeader("direct:put", "my-foo", HazelcastConstants.OBJECT_ID, "4711");
+        verify(map).put("4711", "my-foo");
+    }
+
+    @Test
+    public void testPutWithOperationName() throws InterruptedException {
+        template.sendBodyAndHeader("direct:putWithOperationName", "my-foo", HazelcastConstants.OBJECT_ID, "4711");
+        verify(map).put("4711", "my-foo");
+    }
+
+    @Test
+    public void testPutWithOperationNumber() throws InterruptedException {
+        template.sendBodyAndHeader("direct:putWithOperationNumber", "my-foo", HazelcastConstants.OBJECT_ID, "4711");
+        verify(map).put("4711", "my-foo");
+    }
+
+    @Test
+    public void testGet() {
+        when(map.get("4711")).thenReturn(Arrays.<Object>asList("my-foo"));
+        template.sendBodyAndHeader("direct:get", null, HazelcastConstants.OBJECT_ID, "4711");
+        verify(map).get("4711");
+        Collection<?> body = consumer.receiveBody("seda:out", 5000, Collection.class);
+        assertTrue(body.contains("my-foo"));
+    }
+
+    @Test
+    public void testDelete() {
+        template.sendBodyAndHeader("direct:delete", null, HazelcastConstants.OBJECT_ID, 4711);
+        verify(map).remove(4711);
+    }
+    
+    @Test
+    public void testClear() {
+        template.sendBody("direct:clear", "test");
+        verify(map).clear();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                from("direct:putInvalid").setHeader(HazelcastConstants.OPERATION, constant("bogus")).to(String.format("hazelcast:%sbar", HazelcastConstants.REPLICATEDMAP_PREFIX));
+
+                from("direct:put").setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.PUT_OPERATION)).to(String.format("hazelcast:%sbar", HazelcastConstants.REPLICATEDMAP_PREFIX));
+
+                from("direct:get").setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.GET_OPERATION)).to(String.format("hazelcast:%sbar", HazelcastConstants.REPLICATEDMAP_PREFIX))
+                        .to("seda:out");
+
+                from("direct:delete").setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.DELETE_OPERATION))
+                        .to(String.format("hazelcast:%sbar", HazelcastConstants.REPLICATEDMAP_PREFIX));
+
+                from("direct:clear").setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.CLEAR_OPERATION))
+                        .to(String.format("hazelcast:%sbar", HazelcastConstants.REPLICATEDMAP_PREFIX));
+                
+                from("direct:putWithOperationNumber").toF("hazelcast:%sbar?operation=%s", HazelcastConstants.REPLICATEDMAP_PREFIX, HazelcastConstants.PUT_OPERATION);
+                from("direct:putWithOperationName").toF("hazelcast:%sbar?operation=put", HazelcastConstants.REPLICATEDMAP_PREFIX);
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b893fc2c/components/camel-hazelcast/src/test/resources/META-INF/spring/test-camel-context-replicatedmap.xml
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/resources/META-INF/spring/test-camel-context-replicatedmap.xml b/components/camel-hazelcast/src/test/resources/META-INF/spring/test-camel-context-replicatedmap.xml
new file mode 100644
index 0000000..c1af245
--- /dev/null
+++ b/components/camel-hazelcast/src/test/resources/META-INF/spring/test-camel-context-replicatedmap.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
+	xsi:schemaLocation="
+	   http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+	<camelContext xmlns="http://camel.apache.org/schema/spring">
+
+		<route>
+			<from uri="direct:put" />
+			<setHeader headerName="CamelHazelcastOperationType">
+				<constant>put</constant>
+			</setHeader>
+			<to uri="hazelcast:replicatedmap:bar" />
+		</route>
+
+		<route>
+			<from uri="direct:get" />
+			<setHeader headerName="CamelHazelcastOperationType">
+				<constant>get</constant>
+			</setHeader>
+			<to uri="hazelcast:replicatedmap:bar" />
+			<to uri="seda:out" />
+		</route>
+
+		<route>
+			<from uri="direct:delete" />
+			<setHeader headerName="CamelHazelcastOperationType">
+				<constant>delete</constant>
+			</setHeader>
+			<to uri="hazelcast:replicatedmap:bar" />
+		</route>
+
+	</camelContext>
+
+</beans>