You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/28 07:30:41 UTC
[3/7] camel git commit: initial registry files
initial registry files
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/36f25f43
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/36f25f43
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/36f25f43
Branch: refs/heads/master
Commit: 36f25f43a58b104465b47e5ec091f0d955431795
Parents: bc50b54
Author: Bernd Prager <be...@prager.ws>
Authored: Wed May 25 10:52:34 2016 -0400
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat May 28 09:18:07 2016 +0200
----------------------------------------------------------------------
.../camel/component/consul/ConsulRegistry.java | 272 +++++++++++++++++++
.../component/consul/ConsulRegistryTest.java | 31 +++
2 files changed, 303 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/36f25f43/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulRegistry.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulRegistry.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulRegistry.java
new file mode 100644
index 0000000..472a537
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulRegistry.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.camel.NoSuchBeanException;
+import org.apache.camel.spi.Registry;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Optional;
+import com.google.common.net.HostAndPort;
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.ConsulException;
+import com.orbitz.consul.KeyValueClient;
+import com.orbitz.consul.SessionClient;
+import com.orbitz.consul.model.session.ImmutableSession;
+import com.orbitz.consul.model.session.SessionCreatedResponse;
+
+/**
+ *
+ * @author Bernd Prager
+ * Apache Camel Plug-in for Consul Registry (Objects stored
+ * under kv/key as well as bookmarked under kv/[type]/key to avoid
+ * iteration over types)
+ *
+ */
+public class ConsulRegistry implements Registry {
+
+ private static final Logger logger = Logger.getLogger(ConsulRegistry.class);
+
+ private String hostname = "localhost";
+ private int port = 8500;
+ private Consul consul;
+ private KeyValueClient kvClient;
+
+ /* constructor with default port */
+ public ConsulRegistry(String hostname) {
+ this(hostname, 8500);
+ }
+
+ /* constructor (since spring.xml does not support builder pattern) */
+ public ConsulRegistry(String hostname, int port) {
+ super();
+ this.hostname = hostname;
+ this.port = port;
+ logger.debug("get consul client for: " + hostname + ":" + port);
+ HostAndPort hostAndPort = HostAndPort.fromParts(hostname, port);
+ this.consul = Consul.builder().withHostAndPort(hostAndPort).build();
+ }
+
+ /* builder pattern */
+ private ConsulRegistry(Builder builder) {
+ this.hostname = builder.hostname;
+ this.port = builder.port;
+ logger.debug("get consul client for: " + hostname + ":" + port);
+ HostAndPort hostAndPort = HostAndPort.fromParts(hostname, port);
+ this.consul = Consul.builder().withHostAndPort(hostAndPort).build();
+ }
+
+ @Override
+ public Object lookupByName(String key) {
+ // Substitute $ character in key
+ key = key.replaceAll("\\$", "/");
+ logger.debug("lookup by name: " + key);
+ kvClient = consul.keyValueClient();
+ Optional<String> result = kvClient.getValueAsString(key);
+ if (result.isPresent()) {
+ byte[] postDecodedValue = Base64.decodeBase64(result.get());
+ logger.debug("got result: " + postDecodedValue);
+ return SerializationUtils.deserialize(postDecodedValue);
+ }
+ return null;
+ }
+
+ @Override
+ public <T> T lookupByNameAndType(String name, Class<T> type) {
+ logger.debug("lookup by name: " + name + " and type: " + type);
+ Object object = lookupByName(name);
+ if (object == null)
+ return null;
+ try {
+ return type.cast(object);
+ } catch (Throwable e) {
+ String msg = "Found bean: " + name + " in Consul Registry: " + this + " of type: "
+ + object.getClass().getName() + "expected type was: " + type;
+ throw new NoSuchBeanException(name, msg, e);
+ }
+ }
+
+ @Override
+ public <T> Map<String, T> findByTypeWithName(Class<T> type) {
+ logger.debug("find by type with name: " + type);
+ Object obj = null;
+ Map<String, T> result = new HashMap<String, T>();
+ // encode $ signs as they occur in subclass types
+ String keyPrefix = type.getName().replaceAll("\\$", "/");
+ logger.debug("keyPrefix: " + keyPrefix);
+ kvClient = consul.keyValueClient();
+ List<String> keys = null;
+ try {
+ keys = kvClient.getKeys(keyPrefix);
+ } catch (ConsulException e) {
+ return result;
+ }
+ if (keys != null) {
+ for (String key : keys) {
+ // change bookmark back into actual key
+ key = key.substring((key.lastIndexOf('/') + 1));
+ obj = lookupByName(key.replaceAll("\\$", "/"));
+ if (type.isInstance(obj)) {
+ result.put(key, type.cast(obj));
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public <T> Set<T> findByType(Class<T> type) {
+ String keyPrefix = type.getName().replaceAll("\\$", "/");
+ logger.debug("find by type using keyPrefix: " + keyPrefix);
+ Object object = null;
+ Set<T> result = new HashSet<T>();
+ List<String> keys = null;
+ try {
+ keys = kvClient.getKeys(keyPrefix);
+ } catch (ConsulException e) {
+ logger.debug("no keys found");
+ return result;
+ }
+ if (keys != null) {
+ for (String key : keys) {
+ // change bookmark back into actual key
+ key = key.substring((key.lastIndexOf('/') + 1));
+ logger.debug("now going for key :" + key);
+ object = lookupByName(key.replaceAll("\\$", "/"));
+ if (type.isInstance(object)) {
+ result.add(type.cast(object));
+ }
+ }
+ }
+ return result;
+ }
+
+ public void remove(String key) {
+ // create session to avoid conflicts (not sure if that is safe enough)
+ SessionClient sessionClient = consul.sessionClient();
+ String sessionName = "session_" + UUID.randomUUID().toString();
+ //
+ SessionCreatedResponse response = sessionClient
+ .createSession(ImmutableSession.builder().name(sessionName).build());
+ String sessionId = response.getId();
+ kvClient = consul.keyValueClient();
+ String lockKey = "lock_" + key;
+ kvClient.acquireLock(lockKey, sessionName, sessionId);
+ Object object = lookupByName(key);
+ if (object == null) {
+ String msg = "Bean with key '" + key + "' did not exist in Consul Registry.";
+ throw new NoSuchBeanException(msg);
+ }
+ kvClient.deleteKey(key);
+ kvClient.deleteKey(object.getClass().getName() + "/" + key);
+ kvClient.releaseLock(lockKey, sessionId);
+ }
+
+ public void put(String key, Object object) {
+ // Substitute $ character in key
+ key = key.replaceAll("\\$", "/");
+ // create session to avoid conflicts
+ // (not sure if that is safe enough, again)
+ SessionClient sessionClient = consul.sessionClient();
+ String sessionName = "session_" + UUID.randomUUID().toString();
+ SessionCreatedResponse response = sessionClient
+ .createSession(ImmutableSession.builder().name(sessionName).build());
+ String sessionId = response.getId();
+ kvClient = consul.keyValueClient();
+ String lockKey = "lock_" + key;
+ kvClient.acquireLock(lockKey, sessionName, sessionId);
+
+ // Allow only unique keys, last one wins
+ if (lookupByName(key) != null) {
+ remove(key);
+ }
+ Object clone = SerializationUtils.clone((Serializable) object);
+ byte[] serializedObject = SerializationUtils.serialize((Serializable) clone);
+ // pre-encode due native encoding issues
+ byte[] preEncodedValue = Base64.encodeBase64(serializedObject);
+ String value = new String(preEncodedValue);
+ // store the actual class
+ logger.debug("store value: " + value + " ,under key: " + key);
+ kvClient.putValue(key, value);
+ // store just as a bookmark
+ logger.debug("store bookmark: " + 1 + " ,under key: " + object.getClass().getName().replaceAll("\\$", "/") + "/" + key);
+ kvClient.putValue(object.getClass().getName().replaceAll("\\$", "/") + "/" + key, "1");
+ kvClient.releaseLock(lockKey, sessionId);
+ }
+
+ public static class Builder {
+ // required parameter
+ String hostname;
+ // optional parameter
+ Integer port = 8500;
+
+ public Builder(String hostname) {
+ this.hostname = hostname;
+ }
+
+ public Builder port(Integer port) {
+ this.port = port;
+ return this;
+ }
+
+ public ConsulRegistry build() {
+ return new ConsulRegistry(this);
+ }
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public void setHostname(String hostname) {
+ this.hostname = hostname;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ @Override
+ public Object lookup(String name) {
+ return lookupByName(name);
+ }
+
+ @Override
+ public <T> T lookup(String name, Class<T> type) {
+ return lookupByNameAndType(name, type);
+ }
+
+ @Override
+ public <T> Map<String, T> lookupByType(Class<T> type) {
+ return lookupByType(type);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/36f25f43/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulRegistryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulRegistryTest.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulRegistryTest.java
new file mode 100644
index 0000000..03be60e
--- /dev/null
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulRegistryTest.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.util.HashMap;
+
+import org.junit.BeforeClass;
+
+public class ConsulRegistryTest extends ConsulTestSupport {
+
+ static HashMap<String, String> mockConsul;
+
+ @BeforeClass public static void prepHashMap() {
+ mockConsul = new HashMap<String, String>();
+ }
+
+}