You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/03/16 15:04:46 UTC

[2/2] camel git commit: CAMEL-11027: Camel-Hazelcast: Support Reliable Topic

CAMEL-11027: Camel-Hazelcast: Support Reliable Topic


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/619b0d45
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/619b0d45
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/619b0d45

Branch: refs/heads/master
Commit: 619b0d459f623f2efca662ff49e1d490b0f4f40c
Parents: a7e9acd
Author: Andrea Cosentino <an...@gmail.com>
Authored: Thu Mar 16 15:55:53 2017 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Thu Mar 16 16:04:29 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/hazelcast-component.adoc      |  3 +-
 .../component/hazelcast/HazelcastComponent.java |  5 +-
 .../hazelcast/HazelcastDefaultEndpoint.java     |  5 +-
 .../topic/HazelcastTopicConfiguration.java      | 42 +++++++++
 .../hazelcast/topic/HazelcastTopicConsumer.java | 10 ++-
 .../hazelcast/topic/HazelcastTopicEndpoint.java | 13 ++-
 .../hazelcast/topic/HazelcastTopicProducer.java |  8 +-
 .../HazelcastReliableTopicConsumerTest.java     | 94 ++++++++++++++++++++
 .../HazelcastReliableTopicProducerTest.java     | 80 +++++++++++++++++
 9 files changed, 245 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/619b0d45/components/camel-hazelcast/src/main/docs/hazelcast-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/docs/hazelcast-component.adoc b/components/camel-hazelcast/src/main/docs/hazelcast-component.adoc
index 492d8d9..c5a98d6 100644
--- a/components/camel-hazelcast/src/main/docs/hazelcast-component.adoc
+++ b/components/camel-hazelcast/src/main/docs/hazelcast-component.adoc
@@ -80,7 +80,7 @@ with the following path and query parameters:
 | cacheName |  | String | *Required* The name of the cache
 |=======================================================================
 
-#### Query Parameters (11 parameters):
+#### Query Parameters (12 parameters):
 
 [width="100%",cols="2,1,1m,1m,5",options="header"]
 |=======================================================================
@@ -88,6 +88,7 @@ with the following path and query parameters:
 | defaultOperation | common |  | String | To specify a default operation to use if no operation header has been provided.
 | hazelcastInstance | common |  | HazelcastInstance | The hazelcast instance reference which can be used for hazelcast endpoint.
 | hazelcastInstanceName | common |  | String | The hazelcast instance reference name which can be used for hazelcast endpoint. If you don't specify the instance reference camel use the default hazelcast instance from the camel-hazelcast instance.
+| reliable | common | false | boolean | Define if the endpoint will use a reliable Topic struct or not.
 | bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored.
 | exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored.
 | exchangePattern | consumer (advanced) |  | ExchangePattern | Sets the exchange pattern when the consumer creates an exchange.

http://git-wip-us.apache.org/repos/asf/camel/blob/619b0d45/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
index 2e667bb..b4b0414 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
@@ -41,6 +41,7 @@ import org.apache.camel.component.hazelcast.ringbuffer.HazelcastRingbufferEndpoi
 import org.apache.camel.component.hazelcast.seda.HazelcastSedaConfiguration;
 import org.apache.camel.component.hazelcast.seda.HazelcastSedaEndpoint;
 import org.apache.camel.component.hazelcast.set.HazelcastSetEndpoint;
+import org.apache.camel.component.hazelcast.topic.HazelcastTopicConfiguration;
 import org.apache.camel.component.hazelcast.topic.HazelcastTopicEndpoint;
 import org.apache.camel.impl.DefaultComponent;
 import org.apache.camel.spi.Metadata;
@@ -131,7 +132,9 @@ public class HazelcastComponent extends DefaultComponent {
         if (remaining.startsWith(HazelcastConstants.TOPIC_PREFIX)) {
             // remaining is anything (name it foo ;)
             remaining = StringHelper.removeStartingCharacters(remaining.substring(HazelcastConstants.TOPIC_PREFIX.length()), '/');
-            endpoint = new HazelcastTopicEndpoint(hzInstance, uri, this, remaining);
+            final HazelcastTopicConfiguration config = new HazelcastTopicConfiguration();
+            setProperties(config, parameters);
+            endpoint = new HazelcastTopicEndpoint(hzInstance, uri, this, remaining, config);
             endpoint.setCommand(HazelcastCommand.topic);
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/619b0d45/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java
index c32ac15..f93be82 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastDefaultEndpoint.java
@@ -22,6 +22,7 @@ import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.hazelcast.seda.HazelcastSedaConfiguration;
+import org.apache.camel.component.hazelcast.topic.HazelcastTopicConfiguration;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
@@ -47,6 +48,8 @@ public abstract class HazelcastDefaultEndpoint extends DefaultEndpoint {
     private String defaultOperation;
     @UriParam
     private HazelcastSedaConfiguration hazelcastSedaConfiguration; // to include component schema docs
+    @UriParam
+    private HazelcastTopicConfiguration hazelcastTopicConfiguration; 
 
     public HazelcastDefaultEndpoint(HazelcastInstance hazelcastInstance, String endpointUri, Component component) {
         this(hazelcastInstance, endpointUri, component, null);
@@ -111,7 +114,7 @@ public abstract class HazelcastDefaultEndpoint extends DefaultEndpoint {
         this.hazelcastInstanceName = hazelcastInstanceName;
     }
 
-    /**
+	/**
      * To specify a default operation to use, if no operation header has been provided.
      */
     public void setDefaultOperation(String defaultOperation) {

http://git-wip-us.apache.org/repos/asf/camel/blob/619b0d45/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConfiguration.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConfiguration.java
new file mode 100644
index 0000000..d83d268
--- /dev/null
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConfiguration.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast.topic;
+
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+
+/**
+ * Holds the URI options of the Hazelcast topic endpoint.
+ */
+@UriParams
+public class HazelcastTopicConfiguration {
+
+    @UriParam(label = "common", defaultValue = "false")
+    private boolean reliable;
+
+    /**
+     * Define if the endpoint will use a reliable Topic struct or not.
+     */
+    public boolean isReliable() {
+        return reliable;
+    }
+
+    public void setReliable(boolean reliable) {
+        this.reliable = reliable;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/619b0d45/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java
index dd601db..13a802b 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java
@@ -28,10 +28,14 @@ import org.apache.camel.component.hazelcast.listener.CamelMessageListener;
  */
 public class HazelcastTopicConsumer extends HazelcastDefaultConsumer {
 
-    public HazelcastTopicConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor, String cacheName) {
+    public HazelcastTopicConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor, String cacheName, boolean reliable) {
         super(hazelcastInstance, endpoint, processor, cacheName);
-
-        ITopic<Object> topic = hazelcastInstance.getTopic(cacheName);
+        ITopic<Object> topic;
+        if (!reliable) {
+            topic = hazelcastInstance.getTopic(cacheName);
+        } else {
+        	topic = hazelcastInstance.getReliableTopic(cacheName);
+        }
         topic.addMessageListener(new CamelMessageListener(this, cacheName));
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/619b0d45/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicEndpoint.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicEndpoint.java
index 3e57529..f9aefb7 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicEndpoint.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicEndpoint.java
@@ -24,30 +24,29 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.hazelcast.HazelcastDefaultEndpoint;
 
-/**
- *
- */
 public class HazelcastTopicEndpoint extends HazelcastDefaultEndpoint implements MultipleConsumersSupport {
 
-    public HazelcastTopicEndpoint(HazelcastInstance hazelcastInstance, String endpointUri, Component component, String cacheName) {
+    private final HazelcastTopicConfiguration configuration;
+    
+    public HazelcastTopicEndpoint(HazelcastInstance hazelcastInstance, String endpointUri, Component component, String cacheName, final HazelcastTopicConfiguration configuration) {
         super(hazelcastInstance, endpointUri, component, cacheName);
+        this.configuration = configuration;
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        HazelcastTopicConsumer answer = new HazelcastTopicConsumer(hazelcastInstance, this, processor, cacheName);
+        HazelcastTopicConsumer answer = new HazelcastTopicConsumer(hazelcastInstance, this, processor, cacheName, configuration.isReliable());
         configureConsumer(answer);
         return answer;
     }
 
     @Override
     public Producer createProducer() throws Exception {
-        return new HazelcastTopicProducer(hazelcastInstance, this, cacheName);
+        return new HazelcastTopicProducer(hazelcastInstance, this, cacheName, configuration.isReliable());
     }
 
     @Override
     public boolean isMultipleConsumersSupported() {
         return true;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/619b0d45/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicProducer.java
index 4cb08fe..5ca57df 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicProducer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicProducer.java
@@ -31,9 +31,13 @@ public class HazelcastTopicProducer extends HazelcastDefaultProducer {
 
     private ITopic<Object> topic;
 
-    public HazelcastTopicProducer(HazelcastInstance hazelcastInstance, HazelcastDefaultEndpoint endpoint, String topicName) {
+    public HazelcastTopicProducer(HazelcastInstance hazelcastInstance, HazelcastDefaultEndpoint endpoint, String topicName, boolean reliable) {
         super(endpoint);
-        this.topic = hazelcastInstance.getTopic(topicName);
+        if (!reliable) {
+            this.topic = hazelcastInstance.getTopic(topicName);
+        } else {
+            this.topic = hazelcastInstance.getReliableTopic(topicName);
+        }
     }
 
     public void process(Exchange exchange) throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/619b0d45/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReliableTopicConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReliableTopicConsumerTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReliableTopicConsumerTest.java
new file mode 100644
index 0000000..d6b0ecc
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReliableTopicConsumerTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.ITopic;
+import com.hazelcast.core.Message;
+import com.hazelcast.core.MessageListener;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Verifies that a topic consumer configured with reliable=true registers its listener on the reliable topic. */
+public class HazelcastReliableTopicConsumerTest extends HazelcastCamelTestSupport {
+
+    @Mock
+    private ITopic<String> reliableTopic;
+
+    private ArgumentCaptor<MessageListener> listenerCaptor;
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected void trainHazelcastInstance(HazelcastInstance hazelcastInstance) {
+        listenerCaptor = ArgumentCaptor.forClass(MessageListener.class);
+        when(reliableTopic.addMessageListener(listenerCaptor.capture())).thenReturn("foo");
+        when(hazelcastInstance.<String>getReliableTopic("foo")).thenReturn(reliableTopic);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected void verifyHazelcastInstance(HazelcastInstance hazelcastInstance) {
+        verify(hazelcastInstance).getReliableTopic("foo");
+        verify(reliableTopic).addMessageListener(any(MessageListener.class));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void receive() throws InterruptedException {
+        MockEndpoint received = getMockEndpoint("mock:received");
+        received.expectedMessageCount(1);
+
+        // Hand a message straight to the listener captured from the mocked reliable topic.
+        listenerCaptor.getValue().onMessage(new Message<String>("foo", "foo", System.currentTimeMillis(), null));
+        assertMockEndpointsSatisfied(2000, TimeUnit.MILLISECONDS);
+
+        Map<String, Object> headers = received.getExchanges().get(0).getIn().getHeaders();
+        checkHeaders(headers, HazelcastConstants.RECEIVED);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        final String topicUri = String.format("hazelcast:%sfoo?reliable=true", HazelcastConstants.TOPIC_PREFIX);
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(topicUri).log("object...")
+                        .choice()
+                            .when(header(HazelcastConstants.LISTENER_ACTION).isEqualTo(HazelcastConstants.RECEIVED))
+                                .log("...received").to("mock:received")
+                        .otherwise().log("fail!");
+            }
+        };
+    }
+
+    private void checkHeaders(Map<String, Object> actual, String expectedAction) {
+        assertEquals(expectedAction, actual.get(HazelcastConstants.LISTENER_ACTION));
+        assertEquals(HazelcastConstants.CACHE_LISTENER, actual.get(HazelcastConstants.LISTENER_TYPE));
+        assertNull(actual.get(HazelcastConstants.OBJECT_ID));
+        assertNotNull(actual.get(HazelcastConstants.LISTENER_TIME));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/619b0d45/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReliableTopicProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReliableTopicProducerTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReliableTopicProducerTest.java
new file mode 100644
index 0000000..bb8f1b5
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReliableTopicProducerTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.ITopic;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.After;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import static org.mockito.Mockito.*;
+
+/** Verifies that a topic producer configured with reliable=true publishes to the reliable topic. */
+public class HazelcastReliableTopicProducerTest extends HazelcastCamelTestSupport {
+
+    @Mock
+    private ITopic<String> reliableTopic;
+
+    @Override
+    protected void trainHazelcastInstance(HazelcastInstance hazelcastInstance) {
+        when(hazelcastInstance.<String>getReliableTopic("bar")).thenReturn(reliableTopic);
+    }
+
+    @Override
+    protected void verifyHazelcastInstance(HazelcastInstance hazelcastInstance) {
+        verify(hazelcastInstance, atLeastOnce()).getReliableTopic("bar");
+    }
+
+    @After
+    public void verifyQueueMock() {
+        // Any interaction with the topic mock that a test did not verify explicitly fails here.
+        verifyNoMoreInteractions(reliableTopic);
+    }
+
+    @Test(expected = CamelExecutionException.class)
+    public void testWithInvalidOperation() {
+        template.sendBody("direct:publishInvalid", "foo");
+    }
+
+    @Test
+    public void noOperation() {
+        template.sendBody("direct:no-operation", "bar");
+        verify(reliableTopic).publish("bar");
+    }
+
+    @Test
+    public void publish() {
+        template.sendBody("direct:publish", "bar");
+        verify(reliableTopic).publish("bar");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        final String reliableTopicUri = String.format("hazelcast:%sbar?reliable=true", HazelcastConstants.TOPIC_PREFIX);
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:no-operation").to(reliableTopicUri);
+                from("direct:publishInvalid").setHeader(HazelcastConstants.OPERATION, constant("bogus")).to(reliableTopicUri);
+                from("direct:publish").setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.PUBLISH_OPERATION)).to(reliableTopicUri);
+            }
+        };
+    }
+}