You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/28 07:30:41 UTC

[3/7] camel git commit: initial registry files

initial registry files


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/36f25f43
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/36f25f43
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/36f25f43

Branch: refs/heads/master
Commit: 36f25f43a58b104465b47e5ec091f0d955431795
Parents: bc50b54
Author: Bernd Prager <be...@prager.ws>
Authored: Wed May 25 10:52:34 2016 -0400
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat May 28 09:18:07 2016 +0200

----------------------------------------------------------------------
 .../camel/component/consul/ConsulRegistry.java  | 272 +++++++++++++++++++
 .../component/consul/ConsulRegistryTest.java    |  31 +++
 2 files changed, 303 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/36f25f43/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulRegistry.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulRegistry.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulRegistry.java
new file mode 100644
index 0000000..472a537
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulRegistry.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.camel.NoSuchBeanException;
+import org.apache.camel.spi.Registry;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Optional;
+import com.google.common.net.HostAndPort;
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.ConsulException;
+import com.orbitz.consul.KeyValueClient;
+import com.orbitz.consul.SessionClient;
+import com.orbitz.consul.model.session.ImmutableSession;
+import com.orbitz.consul.model.session.SessionCreatedResponse;
+
+/**
+ * 
+ * @author Bernd Prager 
+ * 		Apache Camel Plug-in for Consul Registry (Objects stored
+ *      under kv/key as well as bookmarked under kv/[type]/key to avoid
+ *      iteration over types)
+ * 
+ */
+public class ConsulRegistry implements Registry {
+
+	private static final Logger logger = Logger.getLogger(ConsulRegistry.class);
+
+	private String hostname = "localhost";
+	private int port = 8500;
+	private Consul consul;
+	private KeyValueClient kvClient;
+	
+	/* constructor with default port */
+	public ConsulRegistry(String hostname) {
+		this(hostname, 8500);
+	}
+
+	/* constructor (since spring.xml does not support builder pattern) */
+	public ConsulRegistry(String hostname, int port) {
+		super();
+		this.hostname = hostname;
+		this.port = port;
+		logger.debug("get consul client for: " + hostname + ":" + port);
+		HostAndPort hostAndPort = HostAndPort.fromParts(hostname, port);
+		this.consul = Consul.builder().withHostAndPort(hostAndPort).build();
+	}
+
+	/* builder pattern */
+	private ConsulRegistry(Builder builder) {
+		this.hostname = builder.hostname;
+		this.port = builder.port;
+		logger.debug("get consul client for: " + hostname + ":" + port);
+		HostAndPort hostAndPort = HostAndPort.fromParts(hostname, port);
+		this.consul = Consul.builder().withHostAndPort(hostAndPort).build();
+	}
+
+	@Override
+	public Object lookupByName(String key) {
+		// Substitute $ character in key
+		key = key.replaceAll("\\$", "/");
+		logger.debug("lookup by name: " + key);
+		kvClient = consul.keyValueClient();
+		Optional<String> result = kvClient.getValueAsString(key);
+		if (result.isPresent()) {
+			byte[] postDecodedValue = Base64.decodeBase64(result.get());
+			logger.debug("got result: " + postDecodedValue);
+			return SerializationUtils.deserialize(postDecodedValue);
+		}
+		return null;
+	}
+
+	@Override
+	public <T> T lookupByNameAndType(String name, Class<T> type) {
+		logger.debug("lookup by name: " + name + " and type: " + type);
+		Object object = lookupByName(name);
+		if (object == null)
+			return null;
+		try {
+			return type.cast(object);
+		} catch (Throwable e) {
+			String msg = "Found bean: " + name + " in Consul Registry: " + this + " of type: "
+					+ object.getClass().getName() + "expected type was: " + type;
+			throw new NoSuchBeanException(name, msg, e);
+		}
+	}
+
+	@Override
+	public <T> Map<String, T> findByTypeWithName(Class<T> type) {
+		logger.debug("find by type with name: " + type);
+		Object obj = null;
+		Map<String, T> result = new HashMap<String, T>();
+		// encode $ signs as they occur in subclass types
+		String keyPrefix = type.getName().replaceAll("\\$", "/");
+		logger.debug("keyPrefix: " + keyPrefix);
+		kvClient = consul.keyValueClient();
+		List<String> keys = null;
+		try {
+			keys = kvClient.getKeys(keyPrefix);
+		} catch (ConsulException e) {
+			return result;
+		}
+		if (keys != null) {
+			for (String key : keys) {
+				// change bookmark back into actual key
+				key = key.substring((key.lastIndexOf('/') + 1));
+				obj = lookupByName(key.replaceAll("\\$", "/"));
+				if (type.isInstance(obj)) {
+					result.put(key, type.cast(obj));
+				}
+			}
+		}
+		return result;
+	}
+
+	@Override
+	public <T> Set<T> findByType(Class<T> type) {
+		String keyPrefix = type.getName().replaceAll("\\$", "/");
+		logger.debug("find by type using keyPrefix: " + keyPrefix);
+		Object object = null;
+		Set<T> result = new HashSet<T>();
+		List<String> keys = null;
+		try {
+			keys = kvClient.getKeys(keyPrefix);
+		} catch (ConsulException e) {
+			logger.debug("no keys found");
+			return result;
+		}
+		if (keys != null) {
+			for (String key : keys) {
+				// change bookmark back into actual key
+				key = key.substring((key.lastIndexOf('/') + 1));
+				logger.debug("now going for key :" + key);
+				object = lookupByName(key.replaceAll("\\$", "/"));
+				if (type.isInstance(object)) {
+					result.add(type.cast(object));
+				}
+			}
+		}
+		return result;
+	}
+
+	public void remove(String key) {
+		// create session to avoid conflicts (not sure if that is safe enough)
+		SessionClient sessionClient = consul.sessionClient();
+		String sessionName = "session_" + UUID.randomUUID().toString();
+		//
+		SessionCreatedResponse response = sessionClient
+				.createSession(ImmutableSession.builder().name(sessionName).build());
+		String sessionId = response.getId();
+		kvClient = consul.keyValueClient();
+		String lockKey = "lock_" + key;
+		kvClient.acquireLock(lockKey, sessionName, sessionId);
+		Object object = lookupByName(key);
+		if (object == null) {
+			String msg = "Bean with key '" + key + "' did not exist in Consul Registry.";
+			throw new NoSuchBeanException(msg);
+		}
+		kvClient.deleteKey(key);
+		kvClient.deleteKey(object.getClass().getName() + "/" + key);
+		kvClient.releaseLock(lockKey, sessionId);
+	}
+
+	public void put(String key, Object object) {
+		// Substitute $ character in key
+		key = key.replaceAll("\\$", "/");
+		// create session to avoid conflicts
+		// (not sure if that is safe enough, again)
+		SessionClient sessionClient = consul.sessionClient();
+		String sessionName = "session_" + UUID.randomUUID().toString();
+		SessionCreatedResponse response = sessionClient
+				.createSession(ImmutableSession.builder().name(sessionName).build());
+		String sessionId = response.getId();
+		kvClient = consul.keyValueClient();
+		String lockKey = "lock_" + key;
+		kvClient.acquireLock(lockKey, sessionName, sessionId);
+
+		// Allow only unique keys, last one wins
+		if (lookupByName(key) != null) {
+			remove(key);
+		}
+		Object clone = SerializationUtils.clone((Serializable) object);
+		byte[] serializedObject = SerializationUtils.serialize((Serializable) clone);
+		// pre-encode due native encoding issues
+		byte[] preEncodedValue = Base64.encodeBase64(serializedObject);
+		String value = new String(preEncodedValue);
+		// store the actual class
+		logger.debug("store value: " + value + " ,under key: " + key);
+		kvClient.putValue(key, value);
+		// store just as a bookmark
+		logger.debug("store bookmark: " + 1 + " ,under key: " + object.getClass().getName().replaceAll("\\$", "/") + "/" + key);
+		kvClient.putValue(object.getClass().getName().replaceAll("\\$", "/") + "/" + key, "1");
+		kvClient.releaseLock(lockKey, sessionId);
+	}
+
+	public static class Builder {
+		// required parameter
+		String hostname;
+		// optional parameter
+		Integer port = 8500;
+
+		public Builder(String hostname) {
+			this.hostname = hostname;
+		}
+
+		public Builder port(Integer port) {
+			this.port = port;
+			return this;
+		}
+
+		public ConsulRegistry build() {
+			return new ConsulRegistry(this);
+		}
+	}
+
+	public String getHostname() {
+		return hostname;
+	}
+
+	public void setHostname(String hostname) {
+		this.hostname = hostname;
+	}
+
+	public int getPort() {
+		return port;
+	}
+
+	public void setPort(int port) {
+		this.port = port;
+	}
+
+	@Override
+	public Object lookup(String name) {
+		return lookupByName(name);
+	}
+
+	@Override
+	public <T> T lookup(String name, Class<T> type) {
+		return lookupByNameAndType(name, type);
+	}
+
+	@Override
+	public <T> Map<String, T> lookupByType(Class<T> type) {
+		return lookupByType(type);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/36f25f43/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulRegistryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulRegistryTest.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulRegistryTest.java
new file mode 100644
index 0000000..03be60e
--- /dev/null
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ConsulRegistryTest.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.util.HashMap;
+
+import org.junit.BeforeClass;
+
+/**
+ * Test scaffold for the Consul registry. It currently defines no test
+ * methods; it only prepares an empty map once before the tests of this
+ * class run.
+ */
+public class ConsulRegistryTest extends ConsulTestSupport {
+
+	/* re-created by prepHashMap() before the tests of this class run */
+	static HashMap<String, String> mockConsul;
+
+	/* installs a fresh, empty map */
+	@BeforeClass
+	public static void prepHashMap() {
+		mockConsul = new HashMap<String, String>();
+	}
+
+}