You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2016/03/14 17:49:39 UTC

[5/5] aries-rsa git commit: Adding roundtrip test

Adding roundtrip test


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/69bb901e
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/69bb901e
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/69bb901e

Branch: refs/heads/master
Commit: 69bb901e935452f25ae1ed8ba86484b33deb1bd5
Parents: bd0352f
Author: Christian Schneider <ch...@die-schneider.net>
Authored: Mon Mar 14 17:49:19 2016 +0100
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Mon Mar 14 17:49:19 2016 +0100

----------------------------------------------------------------------
 discovery/pom.xml                               |   2 -
 discovery/zookeeper-server-config/bnd.bnd       |   1 -
 discovery/zookeeper-server-config/pom.xml       |  40 ---
 .../zookeeper/server/config/Activator.java      | 110 -------
 discovery/zookeeper-server/bnd.bnd              |   1 -
 discovery/zookeeper-server/pom.xml              |  50 ---
 .../discovery/zookeeper/server/Activator.java   |  44 ---
 .../dosgi/discovery/zookeeper/server/Utils.java | 108 -------
 .../zookeeper/server/ZookeeperStarter.java      | 164 ----------
 .../resources/OSGI-INF/metatype/zookeeper.xml   |  34 ---
 .../zookeeper/server/ZookeeperStarterTest.java  |  81 -----
 discovery/zookeeper/bnd.bnd                     |   3 +-
 .../rsa/discovery/zookeeper/Activator.java      |  58 ++++
 .../discovery/zookeeper/ZooKeeperDiscovery.java | 186 ++++++++++++
 .../zookeeper/publish/DiscoveryPlugin.java      |  54 ++++
 .../publish/PublishingEndpointListener.java     | 210 +++++++++++++
 .../PublishingEndpointListenerFactory.java      | 105 +++++++
 .../rsa/discovery/zookeeper/server/Utils.java   | 108 +++++++
 .../zookeeper/server/ZookeeperStarter.java      | 164 ++++++++++
 .../subscribe/EndpointListenerTracker.java      |  56 ++++
 .../zookeeper/subscribe/InterfaceMonitor.java   | 262 ++++++++++++++++
 .../subscribe/InterfaceMonitorManager.java      | 261 ++++++++++++++++
 .../rsa/discovery/zookeeper/util/Utils.java     |  54 ++++
 .../dosgi/discovery/zookeeper/Activator.java    |  43 ---
 .../discovery/zookeeper/ZooKeeperDiscovery.java | 186 ------------
 .../zookeeper/publish/DiscoveryPlugin.java      |  54 ----
 .../publish/PublishingEndpointListener.java     | 210 -------------
 .../PublishingEndpointListenerFactory.java      | 105 -------
 .../subscribe/EndpointListenerTracker.java      |  56 ----
 .../zookeeper/subscribe/InterfaceMonitor.java   | 262 ----------------
 .../subscribe/InterfaceMonitorManager.java      | 261 ----------------
 .../dosgi/discovery/zookeeper/util/Utils.java   |  54 ----
 .../resources/OSGI-INF/metatype/zookeeper.xml   |  34 +++
 .../zookeeper/DiscoveryDriverTest.java          | 135 +++++++++
 .../FindInZooKeeperCustomizerTest.java          | 301 +++++++++++++++++++
 .../InterfaceDataMonitorListenerImplTest.java   | 183 +++++++++++
 .../zookeeper/ZookeeperDiscoveryTest.java       |  56 ++++
 .../PublishingEndpointListenerFactoryTest.java  | 102 +++++++
 .../publish/PublishingEndpointListenerTest.java | 209 +++++++++++++
 .../zookeeper/server/ZookeeperStarterTest.java  |  82 +++++
 .../subscribe/InterfaceMonitorManagerTest.java  | 113 +++++++
 .../subscribe/InterfaceMonitorTest.java         |  68 +++++
 .../rsa/discovery/zookeeper/util/UtilsTest.java |  37 +++
 .../zookeeper/DiscoveryDriverTest.java          | 135 ---------
 .../FindInZooKeeperCustomizerTest.java          | 301 -------------------
 .../InterfaceDataMonitorListenerImplTest.java   | 183 -----------
 .../zookeeper/ZookeeperDiscoveryTest.java       |  55 ----
 .../PublishingEndpointListenerFactoryTest.java  | 100 ------
 .../publish/PublishingEndpointListenerTest.java | 207 -------------
 .../subscribe/InterfaceMonitorManagerTest.java  | 112 -------
 .../subscribe/InterfaceMonitorTest.java         |  67 -----
 .../discovery/zookeeper/util/UtilsTest.java     |  35 ---
 examples/echotcp/Readme.md                      |  44 +++
 examples/echotcp/api/bnd.bnd                    |   1 +
 examples/echotcp/api/pom.xml                    |  11 +
 .../rsa/examples/echotcp/api/EchoService.java   |   5 +
 .../aries/rsa/examples/echotcp/api/packageinfo  |  19 ++
 examples/echotcp/consumer/bnd.bnd               |   1 +
 examples/echotcp/consumer/pom.xml               |  39 +++
 .../examples/echotcp/consumer/EchoConsumer.java |  41 +++
 examples/echotcp/pom.xml                        |  76 +++++
 examples/echotcp/service/bnd.bnd                |   1 +
 examples/echotcp/service/pom.xml                |  39 +++
 .../echotcp/service/EchoServiceImpl.java        |  14 +
 examples/pom.xml                                |  43 +++
 features/src/main/resources/features.xml        |   4 -
 itests/felix/pom.xml                            |  35 ++-
 .../aries/rsa/itests/felix/RsaTestBase.java     | 120 ++++++++
 .../rsa/itests/felix/TestDiscoveryExport.java   |  90 +-----
 .../aries/rsa/itests/felix/TestRoundTrip.java   |  88 ++++++
 .../felix/ZookeeperDiscoveryConfigurer.java     |  28 ++
 .../itests/felix/ZookeeperServerConfigurer.java |  32 ++
 itests/pom.xml                                  |   1 -
 itests/testbundle-service-tcp/bnd.bnd           |   3 -
 itests/testbundle-service-tcp/pom.xml           |  40 ---
 .../aries/rsa/itests/tcp/api/EchoService.java   |   5 -
 .../aries/rsa/itests/tcp/service/Activator.java |  25 --
 .../rsa/itests/tcp/service/EchoServiceImpl.java |  12 -
 pom.xml                                         |   1 +
 .../aries/rsa/core/ClientServiceFactory.java    |   4 -
 spi/pom.xml                                     |  19 ++
 81 files changed, 3503 insertions(+), 3240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/pom.xml
----------------------------------------------------------------------
diff --git a/discovery/pom.xml b/discovery/pom.xml
index 40b5f80..9fc449b 100644
--- a/discovery/pom.xml
+++ b/discovery/pom.xml
@@ -34,7 +34,5 @@
     <modules>
       <module>local</module>
       <module>zookeeper</module>
-      <module>zookeeper-server</module>
-      <module>zookeeper-server-config</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server-config/bnd.bnd
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server-config/bnd.bnd b/discovery/zookeeper-server-config/bnd.bnd
deleted file mode 100644
index 769558e..0000000
--- a/discovery/zookeeper-server-config/bnd.bnd
+++ /dev/null
@@ -1 +0,0 @@
-Bundle-Activator: org.apache.cxf.dosgi.discovery.zookeeper.server.config.Activator

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server-config/pom.xml
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server-config/pom.xml b/discovery/zookeeper-server-config/pom.xml
deleted file mode 100644
index 4f7ac7a..0000000
--- a/discovery/zookeeper-server-config/pom.xml
+++ /dev/null
@@ -1,40 +0,0 @@
-<?xml version='1.0' encoding='UTF-8' ?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements. See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership. The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License. You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied. See the License for the
-  specific language governing permissions and limitations
-  under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-      <groupId>org.apache.aries.rsa</groupId>
-      <artifactId>parent</artifactId>
-      <version>1.8-SNAPSHOT</version>
-      <relativePath>../../parent/pom.xml</relativePath>
-    </parent>
-    
-    <groupId>org.apache.aries.rsa.discovery</groupId>
-    <artifactId>zookeeper-server-config</artifactId>
-    <packaging>bundle</packaging>
-    <name>Aries Remote Service Admin Discovery Zookeeper Config</name>
-
-    <properties>
-        <topDirectoryLocation>../..</topDirectoryLocation>
-    </properties>
-
-</project>

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server-config/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/config/Activator.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server-config/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/config/Activator.java b/discovery/zookeeper-server-config/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/config/Activator.java
deleted file mode 100644
index e92fe0b..0000000
--- a/discovery/zookeeper-server-config/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/config/Activator.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.server.config;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.Dictionary;
-import java.util.Hashtable;
-
-import org.osgi.framework.BundleActivator;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
-import org.osgi.service.cm.Configuration;
-import org.osgi.service.cm.ConfigurationAdmin;
-import org.osgi.service.cm.ManagedService;
-import org.osgi.util.tracker.ServiceTracker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class Activator implements BundleActivator {
-
-    private static final Logger LOG = LoggerFactory.getLogger(Activator.class);
-    private static final String ZOOKEEPER_PORT = "org.apache.aries.rsa.discovery.zookeeper.port";
-    private static final String PID = "org.apache.aries.rsa.discovery.zookeeper.server";
-    private ServiceTracker<ConfigurationAdmin, ConfigurationAdmin> st;
-
-    public void start(BundleContext context) throws Exception {
-        synchronized (Activator.class) {
-            // Only one thread gets to set the port number
-            if (System.getProperty(ZOOKEEPER_PORT) == null) {
-                String port = getFreePort();
-                System.setProperty(ZOOKEEPER_PORT, port);
-                LOG.info("Global ZooKeeper port: {}", port);
-            }
-        }
-
-        st = new ServiceTracker<ConfigurationAdmin, ConfigurationAdmin>(context, ConfigurationAdmin.class, null) {
-            @Override
-            public ConfigurationAdmin addingService(ServiceReference<ConfigurationAdmin> reference) {
-                ConfigurationAdmin service = super.addingService(reference);
-                try {
-                    Configuration cfg = service.getConfiguration(PID, null);
-                    Dictionary<String, Object> props = new Hashtable<String, Object>();
-                    String zp = System.getProperty(ZOOKEEPER_PORT);
-                    props.put("clientPort", zp);
-                    cfg.update(props);
-                    LOG.debug("Set ZooKeeper client port to {}", zp);
-                } catch (IOException e) {
-                    LOG.error("Failed to configure ZooKeeper server!", e);
-                }
-                return service;
-            }
-        };
-        st.open();
-
-        // The following section is done synchronously otherwise it doesn't happen in time for the CT
-        ServiceReference<?>[] refs = context.getServiceReferences(ManagedService.class.getName(),
-                "(service.pid=org.apache.cxf.dosgi.discovery.zookeeper)");
-        if (refs == null || refs.length == 0) {
-            throw new RuntimeException("This bundle must be started after the bundle with the ZooKeeper "
-                                       + "Discovery Managed Service was started.");
-        }
-
-        Dictionary<String, Object> props = new Hashtable<String, Object>();
-        props.put("zookeeper.host", "127.0.0.1");
-        props.put("zookeeper.port", System.getProperty(ZOOKEEPER_PORT));
-
-        ManagedService ms = (ManagedService) context.getService(refs[0]);
-        try {
-            ms.updated(props);
-        } finally {
-            if (ms != null) {
-                context.ungetService(refs[0]);
-            }
-        }
-        LOG.debug("Passed the zookeeper.host property to the ZooKeeper Client managed service.");
-    }
-
-    private String getFreePort() {
-        try {
-            ServerSocket ss = new ServerSocket(0);
-            String port = "" + ss.getLocalPort();
-            ss.close();
-            return port;
-        } catch (IOException e) {
-            LOG.error("Failed to find a free port!", e);
-            return null;
-        }
-    }
-
-    public void stop(BundleContext context) throws Exception {
-        st.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/bnd.bnd
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/bnd.bnd b/discovery/zookeeper-server/bnd.bnd
deleted file mode 100644
index cef642b..0000000
--- a/discovery/zookeeper-server/bnd.bnd
+++ /dev/null
@@ -1 +0,0 @@
-Bundle-Activator: org.apache.cxf.dosgi.discovery.zookeeper.server.Activator

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/pom.xml
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/pom.xml b/discovery/zookeeper-server/pom.xml
deleted file mode 100644
index e6bcdba..0000000
--- a/discovery/zookeeper-server/pom.xml
+++ /dev/null
@@ -1,50 +0,0 @@
-<?xml version='1.0' encoding='UTF-8' ?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements. See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership. The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License. You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied. See the License for the
-  specific language governing permissions and limitations
-  under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-      <groupId>org.apache.aries.rsa</groupId>
-      <artifactId>parent</artifactId>
-      <version>1.8-SNAPSHOT</version>
-      <relativePath>../../parent/pom.xml</relativePath>
-    </parent>
-    
-    <groupId>org.apache.aries.rsa.discovery</groupId>
-    <artifactId>zookeeper-server</artifactId>
-    <packaging>bundle</packaging>
-    <name>Aries Remote Service Admin Discovery Zookeeper Server</name>
-    
-
-    <properties>
-        <topDirectoryLocation>../..</topDirectoryLocation>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.zookeeper</groupId>
-            <artifactId>zookeeper</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
-    </dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Activator.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Activator.java b/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Activator.java
deleted file mode 100644
index 17c5568..0000000
--- a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Activator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.server;
-
-import java.util.Dictionary;
-import java.util.Hashtable;
-
-import org.osgi.framework.BundleActivator;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-
-public class Activator implements BundleActivator {
-
-    ZookeeperStarter zkStarter;
-
-    public void start(BundleContext context) throws Exception {
-        zkStarter = new ZookeeperStarter(context);
-        Dictionary<String, Object> props = new Hashtable<String, Object>();
-        props.put(Constants.SERVICE_PID, "org.apache.aries.rsa.discovery.zookeeper.server");
-        context.registerService(org.osgi.service.cm.ManagedService.class.getName(), zkStarter, props);
-    }
-
-    public void stop(BundleContext context) throws Exception {
-        if (zkStarter != null) {
-            zkStarter.shutdown();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Utils.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Utils.java b/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Utils.java
deleted file mode 100644
index fe3c663..0000000
--- a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Utils.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.server;
-
-import java.util.ArrayList;
-import java.util.Dictionary;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * General purpose utility methods.
- */
-public final class Utils {
-
-    private Utils() {
-        // prevent instantiation
-    }
-
-    /**
-     * Remove entries whose values are empty from the given dictionary.
-     *
-     * @param dict a dictionary
-     */
-    public static void removeEmptyValues(Dictionary<String, ?> dict) {
-        List<String> keysToRemove = new ArrayList<String>();
-        Enumeration<String> keys = dict.keys();
-        while (keys.hasMoreElements()) {
-            String key = keys.nextElement();
-            Object value = dict.get(key);
-            if (value instanceof String && "".equals(value)) {
-                keysToRemove.add(key);
-            }
-        }
-        for (String key : keysToRemove) {
-            dict.remove(key);
-        }
-    }
-
-    /**
-     * Puts the given key-value pair in the given dictionary if the key does not
-     * already exist in it or if its existing value is null.
-     *
-     * @param dict a dictionary
-     * @param key the key
-     * @param value the default value to set
-     */
-    public static void setDefault(Dictionary<String, String> dict, String key, String value) {
-        if (dict.get(key) == null) {
-            dict.put(key, value);
-        }
-    }
-
-    /**
-     * Converts the given Dictionary to a Map.
-     *
-     * @param dict a dictionary
-     * @param <K> the key type
-     * @param <V> the value type
-     * @return the converted map, or an empty map if the given dictionary is null
-     */
-    public static <K, V> Map<K, V> toMap(Dictionary<K, V> dict) {
-        Map<K, V> map = new HashMap<K, V>();
-        if (dict != null) {
-            Enumeration<K> keys = dict.keys();
-            while (keys.hasMoreElements()) {
-                K key = keys.nextElement();
-                map.put(key, dict.get(key));
-            }
-        }
-        return map;
-    }
-
-    /**
-     * Converts a Dictionary into a Properties instance.
-     *
-     * @param dict a dictionary
-     * @param <K> the key type
-     * @param <V> the value type
-     * @return the properties
-     */
-    public static <K, V> Properties toProperties(Dictionary<K, V> dict) {
-        Properties props = new Properties();
-        for (Enumeration<K> e = dict.keys(); e.hasMoreElements();) {
-            K key = e.nextElement();
-            props.put(key, dict.get(key));
-        }
-        return props;
-    }
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java b/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java
deleted file mode 100644
index bd5618f..0000000
--- a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.server;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Dictionary;
-import java.util.Map;
-
-import org.apache.zookeeper.server.ServerConfig;
-import org.apache.zookeeper.server.ZooKeeperServerMain;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
-import org.apache.zookeeper.server.quorum.QuorumPeerMain;
-import org.osgi.framework.BundleContext;
-import org.osgi.service.cm.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZookeeperStarter implements org.osgi.service.cm.ManagedService {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperStarter.class); //NOPMD - using log4j here
-
-    protected ZookeeperServer main;
-    private final BundleContext bundleContext;
-    private Thread zkMainThread;
-    private Map<String, ?> curConfiguration;
-
-    public ZookeeperStarter(BundleContext ctx) {
-        bundleContext = ctx;
-    }
-
-    synchronized void shutdown() {
-        if (main != null) {
-            LOG.info("Shutting down ZooKeeper server");
-            try {
-                main.shutdown();
-                if (zkMainThread != null) {
-                    zkMainThread.join();
-                }
-            } catch (Throwable e) {
-                LOG.error(e.getMessage(), e);
-            }
-            main = null;
-            zkMainThread = null;
-        }
-    }
-
-    private void setDefaults(Dictionary<String, String> dict) throws IOException {
-        Utils.removeEmptyValues(dict); // to avoid NumberFormatExceptions
-        Utils.setDefault(dict, "tickTime", "2000");
-        Utils.setDefault(dict, "initLimit", "10");
-        Utils.setDefault(dict, "syncLimit", "5");
-        Utils.setDefault(dict, "clientPort", "2181");
-        Utils.setDefault(dict, "dataDir", new File(bundleContext.getDataFile(""), "zkdata").getCanonicalPath());
-    }
-
-    @SuppressWarnings("unchecked")
-    public synchronized void updated(Dictionary<String, ?> dict) throws ConfigurationException {
-        LOG.debug("Received configuration update for Zookeeper Server: " + dict);
-        try {
-            if (dict != null) {
-                setDefaults((Dictionary<String, String>)dict);
-            }
-            Map<String, ?> configMap = Utils.toMap(dict);
-            if (!configMap.equals(curConfiguration)) { // only if something actually changed
-                shutdown();
-                curConfiguration = configMap;
-                // config is null if it doesn't exist, is being deleted or has not yet been loaded
-                // in which case we just stop running
-                if (dict != null) {
-                    startFromConfig(parseConfig(dict));
-                    LOG.info("Applied configuration update: " + dict);
-                }
-            }
-        } catch (Exception th) {
-            LOG.error("Problem applying configuration update: " + dict, th);
-        }
-    }
-
-    private QuorumPeerConfig parseConfig(Dictionary<String, ?> dict) throws IOException, ConfigException {
-        QuorumPeerConfig config = new QuorumPeerConfig();
-        config.parseProperties(Utils.toProperties(dict));
-        return config;
-    }
-
-    protected void startFromConfig(final QuorumPeerConfig config) {
-        int numServers = config.getServers().size();
-        main = numServers > 1 ? new MyQuorumPeerMain(config) : new MyZooKeeperServerMain(config);
-        zkMainThread = new Thread(new Runnable() {
-            public void run() {
-                try {
-                    main.startup();
-                } catch (Throwable e) {
-                    LOG.error("Problem running ZooKeeper server.", e);
-                }
-            }
-        });
-        zkMainThread.start();
-    }
-
-    interface ZookeeperServer {
-        void startup() throws IOException;
-        void shutdown();
-    }
-
-    static class MyQuorumPeerMain extends QuorumPeerMain implements ZookeeperServer {
-
-        private QuorumPeerConfig config;
-
-        MyQuorumPeerMain(QuorumPeerConfig config) {
-            this.config = config;
-        }
-
-        public void startup() throws IOException {
-            runFromConfig(config);
-        }
-
-        public void shutdown() {
-            if (null != quorumPeer) {
-                quorumPeer.shutdown();
-            }
-        }
-    }
-
-    static class MyZooKeeperServerMain extends ZooKeeperServerMain implements ZookeeperServer {
-
-        private QuorumPeerConfig config;
-
-        MyZooKeeperServerMain(QuorumPeerConfig config) {
-            this.config = config;
-        }
-
-        public void startup() throws IOException {
-            ServerConfig serverConfig = new ServerConfig();
-            serverConfig.readFrom(config);
-            runFromConfig(serverConfig);
-        }
-
-        public void shutdown() {
-            try {
-                super.shutdown();
-            } catch (Exception e) {
-                LOG.error("Error shutting down ZooKeeper", e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml b/discovery/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml
deleted file mode 100644
index efd9403..0000000
--- a/discovery/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml
+++ /dev/null
@@ -1,34 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements. See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership. The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License. You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied. See the License for the
-  specific language governing permissions and limitations
-  under the License.
--->
-<MetaData xmlns="http://www.osgi.org/xmlns/metadata/v1.0.0"
-	 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	 xsi:schemaLocation="
-		 http://www.osgi.org/xmlns/metadata/v1.0.0 http://www.osgi.org/xmlns/metatype/v1.1.0/metatype.xsd
-	 ">
-	 <OCD description="" name="Zookeeper server config" id="org.apache.cxf.dosgi.discovery.zookeeper.server">
-        <AD id="clientPort" required="false" type="String" default="2181" description=""/>
-	 	<AD id="tickTime" required="false" type="String" default="2000" description=""/>
-        <AD id="initLimit" required="false" type="String" default="10" description=""/>
-        <AD id="syncLimit" required="false" type="String" default="5" description=""/>
-	 </OCD>
-	 <Designate pid="org.apache.cxf.dosgi.discovery.zookeeper.server">
-	 	<Object ocdref="org.apache.cxf.dosgi.discovery.zookeeper.server"/>
-	 </Designate>
-</MetaData>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java b/discovery/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java
deleted file mode 100644
index 17ca117..0000000
--- a/discovery/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.server;
-
-import java.io.File;
-import java.util.Dictionary;
-import java.util.Hashtable;
-
-import junit.framework.TestCase;
-
-import org.apache.cxf.dosgi.discovery.zookeeper.server.ZookeeperStarter.MyZooKeeperServerMain;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.easymock.classextension.EasyMock;
-import org.easymock.classextension.IMocksControl;
-import org.osgi.framework.BundleContext;
-
-import static org.easymock.EasyMock.expect;
-import static org.easymock.classextension.EasyMock.replay;
-import static org.easymock.classextension.EasyMock.verify;
-
-public class ZookeeperStarterTest extends TestCase {
-
-    public void testUpdateConfig() throws Exception {
-        final File tempDir = new File("target");
-        IMocksControl control = EasyMock.createControl();
-        BundleContext bc = control.createMock(BundleContext.class);
-        expect(bc.getDataFile("")).andReturn(tempDir);
-        final MyZooKeeperServerMain mockServer = control.createMock(MyZooKeeperServerMain.class);
-        control.replay();
-
-        ZookeeperStarter starter = new ZookeeperStarter(bc) {
-            @Override
-            protected void startFromConfig(QuorumPeerConfig config) {
-                assertEquals(1234, config.getClientPortAddress().getPort());
-                assertTrue(config.getDataDir().contains(tempDir + File.separator + "zkdata"));
-                assertEquals(2000, config.getTickTime());
-                assertEquals(10, config.getInitLimit());
-                assertEquals(5, config.getSyncLimit());
-                this.main = mockServer;
-            }
-        };
-        Dictionary<String, Object> props = new Hashtable<String, Object>();
-        props.put("clientPort", "1234");
-        starter.updated(props);
-        assertNotNull(starter.main);
-
-        control.verify();
-    }
-
-    public void testRemoveConfiguration() throws Exception {
-        BundleContext bc = EasyMock.createMock(BundleContext.class);
-        MyZooKeeperServerMain zkServer = EasyMock.createMock(MyZooKeeperServerMain.class);
-        zkServer.shutdown();
-        EasyMock.expectLastCall();
-
-        replay(zkServer);
-
-        ZookeeperStarter starter = new ZookeeperStarter(bc);
-        starter.main = zkServer;
-        starter.updated(null);
-
-        verify(zkServer);
-        assertNull("main should be null", starter.main);
-    }
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/bnd.bnd
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/bnd.bnd b/discovery/zookeeper/bnd.bnd
index 5c1f23d..3e572c6 100644
--- a/discovery/zookeeper/bnd.bnd
+++ b/discovery/zookeeper/bnd.bnd
@@ -1 +1,2 @@
-Bundle-Activator:  org.apache.cxf.dosgi.discovery.zookeeper.Activator
+Bundle-Activator: org.apache.aries.rsa.discovery.zookeeper.Activator
+

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/Activator.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/Activator.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/Activator.java
new file mode 100644
index 0000000..3b17f35
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/Activator.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.apache.aries.rsa.discovery.zookeeper.server.ZookeeperStarter;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.service.cm.ManagedService;
+
+public class Activator implements BundleActivator {
+
+    private static final String PID_DISCOVERY_ZOOKEEPER = "org.apache.aries.rsa.discovery.zookeeper";
+    private static final String PID_ZOOKEEPER_SERVER    = "org.apache.aries.rsa.discovery.zookeeper.server";
+    private ZooKeeperDiscovery zkd;
+    private ZookeeperStarter zkStarter;
+
+    public synchronized void start(BundleContext bc) throws Exception {
+        zkd = new ZooKeeperDiscovery(bc);
+        bc.registerService(ManagedService.class, zkd, configProperties(PID_DISCOVERY_ZOOKEEPER));
+        
+        zkStarter = new ZookeeperStarter(bc);
+        bc.registerService(ManagedService.class, zkStarter, configProperties(PID_ZOOKEEPER_SERVER));
+    }
+
+    public synchronized void stop(BundleContext bc) throws Exception {
+        zkd.stop(true);
+        
+        if (zkStarter != null) {
+            zkStarter.shutdown();
+        }
+    }
+    
+    private Dictionary<String, String> configProperties(String pid) {
+        Dictionary<String, String> props = new Hashtable<String, String>();
+        props.put(Constants.SERVICE_PID, pid);
+        return props;
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
new file mode 100644
index 0000000..085c074
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.aries.rsa.discovery.zookeeper.publish.PublishingEndpointListenerFactory;
+import org.apache.aries.rsa.discovery.zookeeper.subscribe.EndpointListenerTracker;
+import org.apache.aries.rsa.discovery.zookeeper.subscribe.InterfaceMonitorManager;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedService;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.util.tracker.ServiceTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooKeeperDiscovery implements Watcher, ManagedService {
+
+    public static final String DISCOVERY_ZOOKEEPER_ID = "org.apache.cxf.dosgi.discovery.zookeeper";
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperDiscovery.class);
+
+    private final BundleContext bctx;
+
+    private PublishingEndpointListenerFactory endpointListenerFactory;
+    private ServiceTracker<EndpointListener, EndpointListener> endpointListenerTracker;
+    private InterfaceMonitorManager imManager;
+    private ZooKeeper zkClient;
+    private boolean closed;
+    private boolean started;
+
+    private Dictionary<String, ?> curConfiguration;
+
+    public ZooKeeperDiscovery(BundleContext bctx) {
+        this.bctx = bctx;
+    }
+
+    public synchronized void updated(Dictionary<String, ?> configuration) throws ConfigurationException {
+        LOG.debug("Received configuration update for Zookeeper Discovery: {}", configuration);
+        // make changes only if config actually changed, to prevent unnecessary ZooKeeper reconnections
+        if (!ZooKeeperDiscovery.toMap(configuration).equals(ZooKeeperDiscovery.toMap(curConfiguration))) {
+            stop(false);
+            curConfiguration = configuration;
+            // config is null if it doesn't exist, is being deleted or has not yet been loaded
+            // in which case we just stop running
+            if (!closed && configuration != null) {
+                try {
+                    createZookeeper(configuration);
+                } catch (IOException e) {
+                    throw new ConfigurationException(null, "Error starting zookeeper client", e);
+                }
+            }
+        }
+    }
+
+    private synchronized void start() {
+        if (closed) {
+            return;
+        }
+        if (started) {
+            // we must be re-entrant, i.e. can be called when already started
+            LOG.debug("ZookeeperDiscovery already started");
+            return;
+        }
+        LOG.debug("starting ZookeeperDiscovery");
+        endpointListenerFactory = new PublishingEndpointListenerFactory(zkClient, bctx);
+        endpointListenerFactory.start();
+        imManager = new InterfaceMonitorManager(bctx, zkClient);
+        endpointListenerTracker = new EndpointListenerTracker(bctx, imManager);
+        endpointListenerTracker.open();
+        started = true;
+    }
+
+    public synchronized void stop(boolean close) {
+        if (started) {
+            LOG.debug("stopping ZookeeperDiscovery");
+        }
+        started = false;
+        closed |= close;
+        if (endpointListenerFactory != null) {
+            endpointListenerFactory.stop();
+        }
+        if (endpointListenerTracker != null) {
+            endpointListenerTracker.close();
+        }
+        if (imManager != null) {
+            imManager.close();
+        }
+        if (zkClient != null) {
+            try {
+                zkClient.close();
+            } catch (InterruptedException e) {
+                LOG.error("Error closing ZooKeeper", e);
+            }
+        }
+    }
+
+    protected ZooKeeper createZooKeeper(String host, String port, int timeout) throws IOException {
+        LOG.info("ZooKeeper discovery connecting to {}:{} with timeout {}",
+                new Object[]{host, port, timeout});
+        return new ZooKeeper(host + ":" + port, timeout, this);
+    }
+
+    /* Callback for ZooKeeper */
+    public void process(WatchedEvent event) {
+        LOG.debug("got ZooKeeper event " + event);
+        switch (event.getState()) {
+        case SyncConnected:
+            LOG.info("Connection to ZooKeeper established");
+            // this event can be triggered more than once in a row (e.g. after Disconnected event),
+            // so we must be re-entrant here
+            start();
+            break;
+
+        case Expired:
+            LOG.info("Connection to ZooKeeper expired. Trying to create a new connection");
+            stop(false);
+            try {
+                createZookeeper(curConfiguration);
+            } catch (IOException e) {
+                LOG.error("Error starting zookeeper client", e);
+            }
+            break;
+
+        default:
+            // ignore other events
+            break;
+        }
+    }
+
+    private void createZookeeper(Dictionary<String, ?> config) throws IOException {
+        String host = (String)getWithDefault(config, "zookeeper.host", "localhost");
+        String port = (String)getWithDefault(config, "zookeeper.port", "2181");
+        int timeout = Integer.parseInt((String)getWithDefault(config, "zookeeper.timeout", "3000"));
+        zkClient = createZooKeeper(host, port, timeout);
+    }
+    
+    public Object getWithDefault(Dictionary<String, ?> config, String key, Object defaultValue) {
+        Object value = config.get(key);
+        return value != null ? value : defaultValue;
+    }
+    
+    /**
+     * Converts the given Dictionary to a Map.
+     *
+     * @param dict a dictionary
+     * @param <K> the key type
+     * @param <V> the value type
+     * @return the converted map, or an empty map if the given dictionary is null
+     */
+    public static <K, V> Map<K, V> toMap(Dictionary<K, V> dict) {
+        Map<K, V> map = new HashMap<K, V>();
+        if (dict != null) {
+            Enumeration<K> keys = dict.keys();
+            while (keys.hasMoreElements()) {
+                K key = keys.nextElement();
+                map.put(key, dict.get(key));
+            }
+        }
+        return map;
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java
new file mode 100644
index 0000000..033bee2
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper.publish;
+
+import java.util.Map;
+
+/**
+ * This interface allows transformation of service registration information before it is pushed into the ZooKeeper
+ * discovery system.
+ * It can be useful for situations where a host name or port number needs to be changed in cases where the host running
+ * the service is known differently from the outside to what the local Java process thinks it is.
+ * Extra service properties can also be added to the registration which can be useful to refine the remote service
+ * lookup process. <p/>
+ *
+ * DiscoveryPlugins use the OSGi WhiteBoard pattern. To add one to the system, register an instance under this interface
+ * with the OSGi Service Registry. All registered DiscoveryPlugin instances are visited and given a chance to
+ * process the information before it is pushed into ZooKeeper. <p/>
+ *
+ * Note that the changes made using this plugin do not modify the local service registration.
+ *
+ */
+public interface DiscoveryPlugin {
+
+    /**
+     * Process service registration information. Plugins can change this information before it is published into the
+     * ZooKeeper discovery system.
+     *
+     * @param mutableProperties A map of service registration properties. The map is mutable and any changes to the map
+     * will be reflected in the ZooKeeper registration.
+     * @param endpointKey The key under which the service is registered in ZooKeeper. This key typically has the
+     * following format: hostname#port##context. While the actual value of this key is not actually used by the
+     * system (people can use it as a hint to understand where the service is located), the value <i>must</i> be
+     * unique for all services of a given type.
+     * @return The <tt>endpointKey</tt> value to be used. If there is no need to change this simply return the value
+     * of the <tt>endpointKey</tt> parameter.
+     */
+    String process(Map<String, Object> mutableProperties, String endpointKey);
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
new file mode 100644
index 0000000..75efbd3
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper.publish;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
+import org.apache.aries.rsa.discovery.endpoint.PropertiesMapper;
+import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType;
+import org.osgi.xmlns.rsa.v1_0.PropertyType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listens for local Endpoints and publishes them to ZooKeeper.
+ */
+public class PublishingEndpointListener implements EndpointListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class);
+
+    private final ZooKeeper zk;
+    private final ServiceTracker<DiscoveryPlugin, DiscoveryPlugin> discoveryPluginTracker;
+    private final List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>();
+    private boolean closed;
+
+    private final EndpointDescriptionParser endpointDescriptionParser;
+
+    public PublishingEndpointListener(ZooKeeper zk, BundleContext bctx) {
+        this.zk = zk;
+        discoveryPluginTracker = new ServiceTracker<DiscoveryPlugin, DiscoveryPlugin>(bctx, 
+            DiscoveryPlugin.class, null);
+        discoveryPluginTracker.open();
+        endpointDescriptionParser = new EndpointDescriptionParser();
+    }
+
+    public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
+        LOG.info("Local EndpointDescription added: {}", endpoint);
+
+        synchronized (endpoints) {
+            if (closed) {
+                return;
+            }
+            if (endpoints.contains(endpoint)) {
+                // TODO -> Should the published endpoint be updated here?
+                return;
+            }
+
+            try {
+                addEndpoint(endpoint);
+                endpoints.add(endpoint);
+            } catch (Exception ex) {
+                LOG.error("Exception while processing the addition of an endpoint.", ex);
+            }
+        }
+    }
+
+    private void addEndpoint(EndpointDescription endpoint) throws URISyntaxException, KeeperException,
+                                                                  InterruptedException, IOException {
+        Collection<String> interfaces = endpoint.getInterfaces();
+        String endpointKey = getKey(endpoint);
+        Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
+
+        // process plugins
+        Object[] plugins = discoveryPluginTracker.getServices();
+        if (plugins != null) {
+            for (Object plugin : plugins) {
+                if (plugin instanceof DiscoveryPlugin) {
+                    endpointKey = ((DiscoveryPlugin)plugin).process(props, endpointKey);
+                }
+            }
+        }
+
+        for (String name : interfaces) {
+            String path = Utils.getZooKeeperPath(name);
+            String fullPath = path + '/' + endpointKey;
+            LOG.info("Creating ZooKeeper node for service with path {}", fullPath);
+            createPath(path, zk);
+            List<PropertyType> propsOut = new PropertiesMapper().fromProps(props);
+            EndpointDescriptionType epd = new EndpointDescriptionType();
+            epd.getProperty().addAll(propsOut);
+            byte[] epData = endpointDescriptionParser.getData(epd);
+            createEphemeralNode(fullPath, epData);
+        }
+    }
+
+    private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException {
+        try {
+            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        } catch (NodeExistsException nee) {
+            // this sometimes happens after a ZooKeeper node dies and the ephemeral node
+            // that belonged to the old session was not yet deleted. We need to make our
+            // session the owner of the node so it won't get deleted automatically -
+            // we do this by deleting and recreating it ourselves.
+            LOG.info("node for endpoint already exists, recreating: {}", fullPath);
+            try {
+                zk.delete(fullPath, -1);
+            } catch (NoNodeException nne) {
+                // it's a race condition, but as long as it got deleted - it's ok
+            }
+            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        }
+    }
+
+    public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
+        LOG.info("Local EndpointDescription removed: {}", endpoint);
+
+        synchronized (endpoints) {
+            if (closed) {
+                return;
+            }
+            if (!endpoints.contains(endpoint)) {
+                return;
+            }
+
+            try {
+                removeEndpoint(endpoint);
+                endpoints.remove(endpoint);
+            } catch (Exception ex) {
+                LOG.error("Exception while processing the removal of an endpoint", ex);
+            }
+        }
+    }
+
+    private void removeEndpoint(EndpointDescription endpoint) throws UnknownHostException, URISyntaxException {
+        Collection<String> interfaces = endpoint.getInterfaces();
+        String endpointKey = getKey(endpoint);
+        for (String name : interfaces) {
+            String path = Utils.getZooKeeperPath(name);
+            String fullPath = path + '/' + endpointKey;
+            LOG.debug("Removing ZooKeeper node: {}", fullPath);
+            try {
+                zk.delete(fullPath, -1);
+            } catch (Exception ex) {
+                LOG.debug("Error while removing endpoint: {}", ex); // e.g. session expired
+            }
+        }
+    }
+
+    private static void createPath(String path, ZooKeeper zk) throws KeeperException, InterruptedException {
+        StringBuilder current = new StringBuilder();
+        List<String> parts = Utils.removeEmpty(Arrays.asList(path.split("/")));
+        for (String part : parts) {
+            current.append('/');
+            current.append(part);
+            try {
+                zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            } catch (NodeExistsException nee) {
+                // it's not the first node with this path to ever exist - that's normal
+            }
+        }
+    }
+
+    private static String getKey(EndpointDescription endpoint) throws URISyntaxException {
+        URI uri = new URI(endpoint.getId());
+        return new StringBuilder().append(uri.getHost()).append("#").append(uri.getPort())
+            .append("#").append(uri.getPath().replace('/', '#')).toString();
+    }
+
+    public void close() {
+        LOG.debug("closing - removing all endpoints");
+        synchronized (endpoints) {
+            closed = true;
+            for (EndpointDescription endpoint : endpoints) {
+                try {
+                    removeEndpoint(endpoint);
+                } catch (Exception ex) {
+                    LOG.error("Exception while removing endpoint during close", ex);
+                }
+            }
+            endpoints.clear();
+        }
+        discoveryPluginTracker.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
new file mode 100644
index 0000000..1eabec3
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper.publish;
+
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+
+import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
+import org.apache.zookeeper.ZooKeeper;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceFactory;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates local EndpointListeners that publish to ZooKeeper.
+ */
+public class PublishingEndpointListenerFactory implements ServiceFactory<PublishingEndpointListener> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListenerFactory.class);
+
+    private final BundleContext bctx;
+    private final ZooKeeper zk;
+    private final List<PublishingEndpointListener> listeners = new ArrayList<PublishingEndpointListener>();
+    private ServiceRegistration<?> serviceRegistration;
+
+    public PublishingEndpointListenerFactory(ZooKeeper zk, BundleContext bctx) {
+        this.bctx = bctx;
+        this.zk = zk;
+    }
+
+    public PublishingEndpointListener getService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr) {
+        LOG.debug("new EndpointListener from factory");
+        synchronized (listeners) {
+            PublishingEndpointListener pel = new PublishingEndpointListener(zk, bctx);
+            listeners.add(pel);
+            return pel;
+        }
+    }
+
+    public void ungetService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr, 
+                             PublishingEndpointListener pel) {
+        LOG.debug("remove EndpointListener");
+        synchronized (listeners) {
+            if (listeners.remove(pel)) {
+                pel.close();
+            }
+        }
+    }
+
+    public synchronized void start() {
+        Dictionary<String, String> props = new Hashtable<String, String>();
+        String uuid = bctx.getProperty(Constants.FRAMEWORK_UUID);
+        props.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, 
+                  String.format("(&(%s=*)(%s=%s))", Constants.OBJECTCLASS, 
+                                RemoteConstants.ENDPOINT_FRAMEWORK_UUID, uuid));
+        props.put(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID, "true");
+        serviceRegistration = bctx.registerService(EndpointListener.class.getName(), this, props);
+    }
+    
+    public synchronized void stop() {
+        if (serviceRegistration != null) {
+            serviceRegistration.unregister();
+            serviceRegistration = null;
+        }
+        synchronized (listeners) {
+            for (PublishingEndpointListener pel : listeners) {
+                pel.close();
+            }
+            listeners.clear();
+        }
+    }
+
+    /**
+     * Only for the test case!
+     */
+    protected List<PublishingEndpointListener> getListeners() {
+        synchronized (listeners) {
+            return listeners;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/Utils.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/Utils.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/Utils.java
new file mode 100644
index 0000000..67ea3a4
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/Utils.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper.server;
+
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * General purpose utility methods.
+ */
+public final class Utils {
+
+    private Utils() {
+        // prevent instantiation
+    }
+
+    /**
+     * Remove entries whose values are empty from the given dictionary.
+     *
+     * @param dict a dictionary
+     */
+    public static void removeEmptyValues(Dictionary<String, ?> dict) {
+        List<String> keysToRemove = new ArrayList<String>();
+        Enumeration<String> keys = dict.keys();
+        while (keys.hasMoreElements()) {
+            String key = keys.nextElement();
+            Object value = dict.get(key);
+            if (value instanceof String && "".equals(value)) {
+                keysToRemove.add(key);
+            }
+        }
+        for (String key : keysToRemove) {
+            dict.remove(key);
+        }
+    }
+
+    /**
+     * Puts the given key-value pair in the given dictionary if the key does not
+     * already exist in it or if its existing value is null.
+     *
+     * @param dict a dictionary
+     * @param key the key
+     * @param value the default value to set
+     */
+    public static void setDefault(Dictionary<String, String> dict, String key, String value) {
+        if (dict.get(key) == null) {
+            dict.put(key, value);
+        }
+    }
+
+    /**
+     * Converts the given Dictionary to a Map.
+     *
+     * @param dict a dictionary
+     * @param <K> the key type
+     * @param <V> the value type
+     * @return the converted map, or an empty map if the given dictionary is null
+     */
+    public static <K, V> Map<K, V> toMap(Dictionary<K, V> dict) {
+        Map<K, V> map = new HashMap<K, V>();
+        if (dict != null) {
+            Enumeration<K> keys = dict.keys();
+            while (keys.hasMoreElements()) {
+                K key = keys.nextElement();
+                map.put(key, dict.get(key));
+            }
+        }
+        return map;
+    }
+
+    /**
+     * Converts a Dictionary into a Properties instance.
+     *
+     * @param dict a dictionary
+     * @param <K> the key type
+     * @param <V> the value type
+     * @return the properties
+     */
+    public static <K, V> Properties toProperties(Dictionary<K, V> dict) {
+        Properties props = new Properties();
+        for (Enumeration<K> e = dict.keys(); e.hasMoreElements();) {
+            K key = e.nextElement();
+            props.put(key, dict.get(key));
+        }
+        return props;
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/ZookeeperStarter.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/ZookeeperStarter.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/ZookeeperStarter.java
new file mode 100644
index 0000000..520aa99
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/ZookeeperStarter.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper.server;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Map;
+
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.cm.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZookeeperStarter implements org.osgi.service.cm.ManagedService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperStarter.class);
+
+    protected ZookeeperServer main;
+    private final BundleContext bundleContext;
+    private Thread zkMainThread;
+    private Map<String, ?> curConfiguration;
+
+    public ZookeeperStarter(BundleContext ctx) {
+        bundleContext = ctx;
+    }
+
+    public synchronized void shutdown() {
+        if (main != null) {
+            LOG.info("Shutting down ZooKeeper server");
+            try {
+                main.shutdown();
+                if (zkMainThread != null) {
+                    zkMainThread.join();
+                }
+            } catch (Throwable e) {
+                LOG.error(e.getMessage(), e);
+            }
+            main = null;
+            zkMainThread = null;
+        }
+    }
+
+    private void setDefaults(Dictionary<String, String> dict) throws IOException {
+        Utils.removeEmptyValues(dict); // to avoid NumberFormatExceptions
+        Utils.setDefault(dict, "tickTime", "2000");
+        Utils.setDefault(dict, "initLimit", "10");
+        Utils.setDefault(dict, "syncLimit", "5");
+        Utils.setDefault(dict, "clientPort", "2181");
+        Utils.setDefault(dict, "dataDir", new File(bundleContext.getDataFile(""), "zkdata").getCanonicalPath());
+    }
+
+    @SuppressWarnings("unchecked")
+    public synchronized void updated(Dictionary<String, ?> dict) throws ConfigurationException {
+        LOG.debug("Received configuration update for Zookeeper Server: " + dict);
+        try {
+            if (dict != null) {
+                setDefaults((Dictionary<String, String>)dict);
+            }
+            Map<String, ?> configMap = Utils.toMap(dict);
+            if (!configMap.equals(curConfiguration)) { // only if something actually changed
+                shutdown();
+                curConfiguration = configMap;
+                // config is null if it doesn't exist, is being deleted or has not yet been loaded
+                // in which case we just stop running
+                if (dict != null) {
+                    startFromConfig(parseConfig(dict));
+                    LOG.info("Applied configuration update: " + dict);
+                }
+            }
+        } catch (Exception th) {
+            LOG.error("Problem applying configuration update: " + dict, th);
+        }
+    }
+
+    private QuorumPeerConfig parseConfig(Dictionary<String, ?> dict) throws IOException, ConfigException {
+        QuorumPeerConfig config = new QuorumPeerConfig();
+        config.parseProperties(Utils.toProperties(dict));
+        return config;
+    }
+
+    protected void startFromConfig(final QuorumPeerConfig config) {
+        int numServers = config.getServers().size();
+        main = numServers > 1 ? new MyQuorumPeerMain(config) : new MyZooKeeperServerMain(config);
+        zkMainThread = new Thread(new Runnable() {
+            public void run() {
+                try {
+                    main.startup();
+                } catch (Throwable e) {
+                    LOG.error("Problem running ZooKeeper server.", e);
+                }
+            }
+        });
+        zkMainThread.start();
+    }
+
+    interface ZookeeperServer {
+        void startup() throws IOException;
+        void shutdown();
+    }
+
+    static class MyQuorumPeerMain extends QuorumPeerMain implements ZookeeperServer {
+
+        private QuorumPeerConfig config;
+
+        MyQuorumPeerMain(QuorumPeerConfig config) {
+            this.config = config;
+        }
+
+        public void startup() throws IOException {
+            runFromConfig(config);
+        }
+
+        public void shutdown() {
+            if (null != quorumPeer) {
+                quorumPeer.shutdown();
+            }
+        }
+    }
+
+    static class MyZooKeeperServerMain extends ZooKeeperServerMain implements ZookeeperServer {
+
+        private QuorumPeerConfig config;
+
+        MyZooKeeperServerMain(QuorumPeerConfig config) {
+            this.config = config;
+        }
+
+        public void startup() throws IOException {
+            ServerConfig serverConfig = new ServerConfig();
+            serverConfig.readFrom(config);
+            runFromConfig(serverConfig);
+        }
+
+        public void shutdown() {
+            try {
+                super.shutdown();
+            } catch (Exception e) {
+                LOG.error("Error shutting down ZooKeeper", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
new file mode 100644
index 0000000..5909ee0
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper.subscribe;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.util.tracker.ServiceTracker;
+
+/**
+ * Tracks interest in EndpointListeners. Delegates to InterfaceMonitorManager to manage
+ * interest in the scopes of each EndpointListener.
+ */
+public class EndpointListenerTracker extends ServiceTracker<EndpointListener, EndpointListener> {
+    private final InterfaceMonitorManager imManager;
+
+    public EndpointListenerTracker(BundleContext bctx, InterfaceMonitorManager imManager) {
+        super(bctx, EndpointListener.class, null);
+        this.imManager = imManager;
+    }
+
+    @Override
+    public EndpointListener addingService(ServiceReference<EndpointListener> endpointListener) {
+        imManager.addInterest(endpointListener);
+        return null;
+    }
+
+    @Override
+    public void modifiedService(ServiceReference<EndpointListener> endpointListener, EndpointListener service) {
+        // called when an EndpointListener updates its service properties,
+        // e.g. when its interest scope is expanded/reduced
+        imManager.addInterest(endpointListener);
+    }
+
+    @Override
+    public void removedService(ServiceReference<EndpointListener> endpointListener, EndpointListener service) {
+        imManager.removeInterest(endpointListener);
+    }
+
+}