You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2015/07/30 17:37:21 UTC
[2/2] camel git commit: CAMEL-9036 Camel-Hazelcast: Add support for
set Data Structure
CAMEL-9036 Camel-Hazelcast: Add support for set Data Structure
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b781dc07
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b781dc07
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b781dc07
Branch: refs/heads/master
Commit: b781dc07ef3a48ce744ee0e38824462a259aaf53
Parents: 600aeba
Author: Andrea Cosentino <an...@gmail.com>
Authored: Thu Jul 30 17:33:22 2015 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Thu Jul 30 17:33:45 2015 +0200
----------------------------------------------------------------------
.../component/hazelcast/HazelcastComponent.java | 14 ++-
.../component/hazelcast/HazelcastConstants.java | 1 +
.../hazelcast/set/HazelcastSetConsumer.java | 41 +++++++
.../hazelcast/set/HazelcastSetEndpoint.java | 48 ++++++++
.../hazelcast/set/HazelcastSetProducer.java | 111 +++++++++++++++++++
5 files changed, 212 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/b781dc07/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
index ec49638..94a09aa 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponent.java
@@ -34,6 +34,7 @@ import org.apache.camel.component.hazelcast.queue.HazelcastQueueEndpoint;
import org.apache.camel.component.hazelcast.replicatedmap.HazelcastReplicatedmapEndpoint;
import org.apache.camel.component.hazelcast.seda.HazelcastSedaConfiguration;
import org.apache.camel.component.hazelcast.seda.HazelcastSedaEndpoint;
+import org.apache.camel.component.hazelcast.set.HazelcastSetEndpoint;
import org.apache.camel.component.hazelcast.topic.HazelcastTopicEndpoint;
import org.apache.camel.impl.UriEndpointComponent;
@@ -137,12 +138,19 @@ public class HazelcastComponent extends UriEndpointComponent {
remaining = removeStartingCharacters(remaining.substring(HazelcastConstants.REPLICATEDMAP_PREFIX.length()), '/');
endpoint = new HazelcastReplicatedmapEndpoint(hzInstance, uri, remaining, this);
endpoint.setCommand(HazelcastCommand.replicatedmap);
- }
+ }
+
+ if (remaining.startsWith(HazelcastConstants.SET_PREFIX)) {
+ // remaining is anything (name it foo ;)
+ remaining = removeStartingCharacters(remaining.substring(HazelcastConstants.SET_PREFIX.length()), '/');
+ endpoint = new HazelcastSetEndpoint(hzInstance, uri, this, remaining);
+ endpoint.setCommand(HazelcastCommand.set);
+ }
if (endpoint == null) {
- throw new IllegalArgumentException(String.format("Your URI does not provide a correct 'type' prefix. It should be anything like 'hazelcast:[%s|%s|%s|%s|%s|%s|%s|%s]name' but is '%s'.",
+ throw new IllegalArgumentException(String.format("Your URI does not provide a correct 'type' prefix. It should be anything like 'hazelcast:[%s|%s|%s|%s|%s|%s|%s|%s|%s]name' but is '%s'.",
HazelcastConstants.MAP_PREFIX, HazelcastConstants.MULTIMAP_PREFIX, HazelcastConstants.ATOMICNUMBER_PREFIX, HazelcastConstants.INSTANCE_PREFIX, HazelcastConstants.QUEUE_PREFIX,
- HazelcastConstants.SEDA_PREFIX, HazelcastConstants.LIST_PREFIX, HazelcastConstants.REPLICATEDMAP_PREFIX, uri));
+ HazelcastConstants.SEDA_PREFIX, HazelcastConstants.LIST_PREFIX, HazelcastConstants.REPLICATEDMAP_PREFIX, HazelcastConstants.SET_PREFIX, uri));
}
if (defaultOperation != -1) {
http://git-wip-us.apache.org/repos/asf/camel/blob/b781dc07/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java
index 6c42706..edc1090 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java
@@ -30,6 +30,7 @@ public final class HazelcastConstants {
public static final String TOPIC_PREFIX = "topic:";
public static final String SEDA_PREFIX = "seda:";
public static final String LIST_PREFIX = "list:";
+ public static final String SET_PREFIX = "set:";
/*
* incoming header properties
http://git-wip-us.apache.org/repos/asf/camel/blob/b781dc07/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java
new file mode 100644
index 0000000..c850ecd
--- /dev/null
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast.set;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IList;
+import com.hazelcast.core.ISet;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.component.hazelcast.HazelcastDefaultConsumer;
+import org.apache.camel.component.hazelcast.listener.CamelItemListener;
+
+/**
+ * Implementation of Hazelcast Set {@link Consumer}.
+ */
+public class HazelcastSetConsumer extends HazelcastDefaultConsumer {
+
+    /**
+     * Creates the consumer and attaches a {@link CamelItemListener} to the
+     * Hazelcast set looked up under {@code cacheName}.
+     *
+     * @param hazelcastInstance instance the set is obtained from
+     * @param endpoint endpoint handed to the parent consumer
+     * @param processor processor handed to the parent consumer
+     * @param cacheName name of the set; also passed to the listener
+     */
+    public HazelcastSetConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor, String cacheName) {
+        super(hazelcastInstance, endpoint, processor, cacheName);
+
+        final ISet<Object> observedSet = hazelcastInstance.getSet(cacheName);
+        final CamelItemListener itemListener = new CamelItemListener(this, cacheName);
+        // NOTE(review): the 'true' flag is presumably Hazelcast's "includeValue"
+        // option - confirm against ISet#addItemListener.
+        observedSet.addItemListener(itemListener, true);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b781dc07/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetEndpoint.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetEndpoint.java
new file mode 100644
index 0000000..fe4c27d
--- /dev/null
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetEndpoint.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast.set;
+
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.hazelcast.HazelcastDefaultEndpoint;
+
+/**
+ * Hazelcast Set {@link Endpoint} implementation.
+ */
+public class HazelcastSetEndpoint extends HazelcastDefaultEndpoint {
+
+    /**
+     * Passes all arguments straight through to {@link HazelcastDefaultEndpoint}.
+     *
+     * @param hazelcastInstance Hazelcast instance used by the created consumer and producer
+     * @param endpointUri URI of this endpoint
+     * @param component owning component
+     * @param cacheName name of the Hazelcast set this endpoint refers to
+     */
+    public HazelcastSetEndpoint(HazelcastInstance hazelcastInstance, String endpointUri, Component component, String cacheName) {
+        super(hazelcastInstance, endpointUri, component, cacheName);
+    }
+
+    /**
+     * Builds a {@link HazelcastSetConsumer} for this endpoint and runs it
+     * through {@code configureConsumer} before handing it back.
+     */
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        final HazelcastSetConsumer setConsumer =
+                new HazelcastSetConsumer(this.hazelcastInstance, this, processor, this.cacheName);
+        configureConsumer(setConsumer);
+        return setConsumer;
+    }
+
+    /**
+     * Builds a {@link HazelcastSetProducer} bound to this endpoint's set name.
+     */
+    @Override
+    public Producer createProducer() throws Exception {
+        final Producer setProducer = new HazelcastSetProducer(this.hazelcastInstance, this, this.cacheName);
+        return setProducer;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/b781dc07/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetProducer.java
new file mode 100644
index 0000000..99aec66
--- /dev/null
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetProducer.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast.set;
+
+import java.util.Collection;
+import java.util.Map;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IList;
+import com.hazelcast.core.ISet;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.component.hazelcast.HazelcastComponentHelper;
+import org.apache.camel.component.hazelcast.HazelcastConstants;
+import org.apache.camel.component.hazelcast.HazelcastDefaultEndpoint;
+import org.apache.camel.component.hazelcast.HazelcastDefaultProducer;
+
+/**
+ * Implementation of Hazelcast Set {@link Producer}.
+ */
+public class HazelcastSetProducer extends HazelcastDefaultProducer {
+
+    /** Hazelcast set that every operation of this producer acts on. */
+    private final ISet<Object> set;
+
+    /**
+     * Creates a producer bound to the set obtained from the given instance.
+     *
+     * @param hazelcastInstance instance the set is looked up on
+     * @param endpoint endpoint passed to the parent producer
+     * @param setName name of the set to operate on
+     */
+    public HazelcastSetProducer(HazelcastInstance hazelcastInstance, HazelcastDefaultEndpoint endpoint, String setName) {
+        super(endpoint);
+        this.set = hazelcastInstance.getSet(setName);
+    }
+
+    /**
+     * Looks up the requested operation for the exchange, applies it to the
+     * set using the IN message body, and finally copies the headers via
+     * {@link HazelcastComponentHelper#copyHeaders(Exchange)}.
+     *
+     * @param exchange exchange carrying the operation and the body to apply
+     * @throws IllegalArgumentException if the operation is not one of the
+     *         set operations handled below
+     */
+    public void process(Exchange exchange) throws Exception {
+
+        final int operation = lookupOperationNumber(exchange);
+
+        switch (operation) {
+
+        case HazelcastConstants.ADD_OPERATION:
+            this.add(exchange);
+            break;
+
+        case HazelcastConstants.REMOVEVALUE_OPERATION:
+            this.remove(exchange);
+            break;
+
+        case HazelcastConstants.CLEAR_OPERATION:
+            this.clear();
+            break;
+
+        case HazelcastConstants.ADD_ALL_OPERATION:
+            this.addAll(exchange);
+            break;
+
+        case HazelcastConstants.REMOVE_ALL_OPERATION:
+            this.removeAll(exchange);
+            break;
+
+        case HazelcastConstants.RETAIN_ALL_OPERATION:
+            this.retainAll(exchange);
+            break;
+
+        default:
+            // This producer works on a set, so the message must say SET (it previously said LIST).
+            throw new IllegalArgumentException(String.format("The value '%s' is not allowed for parameter '%s' on the SET cache.", operation, HazelcastConstants.OPERATION));
+        }
+
+        // finally copy headers
+        HazelcastComponentHelper.copyHeaders(exchange);
+    }
+
+    /** Adds the IN message body to the set. */
+    private void add(Exchange exchange) {
+        final Object body = exchange.getIn().getBody();
+        set.add(body);
+    }
+
+    /** Removes the IN message body from the set. */
+    private void remove(Exchange exchange) {
+        final Object body = exchange.getIn().getBody();
+        set.remove(body);
+    }
+
+    /** Removes every element from the set. */
+    private void clear() {
+        set.clear();
+    }
+
+    /** Adds all elements of the IN message body, which is cast to a {@link Collection}. */
+    private void addAll(Exchange exchange) {
+        final Object body = exchange.getIn().getBody();
+        set.addAll((Collection<?>) body);
+    }
+
+    /** Removes all elements of the IN message body, which is cast to a {@link Collection}. */
+    private void removeAll(Exchange exchange) {
+        final Object body = exchange.getIn().getBody();
+        set.removeAll((Collection<?>) body);
+    }
+
+    /** Keeps only the elements contained in the IN message body, which is cast to a {@link Collection}. */
+    private void retainAll(Exchange exchange) {
+        final Object body = exchange.getIn().getBody();
+        set.retainAll((Collection<?>) body);
+    }
+}