You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2016/03/14 17:49:39 UTC
[5/5] aries-rsa git commit: Adding roundtrip test
Adding roundtrip test
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/69bb901e
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/69bb901e
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/69bb901e
Branch: refs/heads/master
Commit: 69bb901e935452f25ae1ed8ba86484b33deb1bd5
Parents: bd0352f
Author: Christian Schneider <ch...@die-schneider.net>
Authored: Mon Mar 14 17:49:19 2016 +0100
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Mon Mar 14 17:49:19 2016 +0100
----------------------------------------------------------------------
discovery/pom.xml | 2 -
discovery/zookeeper-server-config/bnd.bnd | 1 -
discovery/zookeeper-server-config/pom.xml | 40 ---
.../zookeeper/server/config/Activator.java | 110 -------
discovery/zookeeper-server/bnd.bnd | 1 -
discovery/zookeeper-server/pom.xml | 50 ---
.../discovery/zookeeper/server/Activator.java | 44 ---
.../dosgi/discovery/zookeeper/server/Utils.java | 108 -------
.../zookeeper/server/ZookeeperStarter.java | 164 ----------
.../resources/OSGI-INF/metatype/zookeeper.xml | 34 ---
.../zookeeper/server/ZookeeperStarterTest.java | 81 -----
discovery/zookeeper/bnd.bnd | 3 +-
.../rsa/discovery/zookeeper/Activator.java | 58 ++++
.../discovery/zookeeper/ZooKeeperDiscovery.java | 186 ++++++++++++
.../zookeeper/publish/DiscoveryPlugin.java | 54 ++++
.../publish/PublishingEndpointListener.java | 210 +++++++++++++
.../PublishingEndpointListenerFactory.java | 105 +++++++
.../rsa/discovery/zookeeper/server/Utils.java | 108 +++++++
.../zookeeper/server/ZookeeperStarter.java | 164 ++++++++++
.../subscribe/EndpointListenerTracker.java | 56 ++++
.../zookeeper/subscribe/InterfaceMonitor.java | 262 ++++++++++++++++
.../subscribe/InterfaceMonitorManager.java | 261 ++++++++++++++++
.../rsa/discovery/zookeeper/util/Utils.java | 54 ++++
.../dosgi/discovery/zookeeper/Activator.java | 43 ---
.../discovery/zookeeper/ZooKeeperDiscovery.java | 186 ------------
.../zookeeper/publish/DiscoveryPlugin.java | 54 ----
.../publish/PublishingEndpointListener.java | 210 -------------
.../PublishingEndpointListenerFactory.java | 105 -------
.../subscribe/EndpointListenerTracker.java | 56 ----
.../zookeeper/subscribe/InterfaceMonitor.java | 262 ----------------
.../subscribe/InterfaceMonitorManager.java | 261 ----------------
.../dosgi/discovery/zookeeper/util/Utils.java | 54 ----
.../resources/OSGI-INF/metatype/zookeeper.xml | 34 +++
.../zookeeper/DiscoveryDriverTest.java | 135 +++++++++
.../FindInZooKeeperCustomizerTest.java | 301 +++++++++++++++++++
.../InterfaceDataMonitorListenerImplTest.java | 183 +++++++++++
.../zookeeper/ZookeeperDiscoveryTest.java | 56 ++++
.../PublishingEndpointListenerFactoryTest.java | 102 +++++++
.../publish/PublishingEndpointListenerTest.java | 209 +++++++++++++
.../zookeeper/server/ZookeeperStarterTest.java | 82 +++++
.../subscribe/InterfaceMonitorManagerTest.java | 113 +++++++
.../subscribe/InterfaceMonitorTest.java | 68 +++++
.../rsa/discovery/zookeeper/util/UtilsTest.java | 37 +++
.../zookeeper/DiscoveryDriverTest.java | 135 ---------
.../FindInZooKeeperCustomizerTest.java | 301 -------------------
.../InterfaceDataMonitorListenerImplTest.java | 183 -----------
.../zookeeper/ZookeeperDiscoveryTest.java | 55 ----
.../PublishingEndpointListenerFactoryTest.java | 100 ------
.../publish/PublishingEndpointListenerTest.java | 207 -------------
.../subscribe/InterfaceMonitorManagerTest.java | 112 -------
.../subscribe/InterfaceMonitorTest.java | 67 -----
.../discovery/zookeeper/util/UtilsTest.java | 35 ---
examples/echotcp/Readme.md | 44 +++
examples/echotcp/api/bnd.bnd | 1 +
examples/echotcp/api/pom.xml | 11 +
.../rsa/examples/echotcp/api/EchoService.java | 5 +
.../aries/rsa/examples/echotcp/api/packageinfo | 19 ++
examples/echotcp/consumer/bnd.bnd | 1 +
examples/echotcp/consumer/pom.xml | 39 +++
.../examples/echotcp/consumer/EchoConsumer.java | 41 +++
examples/echotcp/pom.xml | 76 +++++
examples/echotcp/service/bnd.bnd | 1 +
examples/echotcp/service/pom.xml | 39 +++
.../echotcp/service/EchoServiceImpl.java | 14 +
examples/pom.xml | 43 +++
features/src/main/resources/features.xml | 4 -
itests/felix/pom.xml | 35 ++-
.../aries/rsa/itests/felix/RsaTestBase.java | 120 ++++++++
.../rsa/itests/felix/TestDiscoveryExport.java | 90 +-----
.../aries/rsa/itests/felix/TestRoundTrip.java | 88 ++++++
.../felix/ZookeeperDiscoveryConfigurer.java | 28 ++
.../itests/felix/ZookeeperServerConfigurer.java | 32 ++
itests/pom.xml | 1 -
itests/testbundle-service-tcp/bnd.bnd | 3 -
itests/testbundle-service-tcp/pom.xml | 40 ---
.../aries/rsa/itests/tcp/api/EchoService.java | 5 -
.../aries/rsa/itests/tcp/service/Activator.java | 25 --
.../rsa/itests/tcp/service/EchoServiceImpl.java | 12 -
pom.xml | 1 +
.../aries/rsa/core/ClientServiceFactory.java | 4 -
spi/pom.xml | 19 ++
81 files changed, 3503 insertions(+), 3240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/pom.xml
----------------------------------------------------------------------
diff --git a/discovery/pom.xml b/discovery/pom.xml
index 40b5f80..9fc449b 100644
--- a/discovery/pom.xml
+++ b/discovery/pom.xml
@@ -34,7 +34,5 @@
<modules>
<module>local</module>
<module>zookeeper</module>
- <module>zookeeper-server</module>
- <module>zookeeper-server-config</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server-config/bnd.bnd
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server-config/bnd.bnd b/discovery/zookeeper-server-config/bnd.bnd
deleted file mode 100644
index 769558e..0000000
--- a/discovery/zookeeper-server-config/bnd.bnd
+++ /dev/null
@@ -1 +0,0 @@
-Bundle-Activator: org.apache.cxf.dosgi.discovery.zookeeper.server.config.Activator
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server-config/pom.xml
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server-config/pom.xml b/discovery/zookeeper-server-config/pom.xml
deleted file mode 100644
index 4f7ac7a..0000000
--- a/discovery/zookeeper-server-config/pom.xml
+++ /dev/null
@@ -1,40 +0,0 @@
-<?xml version='1.0' encoding='UTF-8' ?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.aries.rsa</groupId>
- <artifactId>parent</artifactId>
- <version>1.8-SNAPSHOT</version>
- <relativePath>../../parent/pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.aries.rsa.discovery</groupId>
- <artifactId>zookeeper-server-config</artifactId>
- <packaging>bundle</packaging>
- <name>Aries Remote Service Admin Discovery Zookeeper Config</name>
-
- <properties>
- <topDirectoryLocation>../..</topDirectoryLocation>
- </properties>
-
-</project>
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server-config/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/config/Activator.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server-config/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/config/Activator.java b/discovery/zookeeper-server-config/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/config/Activator.java
deleted file mode 100644
index e92fe0b..0000000
--- a/discovery/zookeeper-server-config/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/config/Activator.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.server.config;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.Dictionary;
-import java.util.Hashtable;
-
-import org.osgi.framework.BundleActivator;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
-import org.osgi.service.cm.Configuration;
-import org.osgi.service.cm.ConfigurationAdmin;
-import org.osgi.service.cm.ManagedService;
-import org.osgi.util.tracker.ServiceTracker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class Activator implements BundleActivator {
-
- private static final Logger LOG = LoggerFactory.getLogger(Activator.class);
- private static final String ZOOKEEPER_PORT = "org.apache.aries.rsa.discovery.zookeeper.port";
- private static final String PID = "org.apache.aries.rsa.discovery.zookeeper.server";
- private ServiceTracker<ConfigurationAdmin, ConfigurationAdmin> st;
-
- public void start(BundleContext context) throws Exception {
- synchronized (Activator.class) {
- // Only one thread gets to set the port number
- if (System.getProperty(ZOOKEEPER_PORT) == null) {
- String port = getFreePort();
- System.setProperty(ZOOKEEPER_PORT, port);
- LOG.info("Global ZooKeeper port: {}", port);
- }
- }
-
- st = new ServiceTracker<ConfigurationAdmin, ConfigurationAdmin>(context, ConfigurationAdmin.class, null) {
- @Override
- public ConfigurationAdmin addingService(ServiceReference<ConfigurationAdmin> reference) {
- ConfigurationAdmin service = super.addingService(reference);
- try {
- Configuration cfg = service.getConfiguration(PID, null);
- Dictionary<String, Object> props = new Hashtable<String, Object>();
- String zp = System.getProperty(ZOOKEEPER_PORT);
- props.put("clientPort", zp);
- cfg.update(props);
- LOG.debug("Set ZooKeeper client port to {}", zp);
- } catch (IOException e) {
- LOG.error("Failed to configure ZooKeeper server!", e);
- }
- return service;
- }
- };
- st.open();
-
- // The following section is done synchronously otherwise it doesn't happen in time for the CT
- ServiceReference<?>[] refs = context.getServiceReferences(ManagedService.class.getName(),
- "(service.pid=org.apache.cxf.dosgi.discovery.zookeeper)");
- if (refs == null || refs.length == 0) {
- throw new RuntimeException("This bundle must be started after the bundle with the ZooKeeper "
- + "Discovery Managed Service was started.");
- }
-
- Dictionary<String, Object> props = new Hashtable<String, Object>();
- props.put("zookeeper.host", "127.0.0.1");
- props.put("zookeeper.port", System.getProperty(ZOOKEEPER_PORT));
-
- ManagedService ms = (ManagedService) context.getService(refs[0]);
- try {
- ms.updated(props);
- } finally {
- if (ms != null) {
- context.ungetService(refs[0]);
- }
- }
- LOG.debug("Passed the zookeeper.host property to the ZooKeeper Client managed service.");
- }
-
- private String getFreePort() {
- try {
- ServerSocket ss = new ServerSocket(0);
- String port = "" + ss.getLocalPort();
- ss.close();
- return port;
- } catch (IOException e) {
- LOG.error("Failed to find a free port!", e);
- return null;
- }
- }
-
- public void stop(BundleContext context) throws Exception {
- st.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/bnd.bnd
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/bnd.bnd b/discovery/zookeeper-server/bnd.bnd
deleted file mode 100644
index cef642b..0000000
--- a/discovery/zookeeper-server/bnd.bnd
+++ /dev/null
@@ -1 +0,0 @@
-Bundle-Activator: org.apache.cxf.dosgi.discovery.zookeeper.server.Activator
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/pom.xml
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/pom.xml b/discovery/zookeeper-server/pom.xml
deleted file mode 100644
index e6bcdba..0000000
--- a/discovery/zookeeper-server/pom.xml
+++ /dev/null
@@ -1,50 +0,0 @@
-<?xml version='1.0' encoding='UTF-8' ?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.aries.rsa</groupId>
- <artifactId>parent</artifactId>
- <version>1.8-SNAPSHOT</version>
- <relativePath>../../parent/pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.aries.rsa.discovery</groupId>
- <artifactId>zookeeper-server</artifactId>
- <packaging>bundle</packaging>
- <name>Aries Remote Service Admin Discovery Zookeeper Server</name>
-
-
- <properties>
- <topDirectoryLocation>../..</topDirectoryLocation>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <scope>provided</scope>
- </dependency>
-
- </dependencies>
-
-</project>
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Activator.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Activator.java b/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Activator.java
deleted file mode 100644
index 17c5568..0000000
--- a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Activator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.server;
-
-import java.util.Dictionary;
-import java.util.Hashtable;
-
-import org.osgi.framework.BundleActivator;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-
-public class Activator implements BundleActivator {
-
- ZookeeperStarter zkStarter;
-
- public void start(BundleContext context) throws Exception {
- zkStarter = new ZookeeperStarter(context);
- Dictionary<String, Object> props = new Hashtable<String, Object>();
- props.put(Constants.SERVICE_PID, "org.apache.aries.rsa.discovery.zookeeper.server");
- context.registerService(org.osgi.service.cm.ManagedService.class.getName(), zkStarter, props);
- }
-
- public void stop(BundleContext context) throws Exception {
- if (zkStarter != null) {
- zkStarter.shutdown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Utils.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Utils.java b/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Utils.java
deleted file mode 100644
index fe3c663..0000000
--- a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Utils.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.server;
-
-import java.util.ArrayList;
-import java.util.Dictionary;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * General purpose utility methods.
- */
-public final class Utils {
-
- private Utils() {
- // prevent instantiation
- }
-
- /**
- * Remove entries whose values are empty from the given dictionary.
- *
- * @param dict a dictionary
- */
- public static void removeEmptyValues(Dictionary<String, ?> dict) {
- List<String> keysToRemove = new ArrayList<String>();
- Enumeration<String> keys = dict.keys();
- while (keys.hasMoreElements()) {
- String key = keys.nextElement();
- Object value = dict.get(key);
- if (value instanceof String && "".equals(value)) {
- keysToRemove.add(key);
- }
- }
- for (String key : keysToRemove) {
- dict.remove(key);
- }
- }
-
- /**
- * Puts the given key-value pair in the given dictionary if the key does not
- * already exist in it or if its existing value is null.
- *
- * @param dict a dictionary
- * @param key the key
- * @param value the default value to set
- */
- public static void setDefault(Dictionary<String, String> dict, String key, String value) {
- if (dict.get(key) == null) {
- dict.put(key, value);
- }
- }
-
- /**
- * Converts the given Dictionary to a Map.
- *
- * @param dict a dictionary
- * @param <K> the key type
- * @param <V> the value type
- * @return the converted map, or an empty map if the given dictionary is null
- */
- public static <K, V> Map<K, V> toMap(Dictionary<K, V> dict) {
- Map<K, V> map = new HashMap<K, V>();
- if (dict != null) {
- Enumeration<K> keys = dict.keys();
- while (keys.hasMoreElements()) {
- K key = keys.nextElement();
- map.put(key, dict.get(key));
- }
- }
- return map;
- }
-
- /**
- * Converts a Dictionary into a Properties instance.
- *
- * @param dict a dictionary
- * @param <K> the key type
- * @param <V> the value type
- * @return the properties
- */
- public static <K, V> Properties toProperties(Dictionary<K, V> dict) {
- Properties props = new Properties();
- for (Enumeration<K> e = dict.keys(); e.hasMoreElements();) {
- K key = e.nextElement();
- props.put(key, dict.get(key));
- }
- return props;
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java b/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java
deleted file mode 100644
index bd5618f..0000000
--- a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.server;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Dictionary;
-import java.util.Map;
-
-import org.apache.zookeeper.server.ServerConfig;
-import org.apache.zookeeper.server.ZooKeeperServerMain;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
-import org.apache.zookeeper.server.quorum.QuorumPeerMain;
-import org.osgi.framework.BundleContext;
-import org.osgi.service.cm.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZookeeperStarter implements org.osgi.service.cm.ManagedService {
-
- private static final Logger LOG = LoggerFactory.getLogger(ZookeeperStarter.class); //NOPMD - using log4j here
-
- protected ZookeeperServer main;
- private final BundleContext bundleContext;
- private Thread zkMainThread;
- private Map<String, ?> curConfiguration;
-
- public ZookeeperStarter(BundleContext ctx) {
- bundleContext = ctx;
- }
-
- synchronized void shutdown() {
- if (main != null) {
- LOG.info("Shutting down ZooKeeper server");
- try {
- main.shutdown();
- if (zkMainThread != null) {
- zkMainThread.join();
- }
- } catch (Throwable e) {
- LOG.error(e.getMessage(), e);
- }
- main = null;
- zkMainThread = null;
- }
- }
-
- private void setDefaults(Dictionary<String, String> dict) throws IOException {
- Utils.removeEmptyValues(dict); // to avoid NumberFormatExceptions
- Utils.setDefault(dict, "tickTime", "2000");
- Utils.setDefault(dict, "initLimit", "10");
- Utils.setDefault(dict, "syncLimit", "5");
- Utils.setDefault(dict, "clientPort", "2181");
- Utils.setDefault(dict, "dataDir", new File(bundleContext.getDataFile(""), "zkdata").getCanonicalPath());
- }
-
- @SuppressWarnings("unchecked")
- public synchronized void updated(Dictionary<String, ?> dict) throws ConfigurationException {
- LOG.debug("Received configuration update for Zookeeper Server: " + dict);
- try {
- if (dict != null) {
- setDefaults((Dictionary<String, String>)dict);
- }
- Map<String, ?> configMap = Utils.toMap(dict);
- if (!configMap.equals(curConfiguration)) { // only if something actually changed
- shutdown();
- curConfiguration = configMap;
- // config is null if it doesn't exist, is being deleted or has not yet been loaded
- // in which case we just stop running
- if (dict != null) {
- startFromConfig(parseConfig(dict));
- LOG.info("Applied configuration update: " + dict);
- }
- }
- } catch (Exception th) {
- LOG.error("Problem applying configuration update: " + dict, th);
- }
- }
-
- private QuorumPeerConfig parseConfig(Dictionary<String, ?> dict) throws IOException, ConfigException {
- QuorumPeerConfig config = new QuorumPeerConfig();
- config.parseProperties(Utils.toProperties(dict));
- return config;
- }
-
- protected void startFromConfig(final QuorumPeerConfig config) {
- int numServers = config.getServers().size();
- main = numServers > 1 ? new MyQuorumPeerMain(config) : new MyZooKeeperServerMain(config);
- zkMainThread = new Thread(new Runnable() {
- public void run() {
- try {
- main.startup();
- } catch (Throwable e) {
- LOG.error("Problem running ZooKeeper server.", e);
- }
- }
- });
- zkMainThread.start();
- }
-
- interface ZookeeperServer {
- void startup() throws IOException;
- void shutdown();
- }
-
- static class MyQuorumPeerMain extends QuorumPeerMain implements ZookeeperServer {
-
- private QuorumPeerConfig config;
-
- MyQuorumPeerMain(QuorumPeerConfig config) {
- this.config = config;
- }
-
- public void startup() throws IOException {
- runFromConfig(config);
- }
-
- public void shutdown() {
- if (null != quorumPeer) {
- quorumPeer.shutdown();
- }
- }
- }
-
- static class MyZooKeeperServerMain extends ZooKeeperServerMain implements ZookeeperServer {
-
- private QuorumPeerConfig config;
-
- MyZooKeeperServerMain(QuorumPeerConfig config) {
- this.config = config;
- }
-
- public void startup() throws IOException {
- ServerConfig serverConfig = new ServerConfig();
- serverConfig.readFrom(config);
- runFromConfig(serverConfig);
- }
-
- public void shutdown() {
- try {
- super.shutdown();
- } catch (Exception e) {
- LOG.error("Error shutting down ZooKeeper", e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml b/discovery/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml
deleted file mode 100644
index efd9403..0000000
--- a/discovery/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml
+++ /dev/null
@@ -1,34 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-<MetaData xmlns="http://www.osgi.org/xmlns/metadata/v1.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="
- http://www.osgi.org/xmlns/metadata/v1.0.0 http://www.osgi.org/xmlns/metatype/v1.1.0/metatype.xsd
- ">
- <OCD description="" name="Zookeeper server config" id="org.apache.cxf.dosgi.discovery.zookeeper.server">
- <AD id="clientPort" required="false" type="String" default="2181" description=""/>
- <AD id="tickTime" required="false" type="String" default="2000" description=""/>
- <AD id="initLimit" required="false" type="String" default="10" description=""/>
- <AD id="syncLimit" required="false" type="String" default="5" description=""/>
- </OCD>
- <Designate pid="org.apache.cxf.dosgi.discovery.zookeeper.server">
- <Object ocdref="org.apache.cxf.dosgi.discovery.zookeeper.server"/>
- </Designate>
-</MetaData>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java b/discovery/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java
deleted file mode 100644
index 17ca117..0000000
--- a/discovery/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.server;
-
-import java.io.File;
-import java.util.Dictionary;
-import java.util.Hashtable;
-
-import junit.framework.TestCase;
-
-import org.apache.cxf.dosgi.discovery.zookeeper.server.ZookeeperStarter.MyZooKeeperServerMain;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.easymock.classextension.EasyMock;
-import org.easymock.classextension.IMocksControl;
-import org.osgi.framework.BundleContext;
-
-import static org.easymock.EasyMock.expect;
-import static org.easymock.classextension.EasyMock.replay;
-import static org.easymock.classextension.EasyMock.verify;
-
-public class ZookeeperStarterTest extends TestCase {
-
- public void testUpdateConfig() throws Exception {
- final File tempDir = new File("target");
- IMocksControl control = EasyMock.createControl();
- BundleContext bc = control.createMock(BundleContext.class);
- expect(bc.getDataFile("")).andReturn(tempDir);
- final MyZooKeeperServerMain mockServer = control.createMock(MyZooKeeperServerMain.class);
- control.replay();
-
- ZookeeperStarter starter = new ZookeeperStarter(bc) {
- @Override
- protected void startFromConfig(QuorumPeerConfig config) {
- assertEquals(1234, config.getClientPortAddress().getPort());
- assertTrue(config.getDataDir().contains(tempDir + File.separator + "zkdata"));
- assertEquals(2000, config.getTickTime());
- assertEquals(10, config.getInitLimit());
- assertEquals(5, config.getSyncLimit());
- this.main = mockServer;
- }
- };
- Dictionary<String, Object> props = new Hashtable<String, Object>();
- props.put("clientPort", "1234");
- starter.updated(props);
- assertNotNull(starter.main);
-
- control.verify();
- }
-
- public void testRemoveConfiguration() throws Exception {
- BundleContext bc = EasyMock.createMock(BundleContext.class);
- MyZooKeeperServerMain zkServer = EasyMock.createMock(MyZooKeeperServerMain.class);
- zkServer.shutdown();
- EasyMock.expectLastCall();
-
- replay(zkServer);
-
- ZookeeperStarter starter = new ZookeeperStarter(bc);
- starter.main = zkServer;
- starter.updated(null);
-
- verify(zkServer);
- assertNull("main should be null", starter.main);
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/bnd.bnd
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/bnd.bnd b/discovery/zookeeper/bnd.bnd
index 5c1f23d..3e572c6 100644
--- a/discovery/zookeeper/bnd.bnd
+++ b/discovery/zookeeper/bnd.bnd
@@ -1 +1,2 @@
-Bundle-Activator: org.apache.cxf.dosgi.discovery.zookeeper.Activator
+Bundle-Activator: org.apache.aries.rsa.discovery.zookeeper.Activator
+
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/Activator.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/Activator.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/Activator.java
new file mode 100644
index 0000000..3b17f35
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/Activator.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.apache.aries.rsa.discovery.zookeeper.server.ZookeeperStarter;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.service.cm.ManagedService;
+
+/**
+ * Bundle activator of the ZooKeeper discovery bundle.
+ * <p>
+ * On start it creates the discovery client ({@link ZooKeeperDiscovery}) and the ZooKeeper
+ * server starter ({@link ZookeeperStarter}) and registers each of them as a
+ * {@link ManagedService} under its own PID. On stop both are shut down.
+ */
+public class Activator implements BundleActivator {
+
+    private static final String PID_DISCOVERY_ZOOKEEPER = "org.apache.aries.rsa.discovery.zookeeper";
+    private static final String PID_ZOOKEEPER_SERVER = "org.apache.aries.rsa.discovery.zookeeper.server";
+    private ZooKeeperDiscovery zkd;
+    private ZookeeperStarter zkStarter;
+
+    /**
+     * Creates the discovery client and the server starter and registers both as
+     * {@link ManagedService}s.
+     *
+     * @param bc the context of this bundle
+     */
+    @Override
+    public synchronized void start(BundleContext bc) throws Exception {
+        zkd = new ZooKeeperDiscovery(bc);
+        bc.registerService(ManagedService.class, zkd, configProperties(PID_DISCOVERY_ZOOKEEPER));
+
+        zkStarter = new ZookeeperStarter(bc);
+        bc.registerService(ManagedService.class, zkStarter, configProperties(PID_ZOOKEEPER_SERVER));
+    }
+
+    /**
+     * Permanently stops the discovery client and shuts down the server starter.
+     * <p>
+     * Both fields are guarded against {@code null}, and the server shutdown runs in a
+     * {@code finally} block so that a failure while stopping the discovery client
+     * cannot prevent it.
+     *
+     * @param bc the context of this bundle
+     */
+    @Override
+    public synchronized void stop(BundleContext bc) throws Exception {
+        try {
+            if (zkd != null) {
+                zkd.stop(true);
+            }
+        } finally {
+            if (zkStarter != null) {
+                zkStarter.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Builds the service properties for a {@link ManagedService} registration.
+     *
+     * @param pid the value to store under {@link Constants#SERVICE_PID}
+     * @return a new dictionary containing only the PID
+     */
+    private Dictionary<String, String> configProperties(String pid) {
+        Dictionary<String, String> props = new Hashtable<String, String>();
+        props.put(Constants.SERVICE_PID, pid);
+        return props;
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
new file mode 100644
index 0000000..085c074
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.aries.rsa.discovery.zookeeper.publish.PublishingEndpointListenerFactory;
+import org.apache.aries.rsa.discovery.zookeeper.subscribe.EndpointListenerTracker;
+import org.apache.aries.rsa.discovery.zookeeper.subscribe.InterfaceMonitorManager;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedService;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.util.tracker.ServiceTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Discovery component backed by a ZooKeeper client.
+ * <p>
+ * The instance receives its configuration as a {@link ManagedService}. Whenever the configuration
+ * actually changes, the running parts are stopped and a new ZooKeeper client is created. Once that
+ * client reports {@code SyncConnected}, the publishing side
+ * ({@link PublishingEndpointListenerFactory}) and the subscribing side
+ * ({@link InterfaceMonitorManager} with an {@link EndpointListenerTracker}) are started.
+ */
+public class ZooKeeperDiscovery implements Watcher, ManagedService {
+
+    public static final String DISCOVERY_ZOOKEEPER_ID = "org.apache.cxf.dosgi.discovery.zookeeper";
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperDiscovery.class);
+
+    private final BundleContext bctx;
+
+    private PublishingEndpointListenerFactory endpointListenerFactory;
+    private ServiceTracker<EndpointListener, EndpointListener> endpointListenerTracker;
+    private InterfaceMonitorManager imManager;
+    private ZooKeeper zkClient;
+    private boolean closed;
+    private boolean started;
+
+    private Dictionary<String, ?> curConfiguration;
+
+    public ZooKeeperDiscovery(BundleContext bctx) {
+        this.bctx = bctx;
+    }
+
+    /**
+     * Applies a new configuration.
+     * <p>
+     * Nothing happens if the content equals the current configuration. Otherwise the running
+     * parts are stopped and, unless this instance was closed or the configuration is
+     * {@code null}, a new ZooKeeper client is created.
+     *
+     * @param configuration the new configuration, or {@code null} if there is none
+     * @throws ConfigurationException if the timeout is not a valid integer or the client cannot be created
+     */
+    @Override
+    public synchronized void updated(Dictionary<String, ?> configuration) throws ConfigurationException {
+        LOG.debug("Received configuration update for Zookeeper Discovery: {}", configuration);
+        // make changes only if config actually changed, to prevent unnecessary ZooKeeper reconnections
+        if (toMap(configuration).equals(toMap(curConfiguration))) {
+            return;
+        }
+        stop(false);
+        curConfiguration = configuration;
+        // config is null if it doesn't exist, is being deleted or has not yet been loaded
+        // in which case we just stop running
+        if (closed || configuration == null) {
+            return;
+        }
+        try {
+            createZookeeper(configuration);
+        } catch (NumberFormatException e) {
+            // report a malformed timeout as a configuration problem instead of a raw runtime exception
+            throw new ConfigurationException("zookeeper.timeout", "Timeout is not a valid integer", e);
+        } catch (IOException e) {
+            throw new ConfigurationException(null, "Error starting zookeeper client", e);
+        }
+    }
+
+    /**
+     * Starts the publishing and subscribing parts with the current client. Does nothing if this
+     * instance was closed or is already started.
+     */
+    private synchronized void start() {
+        if (closed) {
+            return;
+        }
+        if (started) {
+            // we must be re-entrant, i.e. can be called when already started
+            LOG.debug("ZookeeperDiscovery already started");
+            return;
+        }
+        LOG.debug("starting ZookeeperDiscovery");
+        endpointListenerFactory = new PublishingEndpointListenerFactory(zkClient, bctx);
+        endpointListenerFactory.start();
+        imManager = new InterfaceMonitorManager(bctx, zkClient);
+        endpointListenerTracker = new EndpointListenerTracker(bctx, imManager);
+        endpointListenerTracker.open();
+        started = true;
+    }
+
+    /**
+     * Stops the publishing and subscribing parts and closes the ZooKeeper client.
+     *
+     * @param close {@code true} to additionally mark this instance as closed, after which it
+     *              will not start again
+     */
+    public synchronized void stop(boolean close) {
+        if (started) {
+            LOG.debug("stopping ZookeeperDiscovery");
+        }
+        started = false;
+        closed |= close;
+        // drop each helper after stopping it so that a later stop() does not close it a second time
+        if (endpointListenerFactory != null) {
+            endpointListenerFactory.stop();
+            endpointListenerFactory = null;
+        }
+        if (endpointListenerTracker != null) {
+            endpointListenerTracker.close();
+            endpointListenerTracker = null;
+        }
+        if (imManager != null) {
+            imManager.close();
+            imManager = null;
+        }
+        if (zkClient != null) {
+            try {
+                zkClient.close();
+            } catch (InterruptedException e) {
+                // keep the interrupt visible to the caller instead of silently consuming it
+                Thread.currentThread().interrupt();
+                LOG.error("Error closing ZooKeeper", e);
+            }
+        }
+    }
+
+    /**
+     * Creates the ZooKeeper client with this instance as its watcher. Protected so that
+     * subclasses can supply a different client.
+     *
+     * @param host the server host
+     * @param port the server port
+     * @param timeout the timeout handed to the ZooKeeper constructor
+     * @return the new client
+     * @throws IOException if the client cannot be created
+     */
+    protected ZooKeeper createZooKeeper(String host, String port, int timeout) throws IOException {
+        LOG.info("ZooKeeper discovery connecting to {}:{} with timeout {}",
+                 new Object[]{host, port, timeout});
+        return new ZooKeeper(host + ":" + port, timeout, this);
+    }
+
+    /* Callback for ZooKeeper */
+    @Override
+    public void process(WatchedEvent event) {
+        LOG.debug("got ZooKeeper event {}", event);
+        switch (event.getState()) {
+            case SyncConnected:
+                LOG.info("Connection to ZooKeeper established");
+                // this event can be triggered more than once in a row (e.g. after Disconnected event),
+                // so we must be re-entrant here
+                start();
+                break;
+
+            case Expired:
+                LOG.info("Connection to ZooKeeper expired. Trying to create a new connection");
+                reconnect();
+                break;
+
+            default:
+                // ignore other events
+                break;
+        }
+    }
+
+    /**
+     * Stops the running parts and creates a fresh client from the current configuration.
+     * <p>
+     * Runs under the instance lock because it replaces {@code zkClient}, which the other
+     * synchronized methods read. No client is created once this instance was closed or while
+     * there is no configuration; otherwise the new client would never be closed.
+     */
+    private synchronized void reconnect() {
+        stop(false);
+        if (closed || curConfiguration == null) {
+            LOG.debug("Not reconnecting to ZooKeeper: discovery is closed or has no configuration");
+            return;
+        }
+        try {
+            createZookeeper(curConfiguration);
+        } catch (IOException e) {
+            LOG.error("Error starting zookeeper client", e);
+        }
+    }
+
+    /**
+     * Reads host, port and timeout from the configuration (defaults: localhost, 2181, 3000) and
+     * stores the newly created client in {@code zkClient}.
+     *
+     * @param config the configuration to read, must not be {@code null}
+     * @throws IOException if the client cannot be created
+     * @throws NumberFormatException if the timeout is not a valid integer
+     */
+    private void createZookeeper(Dictionary<String, ?> config) throws IOException {
+        // values may arrive as non-String objects (e.g. an Integer port), so convert instead of casting
+        String host = String.valueOf(getWithDefault(config, "zookeeper.host", "localhost"));
+        String port = String.valueOf(getWithDefault(config, "zookeeper.port", "2181"));
+        String timeoutText = String.valueOf(getWithDefault(config, "zookeeper.timeout", "3000"));
+        int timeout = Integer.parseInt(timeoutText.trim());
+        zkClient = createZooKeeper(host, port, timeout);
+    }
+
+    /**
+     * Looks up a configuration value, falling back to a default.
+     *
+     * @param config the configuration to read, must not be {@code null}
+     * @param key the key to look up
+     * @param defaultValue the value to return if the key has no value
+     * @return the configured value, or {@code defaultValue} if there is none
+     */
+    public Object getWithDefault(Dictionary<String, ?> config, String key, Object defaultValue) {
+        Object value = config.get(key);
+        return value != null ? value : defaultValue;
+    }
+
+    /**
+     * Converts the given Dictionary to a Map.
+     *
+     * @param dict a dictionary
+     * @param <K> the key type
+     * @param <V> the value type
+     * @return the converted map, or an empty map if the given dictionary is null
+     */
+    public static <K, V> Map<K, V> toMap(Dictionary<K, V> dict) {
+        Map<K, V> map = new HashMap<K, V>();
+        if (dict != null) {
+            Enumeration<K> keys = dict.keys();
+            while (keys.hasMoreElements()) {
+                K key = keys.nextElement();
+                map.put(key, dict.get(key));
+            }
+        }
+        return map;
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java
new file mode 100644
index 0000000..033bee2
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper.publish;
+
+import java.util.Map;
+
+/**
+ * This interface allows transformation of service registration information before it is pushed into the ZooKeeper
+ * discovery system.
+ * It can be useful for situations where a host name or port number needs to be changed in cases where the host running
+ * the service is known differently from the outside to what the local Java process thinks it is.
+ * Extra service properties can also be added to the registration which can be useful to refine the remote service
+ * lookup process.
+ * <p>
+ * DiscoveryPlugins use the OSGi Whiteboard pattern. To add one to the system, register an instance under this interface
+ * with the OSGi Service Registry. All registered DiscoveryPlugin instances are visited and given a chance to
+ * process the information before it is pushed into ZooKeeper.
+ * <p>
+ * Note that the changes made using this plugin do not modify the local service registration.
+ */
+public interface DiscoveryPlugin {
+
+ /**
+ * Process service registration information. Plugins can change this information before it is published into the
+ * ZooKeeper discovery system.
+ *
+ * @param mutableProperties A map of service registration properties. The map is mutable and any changes to the map
+ * will be reflected in the ZooKeeper registration.
+ * @param endpointKey The key under which the service is registered in ZooKeeper. This key typically has the
+ * following format: {@code hostname#port##context}. While the actual value of this key is not used by the
+ * system (people can use it as a hint to understand where the service is located), the value <i>must</i> be
+ * unique for all services of a given type.
+ * @return The {@code endpointKey} value to be used. If there is no need to change this, simply return the value
+ * of the {@code endpointKey} parameter.
+ */
+ String process(Map<String, Object> mutableProperties, String endpointKey);
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
new file mode 100644
index 0000000..75efbd3
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper.publish;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
+import org.apache.aries.rsa.discovery.endpoint.PropertiesMapper;
+import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType;
+import org.osgi.xmlns.rsa.v1_0.PropertyType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listens for local Endpoints and publishes them to ZooKeeper.
+ * <p>
+ * For every interface of an added endpoint an ephemeral node is created below the path returned
+ * by {@code Utils.getZooKeeperPath}; the node data is the serialized endpoint description.
+ * Registered {@link DiscoveryPlugin}s may alter the properties and the node key before publishing.
+ * All access to the list of published endpoints is guarded by the {@code endpoints} monitor.
+ */
+public class PublishingEndpointListener implements EndpointListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class);
+
+    private final ZooKeeper zk;
+    private final ServiceTracker<DiscoveryPlugin, DiscoveryPlugin> discoveryPluginTracker;
+    private final List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>();
+    private boolean closed;
+
+    private final EndpointDescriptionParser endpointDescriptionParser;
+
+    /**
+     * Creates the listener and opens a tracker for {@link DiscoveryPlugin} services.
+     *
+     * @param zk the ZooKeeper client used for publishing
+     * @param bctx the bundle context used to track plugins
+     */
+    public PublishingEndpointListener(ZooKeeper zk, BundleContext bctx) {
+        this.zk = zk;
+        discoveryPluginTracker = new ServiceTracker<DiscoveryPlugin, DiscoveryPlugin>(bctx,
+            DiscoveryPlugin.class, null);
+        discoveryPluginTracker.open();
+        endpointDescriptionParser = new EndpointDescriptionParser();
+    }
+
+    /**
+     * Publishes the endpoint unless this listener is closed or the endpoint is already published.
+     * Failures are logged and not propagated.
+     */
+    @Override
+    public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
+        LOG.info("Local EndpointDescription added: {}", endpoint);
+
+        synchronized (endpoints) {
+            if (closed) {
+                return;
+            }
+            if (endpoints.contains(endpoint)) {
+                // TODO -> Should the published endpoint be updated here?
+                return;
+            }
+
+            try {
+                addEndpoint(endpoint);
+                endpoints.add(endpoint);
+            } catch (InterruptedException ex) {
+                // keep the interrupt visible to the calling thread instead of silently consuming it
+                Thread.currentThread().interrupt();
+                LOG.error("Exception while processing the addition of an endpoint.", ex);
+            } catch (Exception ex) {
+                LOG.error("Exception while processing the addition of an endpoint.", ex);
+            }
+        }
+    }
+
+    /**
+     * Runs the endpoint through all tracked plugins and creates one ephemeral node per interface.
+     */
+    private void addEndpoint(EndpointDescription endpoint) throws URISyntaxException, KeeperException,
+                                                                  InterruptedException, IOException {
+        Collection<String> interfaces = endpoint.getInterfaces();
+        String endpointKey = getKey(endpoint);
+        // work on a copy so that plugins do not modify the endpoint's own properties
+        Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
+
+        // process plugins
+        Object[] plugins = discoveryPluginTracker.getServices();
+        if (plugins != null) {
+            for (Object plugin : plugins) {
+                if (plugin instanceof DiscoveryPlugin) {
+                    endpointKey = ((DiscoveryPlugin)plugin).process(props, endpointKey);
+                }
+            }
+        }
+
+        for (String name : interfaces) {
+            String path = Utils.getZooKeeperPath(name);
+            String fullPath = path + '/' + endpointKey;
+            LOG.info("Creating ZooKeeper node for service with path {}", fullPath);
+            createPath(path, zk);
+            List<PropertyType> propsOut = new PropertiesMapper().fromProps(props);
+            EndpointDescriptionType epd = new EndpointDescriptionType();
+            epd.getProperty().addAll(propsOut);
+            byte[] epData = endpointDescriptionParser.getData(epd);
+            createEphemeralNode(fullPath, epData);
+        }
+    }
+
+    /**
+     * Creates the ephemeral node, replacing a node of the same name that already exists.
+     */
+    private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException {
+        try {
+            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        } catch (NodeExistsException nee) {
+            // this sometimes happens after a ZooKeeper node dies and the ephemeral node
+            // that belonged to the old session was not yet deleted. We need to make our
+            // session the owner of the node so it won't get deleted automatically -
+            // we do this by deleting and recreating it ourselves.
+            LOG.info("node for endpoint already exists, recreating: {}", fullPath);
+            try {
+                zk.delete(fullPath, -1);
+            } catch (NoNodeException nne) {
+                // it's a race condition, but as long as it got deleted - it's ok
+            }
+            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        }
+    }
+
+    /**
+     * Removes the nodes of a previously published endpoint. Ignored if this listener is closed or
+     * the endpoint was never published. Failures are logged and not propagated.
+     */
+    @Override
+    public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
+        LOG.info("Local EndpointDescription removed: {}", endpoint);
+
+        synchronized (endpoints) {
+            if (closed) {
+                return;
+            }
+            if (!endpoints.contains(endpoint)) {
+                return;
+            }
+
+            try {
+                removeEndpoint(endpoint);
+                endpoints.remove(endpoint);
+            } catch (Exception ex) {
+                LOG.error("Exception while processing the removal of an endpoint", ex);
+            }
+        }
+    }
+
+    /**
+     * Deletes the node of every interface of the endpoint on a best-effort basis: a failed
+     * delete is logged at debug level and the remaining nodes are still processed.
+     */
+    private void removeEndpoint(EndpointDescription endpoint) throws UnknownHostException, URISyntaxException {
+        Collection<String> interfaces = endpoint.getInterfaces();
+        String endpointKey = getKey(endpoint);
+        for (String name : interfaces) {
+            String path = Utils.getZooKeeperPath(name);
+            String fullPath = path + '/' + endpointKey;
+            LOG.debug("Removing ZooKeeper node: {}", fullPath);
+            try {
+                zk.delete(fullPath, -1);
+            } catch (Exception ex) {
+                // the placeholder takes the path; the trailing exception is logged as the throwable
+                LOG.debug("Error while removing endpoint: {}", fullPath, ex); // e.g. session expired
+            }
+        }
+    }
+
+    /**
+     * Creates every segment of the given path as a persistent node, skipping segments that exist.
+     */
+    private static void createPath(String path, ZooKeeper zk) throws KeeperException, InterruptedException {
+        StringBuilder current = new StringBuilder();
+        List<String> parts = Utils.removeEmpty(Arrays.asList(path.split("/")));
+        for (String part : parts) {
+            current.append('/');
+            current.append(part);
+            try {
+                zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            } catch (NodeExistsException nee) {
+                // it's not the first node with this path to ever exist - that's normal
+            }
+        }
+    }
+
+    /**
+     * Derives the node key from the endpoint id, which is parsed as a URI:
+     * {@code host#port#path} with every '/' of the path replaced by '#'.
+     */
+    private static String getKey(EndpointDescription endpoint) throws URISyntaxException {
+        URI uri = new URI(endpoint.getId());
+        return new StringBuilder().append(uri.getHost()).append("#").append(uri.getPort())
+            .append("#").append(uri.getPath().replace('/', '#')).toString();
+    }
+
+    /**
+     * Marks this listener as closed, removes the nodes of all published endpoints and closes the
+     * plugin tracker.
+     */
+    public void close() {
+        LOG.debug("closing - removing all endpoints");
+        synchronized (endpoints) {
+            closed = true;
+            for (EndpointDescription endpoint : endpoints) {
+                try {
+                    removeEndpoint(endpoint);
+                } catch (Exception ex) {
+                    LOG.error("Exception while removing endpoint during close", ex);
+                }
+            }
+            endpoints.clear();
+        }
+        discoveryPluginTracker.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
new file mode 100644
index 0000000..1eabec3
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper.publish;
+
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+
+import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
+import org.apache.zookeeper.ZooKeeper;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceFactory;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service factory that hands out {@link PublishingEndpointListener}s publishing to ZooKeeper.
+ * <p>
+ * Every {@code getService} call creates a new listener, which is remembered so that it can be
+ * closed when it is released or when the factory is stopped.
+ */
+public class PublishingEndpointListenerFactory implements ServiceFactory<PublishingEndpointListener> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListenerFactory.class);
+
+    private final BundleContext bctx;
+    private final ZooKeeper zk;
+    private final List<PublishingEndpointListener> listeners = new ArrayList<PublishingEndpointListener>();
+    private ServiceRegistration<?> serviceRegistration;
+
+    public PublishingEndpointListenerFactory(ZooKeeper zk, BundleContext bctx) {
+        this.bctx = bctx;
+        this.zk = zk;
+    }
+
+    /**
+     * Creates a new listener and remembers it.
+     */
+    public PublishingEndpointListener getService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr) {
+        LOG.debug("new EndpointListener from factory");
+        synchronized (listeners) {
+            PublishingEndpointListener created = new PublishingEndpointListener(zk, bctx);
+            listeners.add(created);
+            return created;
+        }
+    }
+
+    /**
+     * Forgets the given listener and closes it; a listener this factory does not know is left alone.
+     */
+    public void ungetService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr,
+                             PublishingEndpointListener pel) {
+        LOG.debug("remove EndpointListener");
+        synchronized (listeners) {
+            boolean wasKnown = listeners.remove(pel);
+            if (!wasKnown) {
+                return;
+            }
+            pel.close();
+        }
+    }
+
+    /**
+     * Registers this factory as an {@link EndpointListener} service whose scope matches the
+     * endpoints carrying the UUID of the local framework.
+     */
+    public synchronized void start() {
+        String frameworkUuid = bctx.getProperty(Constants.FRAMEWORK_UUID);
+        String scope = String.format("(&(%s=*)(%s=%s))", Constants.OBJECTCLASS,
+                                     RemoteConstants.ENDPOINT_FRAMEWORK_UUID, frameworkUuid);
+
+        Dictionary<String, String> registrationProps = new Hashtable<String, String>();
+        registrationProps.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, scope);
+        registrationProps.put(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID, "true");
+
+        serviceRegistration = bctx.registerService(EndpointListener.class.getName(), this, registrationProps);
+    }
+
+    /**
+     * Unregisters the service, if registered, then closes and forgets all listeners.
+     */
+    public synchronized void stop() {
+        if (serviceRegistration != null) {
+            serviceRegistration.unregister();
+            serviceRegistration = null;
+        }
+        synchronized (listeners) {
+            for (int i = 0; i < listeners.size(); i++) {
+                listeners.get(i).close();
+            }
+            listeners.clear();
+        }
+    }
+
+    /**
+     * Only for the test case! Returns the live internal list, not a copy.
+     */
+    protected List<PublishingEndpointListener> getListeners() {
+        synchronized (listeners) {
+            return listeners;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/Utils.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/Utils.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/Utils.java
new file mode 100644
index 0000000..67ea3a4
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/Utils.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper.server;
+
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * General purpose utility methods.
+ */
+public final class Utils {
+
+    private Utils() {
+        // prevent instantiation
+    }
+
+    /**
+     * Remove entries whose values are empty strings from the given dictionary.
+     *
+     * @param dict a dictionary, must not be null
+     */
+    public static void removeEmptyValues(Dictionary<String, ?> dict) {
+        // collect first: removing entries while enumerating the keys is not safe
+        List<String> keysToRemove = new ArrayList<String>();
+        Enumeration<String> keys = dict.keys();
+        while (keys.hasMoreElements()) {
+            String key = keys.nextElement();
+            // "".equals(..) is only true for an empty String, so no separate type check is needed
+            if ("".equals(dict.get(key))) {
+                keysToRemove.add(key);
+            }
+        }
+        for (String key : keysToRemove) {
+            dict.remove(key);
+        }
+    }
+
+    /**
+     * Puts the given key-value pair in the given dictionary if the key does not
+     * already exist in it or if its existing value is null.
+     *
+     * @param dict a dictionary
+     * @param key the key
+     * @param value the default value to set
+     */
+    public static void setDefault(Dictionary<String, String> dict, String key, String value) {
+        if (dict.get(key) == null) {
+            dict.put(key, value);
+        }
+    }
+
+    /**
+     * Converts the given Dictionary to a Map.
+     *
+     * @param dict a dictionary
+     * @param <K> the key type
+     * @param <V> the value type
+     * @return the converted map, or an empty map if the given dictionary is null
+     */
+    public static <K, V> Map<K, V> toMap(Dictionary<K, V> dict) {
+        Map<K, V> map = new HashMap<K, V>();
+        if (dict != null) {
+            Enumeration<K> keys = dict.keys();
+            while (keys.hasMoreElements()) {
+                K key = keys.nextElement();
+                map.put(key, dict.get(key));
+            }
+        }
+        return map;
+    }
+
+    /**
+     * Converts a Dictionary into a Properties instance.
+     *
+     * @param dict a dictionary
+     * @param <K> the key type
+     * @param <V> the value type
+     * @return the properties, or empty properties if the given dictionary is null
+     */
+    public static <K, V> Properties toProperties(Dictionary<K, V> dict) {
+        Properties props = new Properties();
+        // accept null like toMap does instead of failing with a NullPointerException
+        if (dict != null) {
+            for (Enumeration<K> e = dict.keys(); e.hasMoreElements();) {
+                K key = e.nextElement();
+                props.put(key, dict.get(key));
+            }
+        }
+        return props;
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/ZookeeperStarter.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/ZookeeperStarter.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/ZookeeperStarter.java
new file mode 100644
index 0000000..520aa99
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/ZookeeperStarter.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper.server;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Map;
+
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.cm.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ /**
+  * ManagedService that runs an embedded ZooKeeper server and restarts it
+  * whenever a changed configuration is delivered through {@link #updated(Dictionary)}.
+  */
+ public class ZookeeperStarter implements org.osgi.service.cm.ManagedService {
+
+     private static final Logger LOG = LoggerFactory.getLogger(ZookeeperStarter.class);
+
+     /** The currently running server, or null when no server is running. */
+     protected ZookeeperServer main;
+     private final BundleContext bundleContext;
+     /** Thread executing the blocking startup call of {@link #main}. */
+     private Thread zkMainThread;
+     /** The configuration (including defaults) that was applied last. */
+     private Map<String, ?> curConfiguration;
+
+     public ZookeeperStarter(BundleContext ctx) {
+         bundleContext = ctx;
+     }
+
+     /**
+      * Stops the running server, if any, and waits for its thread to finish.
+      * Failures are logged and never propagated to the caller.
+      */
+     public synchronized void shutdown() {
+         if (main != null) {
+             LOG.info("Shutting down ZooKeeper server");
+             try {
+                 main.shutdown();
+                 if (zkMainThread != null) {
+                     zkMainThread.join();
+                 }
+             } catch (InterruptedException e) {
+                 // keep the interrupt visible to the caller instead of silently clearing it
+                 Thread.currentThread().interrupt();
+                 LOG.warn("Interrupted while waiting for the ZooKeeper server thread to stop", e);
+             } catch (Throwable e) {
+                 // e.getMessage() may be null, so log a fixed description
+                 LOG.error("Problem shutting down ZooKeeper server", e);
+             }
+             main = null;
+             zkMainThread = null;
+         }
+     }
+
+     /**
+      * Cleans the dictionary via Utils.removeEmptyValues and fills in default
+      * values for every setting that is still missing.
+      *
+      * @param dict the configuration to complete; modified in place
+      * @throws IOException if the canonical path of the default data directory
+      *         cannot be determined
+      */
+     private void setDefaults(Dictionary<String, String> dict) throws IOException {
+         Utils.removeEmptyValues(dict); // to avoid NumberFormatExceptions
+         Utils.setDefault(dict, "tickTime", "2000");
+         Utils.setDefault(dict, "initLimit", "10");
+         Utils.setDefault(dict, "syncLimit", "5");
+         Utils.setDefault(dict, "clientPort", "2181");
+         // default data directory: "zkdata" below the bundle's data area
+         Utils.setDefault(dict, "dataDir", new File(bundleContext.getDataFile(""), "zkdata").getCanonicalPath());
+     }
+
+     /**
+      * Applies a configuration update. If the effective configuration differs
+      * from the one applied last, the running server is shut down and, for a
+      * non-null configuration, a new one is started. Problems are logged, not
+      * thrown.
+      *
+      * @param dict the new configuration, or null to just stop the server;
+      *        defaults are written into this dictionary
+      */
+     @SuppressWarnings("unchecked")
+     public synchronized void updated(Dictionary<String, ?> dict) throws ConfigurationException {
+         LOG.debug("Received configuration update for Zookeeper Server: {}", dict);
+         try {
+             if (dict != null) {
+                 setDefaults((Dictionary<String, String>)dict);
+             }
+             Map<String, ?> configMap = Utils.toMap(dict);
+             if (!configMap.equals(curConfiguration)) { // only if something actually changed
+                 shutdown();
+                 curConfiguration = configMap;
+                 // config is null if it doesn't exist, is being deleted or has not yet been loaded
+                 // in which case we just stop running
+                 if (dict != null) {
+                     startFromConfig(parseConfig(dict));
+                     LOG.info("Applied configuration update: {}", dict);
+                 }
+             }
+         } catch (Exception th) {
+             LOG.error("Problem applying configuration update: " + dict, th);
+         }
+     }
+
+     /** Builds a QuorumPeerConfig from the entries of the given dictionary. */
+     private QuorumPeerConfig parseConfig(Dictionary<String, ?> dict) throws IOException, ConfigException {
+         QuorumPeerConfig config = new QuorumPeerConfig();
+         config.parseProperties(Utils.toProperties(dict));
+         return config;
+     }
+
+     /**
+      * Creates the server for the given configuration (a quorum peer when more
+      * than one server is configured, a standalone server otherwise) and runs
+      * its startup on a new thread.
+      *
+      * @param config the parsed ZooKeeper configuration
+      */
+     protected void startFromConfig(final QuorumPeerConfig config) {
+         int numServers = config.getServers().size();
+         // The thread works on exactly the instance created here and does not
+         // re-read the mutable "main" field, which shutdown() may reset.
+         final ZookeeperServer server = numServers > 1
+             ? new MyQuorumPeerMain(config)
+             : new MyZooKeeperServerMain(config);
+         main = server;
+         zkMainThread = new Thread(new Runnable() {
+             public void run() {
+                 try {
+                     server.startup();
+                 } catch (Throwable e) {
+                     LOG.error("Problem running ZooKeeper server.", e);
+                 }
+             }
+         });
+         zkMainThread.start();
+     }
+
+     /** Minimal lifecycle shared by the standalone and the quorum server. */
+     interface ZookeeperServer {
+         void startup() throws IOException;
+         void shutdown();
+     }
+
+     /** Quorum peer variant, used when more than one server is configured. */
+     static class MyQuorumPeerMain extends QuorumPeerMain implements ZookeeperServer {
+
+         private final QuorumPeerConfig config;
+
+         MyQuorumPeerMain(QuorumPeerConfig config) {
+             this.config = config;
+         }
+
+         public void startup() throws IOException {
+             runFromConfig(config);
+         }
+
+         public void shutdown() {
+             // quorumPeer is inherited; it may not be set if startup never ran
+             if (null != quorumPeer) {
+                 quorumPeer.shutdown();
+             }
+         }
+     }
+
+     /** Standalone server variant, used when at most one server is configured. */
+     static class MyZooKeeperServerMain extends ZooKeeperServerMain implements ZookeeperServer {
+
+         private final QuorumPeerConfig config;
+
+         MyZooKeeperServerMain(QuorumPeerConfig config) {
+             this.config = config;
+         }
+
+         public void startup() throws IOException {
+             ServerConfig serverConfig = new ServerConfig();
+             serverConfig.readFrom(config);
+             runFromConfig(serverConfig);
+         }
+
+         public void shutdown() {
+             try {
+                 super.shutdown();
+             } catch (Exception e) {
+                 LOG.error("Error shutting down ZooKeeper", e);
+             }
+         }
+     }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
new file mode 100644
index 0000000..5909ee0
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper.subscribe;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.util.tracker.ServiceTracker;
+
+/**
+ * Tracks interest in EndpointListeners. Delegates to InterfaceMonitorManager to manage
+ * interest in the scopes of each EndpointListener.
+ */
+public class EndpointListenerTracker extends ServiceTracker<EndpointListener, EndpointListener> {
+ private final InterfaceMonitorManager imManager;
+
+ public EndpointListenerTracker(BundleContext bctx, InterfaceMonitorManager imManager) {
+ super(bctx, EndpointListener.class, null);
+ this.imManager = imManager;
+ }
+
+ @Override
+ public EndpointListener addingService(ServiceReference<EndpointListener> endpointListener) {
+ imManager.addInterest(endpointListener);
+ return null;
+ }
+
+ @Override
+ public void modifiedService(ServiceReference<EndpointListener> endpointListener, EndpointListener service) {
+ // called when an EndpointListener updates its service properties,
+ // e.g. when its interest scope is expanded/reduced
+ imManager.addInterest(endpointListener);
+ }
+
+ @Override
+ public void removedService(ServiceReference<EndpointListener> endpointListener, EndpointListener service) {
+ imManager.removeInterest(endpointListener);
+ }
+
+}