You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2019/10/28 15:35:56 UTC

[aries-rsa] 02/02: ARIES-1780 - Use DS instead of Activator

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-rsa.git

commit afc1141c1ad52218e05fec654a6f9b1a6533bb0e
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Mon Oct 28 16:32:28 2019 +0100

    ARIES-1780 - Use DS instead of Activator
---
 discovery/zookeeper/bnd.bnd                        |   1 -
 discovery/zookeeper/pom.xml                        |   6 +-
 .../aries/rsa/discovery/zookeeper/Activator.java   |  54 ------
 .../rsa/discovery/zookeeper/ClientManager.java     | 148 ++++++++++++++++
 .../discovery/zookeeper/ZooKeeperDiscovery.java    | 186 +++------------------
 .../repository/ZookeeperEndpointRepository.java    |  24 +--
 .../zookeeper/server/ZookeeperStarter.java         |   3 -
 .../rsa/discovery/zookeeper/ClientManagerTest.java | 100 +++++++++++
 .../zookeeper/ZookeeperDiscoveryTest.java          |  76 ---------
 .../ZookeeperEndpointRepositoryTest.java           |   4 +-
 10 files changed, 285 insertions(+), 317 deletions(-)

diff --git a/discovery/zookeeper/bnd.bnd b/discovery/zookeeper/bnd.bnd
index 13b126e..4a8cc29 100644
--- a/discovery/zookeeper/bnd.bnd
+++ b/discovery/zookeeper/bnd.bnd
@@ -14,6 +14,5 @@
 #    KIND, either express or implied.  See the License for the
 #    specific language governing permissions and limitations
 #    under the License.
-Bundle-Activator: org.apache.aries.rsa.discovery.zookeeper.Activator
 Provide-Capability: osgi.remoteserviceadmin.discovery;\
 	protocols:List<String>="zookeeper"; version:Version=1.1.0
diff --git a/discovery/zookeeper/pom.xml b/discovery/zookeeper/pom.xml
index 81fce9d..12612d8 100644
--- a/discovery/zookeeper/pom.xml
+++ b/discovery/zookeeper/pom.xml
@@ -94,7 +94,11 @@
             <version>1.3</version>
             <scope>test</scope>
         </dependency>
-        
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.converter</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/Activator.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/Activator.java
deleted file mode 100644
index 8003849..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/Activator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper;
-
-import java.util.Dictionary;
-import java.util.Hashtable;
-
-import org.apache.zookeeper.server.ZooTrace;
-import org.osgi.framework.BundleActivator;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.service.cm.ManagedService;
-
-public class Activator implements BundleActivator {
-
-    private static final String PID_DISCOVERY_ZOOKEEPER = "org.apache.aries.rsa.discovery.zookeeper";
-    
-    private ZooKeeperDiscovery zkd;
-
-    public synchronized void start(BundleContext bc) throws Exception {
-        zkd = new ZooKeeperDiscovery(bc);
-        bc.registerService(ManagedService.class, zkd, configProperties(PID_DISCOVERY_ZOOKEEPER));
-        
-    }
-
-    public synchronized void stop(BundleContext bc) throws Exception {
-    	// Load ZooTrace class early to avoid ClassNotFoundException on shutdown
-    	ZooTrace.getTextTraceLevel();
-    	
-        zkd.stop(true);
-    }
-    
-    private Dictionary<String, String> configProperties(String pid) {
-        Dictionary<String, String> props = new Hashtable<>();
-        props.put(Constants.SERVICE_PID, pid);
-        return props;
-    }
-}
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ClientManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ClientManager.java
new file mode 100644
index 0000000..691cc50
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ClientManager.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Hashtable;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.server.ZooTrace;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ConfigurationPolicy;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates the ZooKeeper client from the component configuration and publishes it as an
+ * OSGi service once ZooKeeper reports the session as connected. When the session expires
+ * the published client is withdrawn, closed and replaced by a new one.
+ */
+@Component(//
+        service = ClientManager.class,
+        immediate = true,
+        configurationPid = "org.apache.aries.rsa.discovery.zookeeper", //
+        configurationPolicy = ConfigurationPolicy.REQUIRE //
+)
+public class ClientManager implements Watcher {
+
+    private static final int MAX_PORT_WAIT_MS = 2000;
+    private static final Logger LOG = LoggerFactory.getLogger(ClientManager.class);
+
+    // Shared between the lifecycle methods, the async tasks and the ZooKeeper callback
+    private volatile ZooKeeper zkClient;
+    private volatile ServiceRegistration<ZooKeeper> reg;
+    private DiscoveryConfig config;
+    private BundleContext context;
+
+    /**
+     * Stores the configuration and starts connecting in the background.
+     *
+     * @param config host, port and session timeout of the ZooKeeper server
+     * @param context bundle context used to publish the connected client
+     */
+    @Activate
+    public synchronized void activate(final DiscoveryConfig config, final BundleContext context) {
+        this.config = config;
+        this.context = context;
+        LOG.debug("Received configuration update for Zookeeper Discovery: {}", config);
+        CompletableFuture.runAsync(this::startClient);
+    }
+
+    /** Creates the client; failures are logged because no caller observes them. */
+    private void startClient() {
+        try {
+            this.zkClient = createClient(config);
+        } catch (IOException | RuntimeException e) {
+            LOG.error("Error starting zookeeper client", e);
+        }
+    }
+
+    /**
+     * Waits up to {@code MAX_PORT_WAIT_MS} ms for the server port to accept connections,
+     * then creates a client that delivers its events to this instance.
+     *
+     * @param config connection settings
+     * @return the new client
+     * @throws IOException if the client cannot be created
+     */
+    protected ZooKeeper createClient(DiscoveryConfig config) throws IOException {
+        LOG.info("ZooKeeper discovery connecting to {}:{} with timeout {}", config.zookeeper_host(), config.zookeeper_port(), config.zookeeper_timeout());
+        waitPort(config);
+        return new ZooKeeper(config.zookeeper_host() + ":" + config.zookeeper_port(), config.zookeeper_timeout(), this);
+    }
+
+    /**
+     * Withdraws the published service and closes the current client in the background.
+     * May be called repeatedly, e.g. on session expiry and again on deactivation.
+     */
+    @Deactivate
+    public synchronized void stop() {
+        // Load ZooTrace class early to avoid ClassNotFoundException on shutdown
+        ZooTrace.getTextTraceLevel();
+
+        // Detach both fields before acting on them, so a second stop() does not unregister
+        // twice and the async close cannot hit a client created by a later reconnect
+        ServiceRegistration<ZooKeeper> oldReg = reg;
+        ZooKeeper oldClient = zkClient;
+        reg = null;
+        zkClient = null;
+        if (oldReg != null) {
+            oldReg.unregister();
+        }
+        if (oldClient != null) {
+            CompletableFuture.runAsync(() -> closeClient(oldClient));
+        }
+    }
+
+    private void closeClient(ZooKeeper client) {
+        try {
+            client.close();
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the executing thread instead of failing the task
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted while closing ZooKeeper client", e);
+        }
+    }
+
+    /* Callback for ZooKeeper */
+    @Override
+    public void process(WatchedEvent event) {
+        LOG.debug("Got ZooKeeper event {}", event);
+
+        if (event.getState() == KeeperState.SyncConnected) {
+            publishClient();
+        }
+
+        if (event.getState() == KeeperState.Expired) {
+            LOG.info("Connection to ZooKeeper expired. Trying to create a new connection");
+            stop();
+            startClient();
+        }
+    }
+
+    /** Registers the current client as a service unless one is already registered. */
+    private void publishClient() {
+        ZooKeeper client = zkClient;
+        // SyncConnected may arrive more than once for the same client (e.g. after a
+        // Disconnected event); registering again would leak the earlier registration
+        if (reg != null || client == null) {
+            return;
+        }
+        LOG.info("Connection to ZooKeeper established. Publishing Zookeeper service");
+        this.reg = context.registerService(ZooKeeper.class, client, new Hashtable<String, String>());
+    }
+
+    /**
+     * Polls the configured host and port until a connection succeeds, the wait limit is
+     * reached or the thread is interrupted.
+     */
+    private void waitPort(DiscoveryConfig config) {
+        String host = config.zookeeper_host();
+        int port = Integer.parseInt(config.zookeeper_port());
+        long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() - start < MAX_PORT_WAIT_MS) {
+            try (Socket socket = new Socket(host, port)) {
+                return;
+            } catch (IOException e) {
+                if (!safeSleep()) {
+                    return;
+                }
+            }
+        }
+    }
+
+    /** Sleeps 100 ms; returns false, with the interrupt flag restored, when interrupted. */
+    private boolean safeSleep() {
+        try {
+            Thread.sleep(100);
+            return true;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
+
+    /** Connection settings of the ZooKeeper server and their defaults. */
+    @ObjectClassDefinition(name = "Zookeeper discovery config")
+    public @interface DiscoveryConfig {
+        String zookeeper_host() default "localhost";
+        String zookeeper_port() default "2181";
+        int zookeeper_timeout() default 3000;
+    }
+}
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
index 2e9bea2..1ca8516 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
@@ -18,197 +18,53 @@
  */
 package org.apache.aries.rsa.discovery.zookeeper;
 
-import java.io.IOException;
-import java.net.Socket;
-import java.util.Dictionary;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.aries.rsa.discovery.zookeeper.publish.PublishingEndpointListener;
 import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
 import org.apache.aries.rsa.discovery.zookeeper.subscribe.EndpointListenerTracker;
 import org.apache.aries.rsa.discovery.zookeeper.subscribe.InterestManager;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.osgi.framework.BundleContext;
-import org.osgi.service.cm.ConfigurationException;
-import org.osgi.service.cm.ManagedService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
 import org.osgi.util.tracker.ServiceTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ZooKeeperDiscovery implements Watcher, ManagedService {
-
+@Component
+public class ZooKeeperDiscovery {
     public static final String DISCOVERY_ZOOKEEPER_ID = "org.apache.cxf.dosgi.discovery.zookeeper";
 
     private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperDiscovery.class);
 
-    private final BundleContext bctx;
-
+    @Reference
+    private ZooKeeper zkClient;
+    
     private PublishingEndpointListener endpointListener;
     private ServiceTracker<?, ?> endpointListenerTracker;
     private InterestManager imManager;
-    private ZooKeeper zkClient;
-    private boolean closed;
-    private boolean started;
-
-    private Dictionary<String, ?> curConfiguration;
-
     private ZookeeperEndpointRepository repository;
 
-    public ZooKeeperDiscovery(BundleContext bctx) {
-        this.bctx = bctx;
-    }
-
-    public synchronized void updated(final Dictionary<String, ?> configuration) throws ConfigurationException {
-        LOG.debug("Received configuration update for Zookeeper Discovery: {}", configuration);
-        // make changes only if config actually changed, to prevent unnecessary ZooKeeper reconnections
-        if (!toMap(configuration).equals(toMap(curConfiguration))) {
-            stop(false);
-            curConfiguration = configuration;
-            // config is null if it doesn't exist, is being deleted or has not yet been loaded
-            // in which case we just stop running
-            if (closed || configuration == null) {
-                return;
-            }
-            new Thread(new Runnable() {
-                public void run() {
-                    try {
-                        createZookeeper(configuration);
-                    } catch (IOException e) {
-                        LOG.error("Error starting zookeeper client", e);
-                    }
-                }
-            }).start();
-        }
-    }
-
-    private synchronized void start() {
-        if (closed) {
-            return;
-        }
-        if (started) {
-            // we must be re-entrant, i.e. can be called when already started
-            LOG.debug("ZookeeperDiscovery already started");
-            return;
-        }
-        LOG.debug("starting ZookeeperDiscovery");
+    @Activate
+    public void activate(BundleContext context) {
+        LOG.debug("Starting ZookeeperDiscovery");
         repository = new ZookeeperEndpointRepository(zkClient);
         endpointListener = new PublishingEndpointListener(repository);
-        endpointListener.start(bctx);
+        endpointListener.start(context);
         imManager = new InterestManager(repository);
         repository.addListener(imManager);
-        endpointListenerTracker = new EndpointListenerTracker(bctx, imManager);
+        endpointListenerTracker = new EndpointListenerTracker(context, imManager);
         endpointListenerTracker.open();
-        started = true;
-    }
-
-    public synchronized void stop(boolean close) {
-        if (started) {
-            LOG.debug("stopping ZookeeperDiscovery");
-        }
-        started = false;
-        closed |= close;
-        if (endpointListener != null) {
-            endpointListener.stop();
-        }
-        if (endpointListenerTracker != null) {
-            endpointListenerTracker.close();
-        }
-        if (imManager != null) {
-            imManager.close();
-        }
-        if (zkClient != null) {
-            try {
-                zkClient.close();
-            } catch (InterruptedException e) {
-                LOG.error("Error closing ZooKeeper", e);
-            }
-        }
     }
 
-    protected ZooKeeper createZooKeeper(String host, String port, int timeout) throws IOException {
-        LOG.info("ZooKeeper discovery connecting to {}:{} with timeout {}", host, port, timeout);
-        return new ZooKeeper(host + ":" + port, timeout, this);
+    @Deactivate
+    public void deactivate() {
+        LOG.debug("Stopping ZookeeperDiscovery");
+        endpointListener.stop();
+        endpointListenerTracker.close();
+        imManager.close();
+        repository.close();
     }
 
-    /* Callback for ZooKeeper */
-    public void process(WatchedEvent event) {
-        LOG.debug("got ZooKeeper event " + event);
-        switch (event.getState()) {
-        case SyncConnected:
-            LOG.info("Connection to ZooKeeper established");
-            // this event can be triggered more than once in a row (e.g. after Disconnected event),
-            // so we must be re-entrant here
-            start();
-            break;
-
-        case Expired:
-            LOG.info("Connection to ZooKeeper expired. Trying to create a new connection");
-            stop(false);
-            try {
-                createZookeeper(curConfiguration);
-            } catch (IOException e) {
-                LOG.error("Error starting zookeeper client", e);
-            }
-            break;
-
-        default:
-            // ignore other events
-            break;
-        }
-    }
-
-    private void createZookeeper(Dictionary<String, ?> config) throws IOException {
-        String host = (String)getWithDefault(config, "zookeeper.host", "localhost");
-        String port = (String)getWithDefault(config, "zookeeper.port", "2181");
-        int timeout = Integer.parseInt((String)getWithDefault(config, "zookeeper.timeout", "3000"));
-        waitPort(host, Integer.parseInt(port));
-        zkClient = createZooKeeper(host, port, timeout);
-    }
-
-    private void waitPort(String host, int port) {
-        long start = System.currentTimeMillis();
-        while (System.currentTimeMillis() - start < 2000) {
-            try (Socket socket = new Socket(host, port)) {
-                return;
-            } catch (IOException e) {
-                safeSleep();
-            }
-        }
-    }
-
-    private void safeSleep() {
-        try {
-            Thread.sleep(100);
-        } catch (InterruptedException e1) {
-        }
-    }
-    
-    public Object getWithDefault(Dictionary<String, ?> config, String key, Object defaultValue) {
-        Object value = config.get(key);
-        return value != null ? value : defaultValue;
-    }
-    
-    /**
-     * Converts the given Dictionary to a Map.
-     *
-     * @param dict a dictionary
-     * @param <K> the key type
-     * @param <V> the value type
-     * @return the converted map, or an empty map if the given dictionary is null
-     */
-    public static <K, V> Map<K, V> toMap(Dictionary<K, V> dict) {
-        Map<K, V> map = new HashMap<>();
-        if (dict != null) {
-            Enumeration<K> keys = dict.keys();
-            while (keys.hasMoreElements()) {
-                K key = keys.nextElement();
-                map.put(key, dict.get(key));
-            }
-        }
-        return map;
-    }
 }
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
index c134f81..5b0d66b 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
@@ -21,7 +21,6 @@ package org.apache.aries.rsa.discovery.zookeeper.repository;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -57,12 +56,7 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
     private Map<String, EndpointDescription> nodes = new ConcurrentHashMap<>();
     
     public ZookeeperEndpointRepository(ZooKeeper zk) {
-        this(zk, null);
-    }
-    
-    public ZookeeperEndpointRepository(ZooKeeper zk, EndpointEventListener listener) {
         this.zk = zk;
-        this.listener = listener;
         this.parser = new EndpointDescriptionParser();
         try {
             createPath(PATH_PREFIX);
@@ -142,18 +136,18 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
         case NodeCreated:
         case NodeDataChanged:
         case NodeChildrenChanged:
-        	watchRecursive(event.getPath());
-        	break;
-		case NodeDeleted:
-			handleRemoved(event.getPath());
-			break;
-		default:
-			break;
-		}
+            watchRecursive(event.getPath());
+            break;
+        case NodeDeleted:
+            handleRemoved(event.getPath());
+            break;
+        default:
+            break;
+        }
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
         nodes.clear();
     }
 
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/ZookeeperStarter.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/ZookeeperStarter.java
index 65341cd..c5a44ee 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/ZookeeperStarter.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/ZookeeperStarter.java
@@ -20,11 +20,8 @@ package org.apache.aries.rsa.discovery.zookeeper.server;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ClientManagerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ClientManagerTest.java
new file mode 100644
index 0000000..360bf88
--- /dev/null
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ClientManagerTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.aries.rsa.discovery.zookeeper.ClientManager.DiscoveryConfig;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.util.converter.Converters;
+
+/**
+ * Checks that ClientManager hands the converted configuration, including defaults, to
+ * createClient. No real ZooKeeper connection is opened; createClient is overridden.
+ */
+public class ClientManagerTest {
+
+    @Rule
+    public MockitoRule rule = MockitoJUnit.rule();
+
+    @Mock
+    BundleContext context;
+
+    @Mock
+    ZooKeeper zookeeper;
+
+    // Released by the overridden createClient, which runs on a background thread
+    Semaphore sem = new Semaphore(0);
+
+    private DiscoveryConfig config;
+    private ClientManager zkd;
+
+    @Before
+    public void before() {
+        zkd = new ClientManager() {
+            @Override
+            protected ZooKeeper createClient(DiscoveryConfig config) {
+                ClientManagerTest.this.config = config;
+                sem.release();
+                return zookeeper;
+            }
+        };
+    }
+
+    @Test
+    public void testDefaults() throws ConfigurationException, InstantiationException, IllegalAccessException, InterruptedException {
+        Map<String, Object> configuration = new HashMap<>();
+
+        zkd.activate(convert(configuration), context);
+
+        awaitClientCreation();
+        assertEquals("localhost", config.zookeeper_host());
+        assertEquals("2181", config.zookeeper_port());
+        assertEquals(3000, config.zookeeper_timeout());
+    }
+
+    @Test
+    public void testConfig() throws ConfigurationException, InterruptedException {
+        Map<String, Object> configuration = new HashMap<>();
+        configuration.put("zookeeper.host", "myhost");
+        configuration.put("zookeeper.port", "1");
+        configuration.put("zookeeper.timeout", "1000");
+
+        DiscoveryConfig config2 = convert(configuration);
+        assertEquals("myhost", config2.zookeeper_host());
+        zkd.activate(config2, context);
+
+        awaitClientCreation();
+        assertEquals("myhost", config.zookeeper_host());
+        assertEquals("1", config.zookeeper_port());
+        assertEquals(1000, config.zookeeper_timeout());
+    }
+
+    /** Fails the test when createClient has not been called within the timeout. */
+    private void awaitClientCreation() throws InterruptedException {
+        assertTrue("createClient was not called in time", sem.tryAcquire(10, TimeUnit.SECONDS));
+    }
+
+    private DiscoveryConfig convert(Map<String, Object> configuration) {
+        return Converters.standardConverter().convert(configuration).to(DiscoveryConfig.class);
+    }
+}
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperDiscoveryTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperDiscoveryTest.java
deleted file mode 100644
index 96a3028..0000000
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperDiscoveryTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper;
-
-import java.util.Dictionary;
-import java.util.Hashtable;
-
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnit;
-import org.mockito.junit.MockitoRule;
-import org.osgi.framework.BundleContext;
-import org.osgi.service.cm.ConfigurationException;
-
-public class ZookeeperDiscoveryTest {
-
-    @Rule
-    public MockitoRule rule = MockitoJUnit.rule();
-    
-    @Mock
-    BundleContext bc;
-    
-    @Test
-    public void testDefaults() throws ConfigurationException {
-        ZooKeeperDiscovery zkd = new ZooKeeperDiscovery(bc) {
-            @Override
-            protected ZooKeeper createZooKeeper(String host, String port, int timeout) {
-                Assert.assertEquals("localhost", host);
-                Assert.assertEquals("2181", port);
-                Assert.assertEquals(3000, timeout);
-                return null;
-            }  
-        };
-        
-        Dictionary<String, Object> configuration = new Hashtable<>();
-        zkd.updated(configuration);
-    }
-    
-    @Test
-    public void testConfig() throws ConfigurationException {
-        ZooKeeperDiscovery zkd = new ZooKeeperDiscovery(bc) {
-            @Override
-            protected ZooKeeper createZooKeeper(String host, String port, int timeout) {
-                Assert.assertEquals("myhost", host);
-                Assert.assertEquals("1", port);
-                Assert.assertEquals(1000, timeout);
-                return null;
-            }  
-        };
-        
-        Dictionary<String, Object> configuration = new Hashtable<>();
-        configuration.put("zookeeper.host", "myhost");
-        configuration.put("zookeeper.port", "1");
-        configuration.put("zookeeper.timeout", "1000");
-        zkd.updated(configuration);
-    }
-}
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
index 00d1a84..a65696b 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
@@ -19,7 +19,6 @@
 package org.apache.aries.rsa.discovery.zookeeper.repository;
 
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.samePropertyValuesAs;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
@@ -103,7 +102,8 @@ public class ZookeeperEndpointRepositoryTest {
                 sem.release();
             }
         };
-        ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk, listener);
+        ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk);
+        repository.addListener(listener);
         
         EndpointDescription endpoint = createEndpoint();
         repository.add(endpoint);