You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2015/07/30 17:37:21 UTC

[2/2] camel git commit: CAMEL-9036 Camel-Hazelcast: Add support for set Data Structure

CAMEL-9036 Camel-Hazelcast: Add support for set Data Structure


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b781dc07
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b781dc07
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b781dc07

Branch: refs/heads/master
Commit: b781dc07ef3a48ce744ee0e38824462a259aaf53
Parents: 600aeba
Author: Andrea Cosentino <an...@gmail.com>
Authored: Thu Jul 30 17:33:22 2015 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Thu Jul 30 17:33:45 2015 +0200

----------------------------------------------------------------------
 .../component/hazelcast/HazelcastComponent.java |  14 ++-
 .../component/hazelcast/HazelcastConstants.java |   1 +
 .../hazelcast/set/HazelcastSetConsumer.java     |  41 +++++++
 .../hazelcast/set/HazelcastSetEndpoint.java     |  48 ++++++++
 .../hazelcast/set/HazelcastSetProducer.java     | 111 +++++++++++++++++++
 5 files changed, 212 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b781dc07/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
index ec49638..94a09aa 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
@@ -34,6 +34,7 @@ import org.apache.camel.component.hazelcast.queue.HazelcastQueueEndpoint;
 import org.apache.camel.component.hazelcast.replicatedmap.HazelcastReplicatedmapEndpoint;
 import org.apache.camel.component.hazelcast.seda.HazelcastSedaConfiguration;
 import org.apache.camel.component.hazelcast.seda.HazelcastSedaEndpoint;
+import org.apache.camel.component.hazelcast.set.HazelcastSetEndpoint;
 import org.apache.camel.component.hazelcast.topic.HazelcastTopicEndpoint;
 import org.apache.camel.impl.UriEndpointComponent;
 
@@ -137,12 +138,19 @@ public class HazelcastComponent extends UriEndpointComponent {
             remaining = removeStartingCharacters(remaining.substring(HazelcastConstants.REPLICATEDMAP_PREFIX.length()), '/');
             endpoint = new HazelcastReplicatedmapEndpoint(hzInstance, uri, remaining, this);
             endpoint.setCommand(HazelcastCommand.replicatedmap);
-        }        
+        } 
+        
+        if (remaining.startsWith(HazelcastConstants.SET_PREFIX)) {
+            // remaining is anything (name it foo ;)
+            remaining = removeStartingCharacters(remaining.substring(HazelcastConstants.SET_PREFIX.length()), '/');
+            endpoint = new HazelcastSetEndpoint(hzInstance, uri, this, remaining);
+            endpoint.setCommand(HazelcastCommand.set);
+        } 
         
         if (endpoint == null) {
-            throw new IllegalArgumentException(String.format("Your URI does not provide a correct 'type' prefix. It should be anything like 'hazelcast:[%s|%s|%s|%s|%s|%s|%s|%s]name' but is '%s'.",
+            throw new IllegalArgumentException(String.format("Your URI does not provide a correct 'type' prefix. It should be anything like 'hazelcast:[%s|%s|%s|%s|%s|%s|%s|%s|%s]name' but is '%s'.",
                     HazelcastConstants.MAP_PREFIX, HazelcastConstants.MULTIMAP_PREFIX, HazelcastConstants.ATOMICNUMBER_PREFIX, HazelcastConstants.INSTANCE_PREFIX, HazelcastConstants.QUEUE_PREFIX,
-                    HazelcastConstants.SEDA_PREFIX, HazelcastConstants.LIST_PREFIX, HazelcastConstants.REPLICATEDMAP_PREFIX, uri));
+                    HazelcastConstants.SEDA_PREFIX, HazelcastConstants.LIST_PREFIX, HazelcastConstants.REPLICATEDMAP_PREFIX, HazelcastConstants.SET_PREFIX, uri));
         }
 
         if (defaultOperation != -1) {

http://git-wip-us.apache.org/repos/asf/camel/blob/b781dc07/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java
index 6c42706..edc1090 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java
@@ -30,6 +30,7 @@ public final class HazelcastConstants {
     public static final String TOPIC_PREFIX = "topic:";
     public static final String SEDA_PREFIX = "seda:";
     public static final String LIST_PREFIX = "list:";
+    public static final String SET_PREFIX = "set:";
 
     /*
      * incoming header properties

http://git-wip-us.apache.org/repos/asf/camel/blob/b781dc07/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java
new file mode 100644
index 0000000..c850ecd
--- /dev/null
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast.set;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IList;
+import com.hazelcast.core.ISet;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.component.hazelcast.HazelcastDefaultConsumer;
+import org.apache.camel.component.hazelcast.listener.CamelItemListener;
+
+/**
+ * Implementation of Hazelcast Set {@link Consumer}.
+ */
+public class HazelcastSetConsumer extends HazelcastDefaultConsumer {
+    /** Registers a {@link CamelItemListener} on the Hazelcast set named {@code cacheName} at construction time. */
+    public HazelcastSetConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor, String cacheName) {
+        super(hazelcastInstance, endpoint, processor, cacheName);
+        final ISet<Object> distributedSet = hazelcastInstance.getSet(cacheName);
+        final CamelItemListener itemListener = new CamelItemListener(this, cacheName);
+        distributedSet.addItemListener(itemListener, true);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b781dc07/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetEndpoint.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetEndpoint.java
new file mode 100644
index 0000000..fe4c27d
--- /dev/null
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetEndpoint.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast.set;
+
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.hazelcast.HazelcastDefaultEndpoint;
+
+/**
+ * Hazelcast Set {@link Endpoint} implementation.
+ */
+public class HazelcastSetEndpoint extends HazelcastDefaultEndpoint {
+    public HazelcastSetEndpoint(HazelcastInstance hazelcastInstance, String endpointUri, Component component, String cacheName) {
+        super(hazelcastInstance, endpointUri, component, cacheName);
+    }
+
+    /** Creates the set consumer and passes it through {@code configureConsumer} before returning it. */
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        final HazelcastSetConsumer setConsumer = new HazelcastSetConsumer(hazelcastInstance, this, processor, cacheName);
+        configureConsumer(setConsumer);
+        return setConsumer;
+    }
+
+    /** Creates a set producer for this endpoint's {@code cacheName}. */
+    @Override
+    public Producer createProducer() throws Exception {
+        return new HazelcastSetProducer(hazelcastInstance, this, cacheName);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/b781dc07/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetProducer.java
new file mode 100644
index 0000000..99aec66
--- /dev/null
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetProducer.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast.set;
+
+import java.util.Collection;
+import java.util.Map;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IList;
+import com.hazelcast.core.ISet;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.component.hazelcast.HazelcastComponentHelper;
+import org.apache.camel.component.hazelcast.HazelcastConstants;
+import org.apache.camel.component.hazelcast.HazelcastDefaultEndpoint;
+import org.apache.camel.component.hazelcast.HazelcastDefaultProducer;
+
+/**
+ * Implementation of Hazelcast Set {@link Producer}.
+ */
+public class HazelcastSetProducer extends HazelcastDefaultProducer {
+
+    private final ISet<Object> set;
+
+    /** Looks up the Hazelcast set named {@code setName} once; all operations below act on it. */
+    public HazelcastSetProducer(HazelcastInstance hazelcastInstance, HazelcastDefaultEndpoint endpoint, String setName) {
+        super(endpoint);
+        this.set = hazelcastInstance.getSet(setName);
+    }
+
+    /**
+     * Runs the operation whose number {@code lookupOperationNumber(exchange)} returns against the
+     * set, taking its argument from the IN message body, and then copies the headers.
+     *
+     * @throws IllegalArgumentException if the operation number is not one handled for sets
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        final int operation = lookupOperationNumber(exchange);
+        switch (operation) {
+        case HazelcastConstants.ADD_OPERATION:
+            this.add(exchange);
+            break;
+        case HazelcastConstants.REMOVEVALUE_OPERATION:
+            this.remove(exchange);
+            break;
+        case HazelcastConstants.CLEAR_OPERATION:
+            this.clear();
+            break;
+        case HazelcastConstants.ADD_ALL_OPERATION:
+            this.addAll(exchange);
+            break;
+        case HazelcastConstants.REMOVE_ALL_OPERATION:
+            this.removeAll(exchange);
+            break;
+        case HazelcastConstants.RETAIN_ALL_OPERATION:
+            this.retainAll(exchange);
+            break;
+        default:
+            throw new IllegalArgumentException(String.format("The value '%s' is not allowed for parameter '%s' on the SET cache.", operation, HazelcastConstants.OPERATION));
+        }
+
+        // finally copy headers
+        HazelcastComponentHelper.copyHeaders(exchange);
+    }
+
+    private void add(Exchange exchange) {
+        final Object body = exchange.getIn().getBody();
+        set.add(body);
+    }
+
+    private void remove(Exchange exchange) {
+        final Object body = exchange.getIn().getBody();
+        set.remove(body);
+    }
+
+    private void clear() {
+        set.clear();
+    }
+
+    // The three bulk operations cast the body as-is: a non-Collection body fails with ClassCastException.
+    private void addAll(Exchange exchange) {
+        final Object body = exchange.getIn().getBody();
+        set.addAll((Collection<?>) body);
+    }
+
+    private void removeAll(Exchange exchange) {
+        final Object body = exchange.getIn().getBody();
+        set.removeAll((Collection<?>) body);
+    }
+
+    private void retainAll(Exchange exchange) {
+        final Object body = exchange.getIn().getBody();
+        set.retainAll((Collection<?>) body);
+    }
+}