You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2016/03/14 17:49:38 UTC

[4/5] aries-rsa git commit: Adding roundtrip test

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
new file mode 100644
index 0000000..f50848f
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper.subscribe;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
+import org.apache.aries.rsa.discovery.endpoint.PropertiesMapper;
+import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Monitors ZooKeeper for changes in published endpoints.
+ * <p>
+ * Specifically, it monitors the node path associated with a given interface class,
+ * whose data is a serialized version of an EndpointDescription, and notifies an
+ * EndpointListener when changes are detected (which can then propagate the
+ * notification to other EndpointListeners with a matching scope).
+ * <p>
+ * Note that the EndpointListener is used here as a decoupling interface for
+ * convenience, and is not necessarily used according to its documented contract.
+ */
+public class InterfaceMonitor implements Watcher, StatCallback {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitor.class);
+
+    private final String znode;
+    private final ZooKeeper zk;
+    private final EndpointListener endpointListener;
+    private final boolean recursive;
+    private volatile boolean closed;
+
+    // This map reference changes, so don't synchronize on it
+    private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>();
+
+    private EndpointDescriptionParser parser;
+
+    public InterfaceMonitor(ZooKeeper zk, String objClass, EndpointListener endpointListener, String scope) {
+        this.zk = zk;
+        this.znode = Utils.getZooKeeperPath(objClass);
+        this.recursive = objClass == null || objClass.isEmpty();
+        this.endpointListener = endpointListener;
+        this.parser = new EndpointDescriptionParser();
+        LOG.debug("Creating new InterfaceMonitor {} for scope [{}] and objectClass [{}]",
+                new Object[] {recursive ? "(recursive)" : "", scope, objClass});
+    }
+
+    /**
+     * Returns all endpoints that are currently known to this monitor.
+     *
+     * @return all endpoints that are currently known to this monitor
+     */
+    public synchronized List<EndpointDescription> getEndpoints() {
+        return new ArrayList<EndpointDescription>(nodes.values());
+    }
+
+    public void start() {
+        watch();
+    }
+
+    private void watch() {
+        LOG.debug("registering a ZooKeeper.exists({}) callback", znode);
+        zk.exists(znode, this, this, null);
+    }
+
+    /**
+     * Zookeeper Watcher interface callback.
+     */
+    public void process(WatchedEvent event) {
+        LOG.debug("ZooKeeper watcher callback on node {} for event {}", znode, event);
+        processDelta();
+    }
+
+    /**
+     * Zookeeper StatCallback interface callback.
+     */
+    @SuppressWarnings("deprecation")
+    public void processResult(int rc, String path, Object ctx, Stat stat) {
+        LOG.debug("ZooKeeper callback on node: {} code: {}", znode, rc);
+
+        switch (rc) {
+        case Code.Ok:
+        case Code.NoNode:
+            processDelta();
+            return;
+
+        case Code.SessionExpired:
+        case Code.NoAuth:
+        case Code.ConnectionLoss:
+            return;
+
+        default:
+            watch();
+        }
+    }
+
+    private void processDelta() {
+        if (closed) {
+            return;
+        }
+
+        if (zk.getState() != ZooKeeper.States.CONNECTED) {
+            LOG.debug("ZooKeeper connection was already closed! Not processing changed event.");
+            return;
+        }
+
+        try {
+            if (zk.exists(znode, false) != null) {
+                zk.getChildren(znode, this);
+                refreshNodes();
+            } else {
+                LOG.debug("znode {} doesn't exist -> not processing any changes", znode);
+            }
+        } catch (Exception e) {
+            if (zk.getState() != ZooKeeper.States.CONNECTED) {
+                LOG.debug("Error getting Zookeeper data: " + e); // e.g. session expired, handled by ZooKeeperDiscovery
+            } else {
+                LOG.error("Error getting ZooKeeper data.", e);
+            }
+        }
+    }
+
+    public synchronized void close() {
+        closed = true;
+        for (EndpointDescription endpoint : nodes.values()) {
+            endpointListener.endpointRemoved(endpoint, null);
+        }
+        nodes.clear();
+    }
+
+    private synchronized void refreshNodes() {
+        if (closed) {
+            return;
+        }
+        LOG.info("Processing change on node: {}", znode);
+
+        Map<String, EndpointDescription> newNodes = new HashMap<String, EndpointDescription>();
+        Map<String, EndpointDescription> prevNodes = new HashMap<String, EndpointDescription>(nodes);
+        processChildren(znode, newNodes, prevNodes);
+
+        // whatever is left in prevNodes now has been removed from Discovery
+        LOG.debug("processChildren done. Nodes that are missing now and need to be removed: {}", prevNodes.values());
+        for (EndpointDescription endpoint : prevNodes.values()) {
+            endpointListener.endpointRemoved(endpoint, null);
+        }
+        nodes = newNodes;
+    }
+
+    /**
+     * Iterates through all child nodes of the given node and tries to find
+     * endpoints. If the recursive flag is set it also traverses into the child
+     * nodes.
+     *
+     * @return true if an endpoint was found and if the node therefore needs to
+     *         be monitored for changes
+     */
+    private boolean processChildren(String zn, Map<String, EndpointDescription> newNodes,
+            Map<String, EndpointDescription> prevNodes) {
+        List<String> children;
+        try {
+            LOG.debug("Processing the children of {}", zn);
+            children = zk.getChildren(zn, false);
+
+            boolean foundANode = false;
+            for (String child : children) {
+                String childZNode = zn + '/' + child;
+                EndpointDescription endpoint = getEndpointDescriptionFromNode(childZNode);
+                if (endpoint != null) {
+                    EndpointDescription prevEndpoint = prevNodes.get(child);
+                    LOG.info("found new node " + zn + "/[" + child + "]   ( []->child )  props: "
+                            + endpoint.getProperties().values());
+                    newNodes.put(child, endpoint);
+                    prevNodes.remove(child);
+                    foundANode = true;
+                    LOG.debug("Properties: {}", endpoint.getProperties());
+                    if (prevEndpoint == null) {
+                        // This guy is new
+                        endpointListener.endpointAdded(endpoint, null);
+                    } else if (!prevEndpoint.getProperties().equals(endpoint.getProperties())) {
+                        // TODO
+                    }
+                }
+                if (recursive && processChildren(childZNode, newNodes, prevNodes)) {
+                    zk.getChildren(childZNode, this);
+                }
+            }
+
+            return foundANode;
+        } catch (KeeperException e) {
+            LOG.error("Problem processing ZooKeeper node", e);
+        } catch (InterruptedException e) {
+            LOG.error("Problem processing ZooKeeper node", e);
+        }
+        return false;
+    }
+
+    /**
+     * Retrieves data from the given node and parses it into an EndpointDescription.
+     *
+     * @param node a node path
+     * @return endpoint found in the node or null if no endpoint was found
+     */
+    private EndpointDescription getEndpointDescriptionFromNode(String node) {
+        try {
+            Stat stat = zk.exists(node, false);
+            if (stat == null || stat.getDataLength() <= 0) {
+                return null;
+            }
+            byte[] data = zk.getData(node, false, null);
+            LOG.debug("Got data for node: {}", node);
+
+            EndpointDescription endpoint = getFirstEnpointDescription(data);
+            if (endpoint != null) {
+                return endpoint;
+            }
+            LOG.warn("No Discovery information found for node: {}", node);
+        } catch (Exception e) {
+            LOG.error("Problem getting EndpointDescription from node " + node, e);
+        }
+        return null;
+    }
+
+    public EndpointDescription getFirstEnpointDescription(byte[] data) {
+        List<EndpointDescriptionType> elements = parser.getEndpointDescriptions(new ByteArrayInputStream(data));
+        if (elements.isEmpty()) {
+            return null;
+        }
+        Map<String, Object> props = new PropertiesMapper().toProps(elements.get(0).getProperty());
+        return new EndpointDescription(props);
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
new file mode 100644
index 0000000..3a02a48
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper.subscribe;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
+import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
+import org.apache.aries.rsa.util.StringPlus;
+import org.apache.zookeeper.ZooKeeper;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Filter;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the EndpointListeners and the scopes they are interested in.
+ * For each scope with interested EndpointListeners an InterfaceMonitor is created.
+ * The InterfaceMonitor calls back when it detects added or removed external Endpoints.
+ * These events are then forwarded to all interested EndpointListeners.
+ */
+public class InterfaceMonitorManager {
+    private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitorManager.class);
+    private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(".*\\(objectClass=([^)]+)\\).*");
+
+    private final BundleContext bctx;
+    private final ZooKeeper zk;
+    // map of EndpointListeners and the scopes they are interested in
+    private final Map<ServiceReference<EndpointListener>, List<String>> endpointListenerScopes =
+            new HashMap<ServiceReference<EndpointListener>, List<String>>();
+    // map of scopes and their interest data
+    private final Map<String, Interest> interests = new HashMap<String, Interest>();
+
+    protected static class Interest {
+        List<ServiceReference<EndpointListener>> endpointListeners = 
+            new CopyOnWriteArrayList<ServiceReference<EndpointListener>>();
+        InterfaceMonitor monitor;
+    }
+
+    public InterfaceMonitorManager(BundleContext bctx, ZooKeeper zk) {
+        this.bctx = bctx;
+        this.zk = zk;
+    }
+
+    public void addInterest(ServiceReference<EndpointListener> endpointListener) {
+        if (isOurOwnEndpointListener(endpointListener)) {
+            LOG.debug("Skipping our own EndpointListener");
+            return;
+        }
+
+        LOG.info("updating EndpointListener interests: {}", endpointListener);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("updated EndpointListener properties: {}", getProperties(endpointListener));
+        }
+        for (String scope : getScopes(endpointListener)) {
+            String objClass = getObjectClass(scope);
+            LOG.debug("Adding interest in scope {}, objectClass {}", scope, objClass);
+            addInterest(endpointListener, scope, objClass);
+        }
+    }
+
+    private static boolean isOurOwnEndpointListener(ServiceReference<EndpointListener> endpointListener) {
+        return Boolean.parseBoolean(String.valueOf(
+                endpointListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)));
+    }
+
+    public synchronized void addInterest(ServiceReference<EndpointListener> endpointListener, 
+                                         String scope, String objClass) {
+        // get or create interest for given scope and add listener to it
+        Interest interest = interests.get(scope);
+        if (interest == null) {
+            // create interest, add listener and start monitor
+            interest = new Interest();
+            interests.put(scope, interest);
+            interest.endpointListeners.add(endpointListener); // add it before monitor starts so we don't miss events
+            interest.monitor = createInterfaceMonitor(scope, objClass, interest);
+            interest.monitor.start();
+        } else {
+            // interest already exists, so just add listener to it
+            if (!interest.endpointListeners.contains(endpointListener)) {
+                interest.endpointListeners.add(endpointListener);
+            }
+            // notify listener of all known endpoints for given scope
+            // (as EndpointListener contract requires of all added/modified listeners)
+            for (EndpointDescription endpoint : interest.monitor.getEndpoints()) {
+                notifyListeners(endpoint, scope, true, Arrays.asList(endpointListener));
+            }
+        }
+
+        // add scope to listener's scopes list
+        List<String> scopes = endpointListenerScopes.get(endpointListener);
+        if (scopes == null) {
+            scopes = new ArrayList<String>(1);
+            endpointListenerScopes.put(endpointListener, scopes);
+        }
+        if (!scopes.contains(scope)) {
+            scopes.add(scope);
+        }
+    }
+
+    public synchronized void removeInterest(ServiceReference<EndpointListener> endpointListener) {
+        LOG.info("removing EndpointListener interests: {}", endpointListener);
+        List<String> scopes = endpointListenerScopes.get(endpointListener);
+        if (scopes == null) {
+            return;
+        }
+
+        for (String scope : scopes) {
+            Interest interest = interests.get(scope);
+            if (interest != null) {
+                interest.endpointListeners.remove(endpointListener);
+                if (interest.endpointListeners.isEmpty()) {
+                    interest.monitor.close();
+                    interests.remove(scope);
+                }
+            }
+        }
+        endpointListenerScopes.remove(endpointListener);
+    }
+
+    protected InterfaceMonitor createInterfaceMonitor(final String scope, String objClass, final Interest interest) {
+        // holding this object's lock in the callbacks can lead to a deadlock with InterfaceMonitor
+        EndpointListener endpointListener = new EndpointListener() {
+
+            public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
+                notifyListeners(endpoint, scope, false, interest.endpointListeners);
+            }
+
+            public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
+                notifyListeners(endpoint, scope, true, interest.endpointListeners);
+            }
+        };
+        return new InterfaceMonitor(zk, objClass, endpointListener, scope);
+    }
+
+    private void notifyListeners(EndpointDescription endpoint, String currentScope, boolean isAdded,
+            List<ServiceReference<EndpointListener>> endpointListeners) {
+        for (ServiceReference<EndpointListener> endpointListenerRef : endpointListeners) {
+            EndpointListener service = bctx.getService(endpointListenerRef);
+            try {
+                EndpointListener endpointListener = (EndpointListener)service;
+                LOG.trace("matching {} against {}", endpoint, currentScope);
+                if (matchFilter(bctx, currentScope, endpoint)) {
+                    LOG.debug("Matched {} against {}", endpoint, currentScope);
+                    notifyListener(endpoint, currentScope, isAdded, endpointListenerRef.getBundle(),
+                                   endpointListener);
+                }
+            } finally {
+                if (service != null) {
+                    bctx.ungetService(endpointListenerRef);
+                }
+            }
+        }
+    }
+    
+    private static boolean matchFilter(BundleContext bctx, String filter, EndpointDescription endpoint) {
+        if (filter == null) {
+            return false;
+        }
+    
+        try {
+            Filter f = bctx.createFilter(filter);
+            Dictionary<String, Object> dict = new Hashtable<String, Object>(endpoint.getProperties());
+            return f.match(dict);
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+
+    private void notifyListener(EndpointDescription endpoint, String currentScope, boolean isAdded,
+                                Bundle endpointListenerBundle, EndpointListener endpointListener) {
+        if (endpointListenerBundle == null) {
+            LOG.info("listening service was unregistered, ignoring");
+        } else if (isAdded) {
+            LOG.info("calling EndpointListener.endpointAdded: " + endpointListener + " from bundle "
+                    + endpointListenerBundle.getSymbolicName() + " for endpoint: " + endpoint);
+            endpointListener.endpointAdded(endpoint, currentScope);
+        } else {
+            LOG.info("calling EndpointListener.endpointRemoved: " + endpointListener + " from bundle "
+                    + endpointListenerBundle.getSymbolicName() + " for endpoint: " + endpoint);
+            endpointListener.endpointRemoved(endpoint, currentScope);
+        }
+    }
+
+    public synchronized void close() {
+        for (Interest interest : interests.values()) {
+            interest.monitor.close();
+        }
+        interests.clear();
+        endpointListenerScopes.clear();
+    }
+
+    /**
+     * Only for test case!
+     */
+    protected synchronized Map<String, Interest> getInterests() {
+        return interests;
+    }
+
+    /**
+     * Only for test case!
+     */
+    protected synchronized Map<ServiceReference<EndpointListener>, List<String>> getEndpointListenerScopes() {
+        return endpointListenerScopes;
+    }
+
+    protected List<String> getScopes(ServiceReference<?> sref) {
+        return Utils.removeEmpty(StringPlus.normalize(sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE)));
+    }
+    
+    public static String getObjectClass(String scope) {
+        Matcher m = OBJECTCLASS_PATTERN.matcher(scope);
+        return m.matches() ? m.group(1) : null;
+    }
+
+    /**
+     * Returns a service's properties as a map.
+     *
+     * @param serviceReference a service reference
+     * @return the service's properties as a map
+     */
+    public static Map<String, Object> getProperties(ServiceReference<?> serviceReference) {
+        String[] keys = serviceReference.getPropertyKeys();
+        Map<String, Object> props = new HashMap<String, Object>(keys.length);
+        for (String key : keys) {
+            Object val = serviceReference.getProperty(key);
+            props.put(key, val);
+        }
+        return props;
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java
new file mode 100644
index 0000000..82ccb85
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public final class Utils {
+
+    static final String PATH_PREFIX = "/osgi/service_registry";
+
+    private Utils() {
+        // never constructed
+    }
+
+    public static String getZooKeeperPath(String name) {
+        return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/');
+    }
+
+    /**
+     * Removes nulls and empty strings from the given string array.
+     *
+     * @param strings an array of strings
+     * @return a new array containing the non-null and non-empty
+     *         elements of the original array in the same order
+     */
+    public static List<String> removeEmpty(List<String> strings) {
+        List<String> result = new ArrayList<String>();
+        for (String s : strings) {
+            if (s != null && !s.isEmpty()) {
+                result.add(s);
+            }
+        }
+        return result;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java
deleted file mode 100644
index 1e6c551..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper;
-
-import java.util.Dictionary;
-import java.util.Hashtable;
-
-import org.osgi.framework.BundleActivator;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.service.cm.ManagedService;
-
-public class Activator implements BundleActivator {
-
-    private ZooKeeperDiscovery zkd;
-
-    public synchronized void start(BundleContext bc) throws Exception {
-        zkd = new ZooKeeperDiscovery(bc);
-        Dictionary<String, String> props = new Hashtable<String, String>();
-        props.put(Constants.SERVICE_PID, "org.apache.aries.rsa.discovery.zookeeper");
-        bc.registerService(ManagedService.class.getName(), zkd, props);
-    }
-
-    public synchronized void stop(BundleContext bc) throws Exception {
-        zkd.stop(true);
-    }
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java
deleted file mode 100644
index 3a7f2c4..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper;
-
-import java.io.IOException;
-import java.util.Dictionary;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.cxf.dosgi.discovery.zookeeper.publish.PublishingEndpointListenerFactory;
-import org.apache.cxf.dosgi.discovery.zookeeper.subscribe.EndpointListenerTracker;
-import org.apache.cxf.dosgi.discovery.zookeeper.subscribe.InterfaceMonitorManager;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.osgi.framework.BundleContext;
-import org.osgi.service.cm.ConfigurationException;
-import org.osgi.service.cm.ManagedService;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.util.tracker.ServiceTracker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZooKeeperDiscovery implements Watcher, ManagedService {
-
-    public static final String DISCOVERY_ZOOKEEPER_ID = "org.apache.cxf.dosgi.discovery.zookeeper";
-
-    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperDiscovery.class);
-
-    private final BundleContext bctx;
-
-    private PublishingEndpointListenerFactory endpointListenerFactory;
-    private ServiceTracker<EndpointListener, EndpointListener> endpointListenerTracker;
-    private InterfaceMonitorManager imManager;
-    private ZooKeeper zkClient;
-    private boolean closed;
-    private boolean started;
-
-    private Dictionary<String, ?> curConfiguration;
-
-    public ZooKeeperDiscovery(BundleContext bctx) {
-        this.bctx = bctx;
-    }
-
-    public synchronized void updated(Dictionary<String, ?> configuration) throws ConfigurationException {
-        LOG.debug("Received configuration update for Zookeeper Discovery: {}", configuration);
-        // make changes only if config actually changed, to prevent unnecessary ZooKeeper reconnections
-        if (!ZooKeeperDiscovery.toMap(configuration).equals(ZooKeeperDiscovery.toMap(curConfiguration))) {
-            stop(false);
-            curConfiguration = configuration;
-            // config is null if it doesn't exist, is being deleted or has not yet been loaded
-            // in which case we just stop running
-            if (!closed && configuration != null) {
-                try {
-                    createZookeeper(configuration);
-                } catch (IOException e) {
-                    throw new ConfigurationException(null, "Error starting zookeeper client", e);
-                }
-            }
-        }
-    }
-
-    private synchronized void start() {
-        if (closed) {
-            return;
-        }
-        if (started) {
-            // we must be re-entrant, i.e. can be called when already started
-            LOG.debug("ZookeeperDiscovery already started");
-            return;
-        }
-        LOG.debug("starting ZookeeperDiscovery");
-        endpointListenerFactory = new PublishingEndpointListenerFactory(zkClient, bctx);
-        endpointListenerFactory.start();
-        imManager = new InterfaceMonitorManager(bctx, zkClient);
-        endpointListenerTracker = new EndpointListenerTracker(bctx, imManager);
-        endpointListenerTracker.open();
-        started = true;
-    }
-
-    public synchronized void stop(boolean close) {
-        if (started) {
-            LOG.debug("stopping ZookeeperDiscovery");
-        }
-        started = false;
-        closed |= close;
-        if (endpointListenerFactory != null) {
-            endpointListenerFactory.stop();
-        }
-        if (endpointListenerTracker != null) {
-            endpointListenerTracker.close();
-        }
-        if (imManager != null) {
-            imManager.close();
-        }
-        if (zkClient != null) {
-            try {
-                zkClient.close();
-            } catch (InterruptedException e) {
-                LOG.error("Error closing ZooKeeper", e);
-            }
-        }
-    }
-
-    protected ZooKeeper createZooKeeper(String host, String port, int timeout) throws IOException {
-        LOG.info("ZooKeeper discovery connecting to {}:{} with timeout {}",
-                new Object[]{host, port, timeout});
-        return new ZooKeeper(host + ":" + port, timeout, this);
-    }
-
-    /* Callback for ZooKeeper */
-    public void process(WatchedEvent event) {
-        LOG.debug("got ZooKeeper event " + event);
-        switch (event.getState()) {
-        case SyncConnected:
-            LOG.info("Connection to ZooKeeper established");
-            // this event can be triggered more than once in a row (e.g. after Disconnected event),
-            // so we must be re-entrant here
-            start();
-            break;
-
-        case Expired:
-            LOG.info("Connection to ZooKeeper expired. Trying to create a new connection");
-            stop(false);
-            try {
-                createZookeeper(curConfiguration);
-            } catch (IOException e) {
-                LOG.error("Error starting zookeeper client", e);
-            }
-            break;
-
-        default:
-            // ignore other events
-            break;
-        }
-    }
-
-    private void createZookeeper(Dictionary<String, ?> config) throws IOException {
-        String host = (String)getWithDefault(config, "zookeeper.host", "localhost");
-        String port = (String)getWithDefault(config, "zookeeper.port", "2181");
-        int timeout = Integer.parseInt((String)getWithDefault(config, "zookeeper.timeout", "3000"));
-        zkClient = createZooKeeper(host, port, timeout);
-    }
-    
-    public Object getWithDefault(Dictionary<String, ?> config, String key, Object defaultValue) {
-        Object value = config.get(key);
-        return value != null ? value : defaultValue;
-    }
-    
-    /**
-     * Converts the given Dictionary to a Map.
-     *
-     * @param dict a dictionary
-     * @param <K> the key type
-     * @param <V> the value type
-     * @return the converted map, or an empty map if the given dictionary is null
-     */
-    public static <K, V> Map<K, V> toMap(Dictionary<K, V> dict) {
-        Map<K, V> map = new HashMap<K, V>();
-        if (dict != null) {
-            Enumeration<K> keys = dict.keys();
-            while (keys.hasMoreElements()) {
-                K key = keys.nextElement();
-                map.put(key, dict.get(key));
-            }
-        }
-        return map;
-    }
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java
deleted file mode 100644
index 5d46585..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.publish;
-
-import java.util.Map;
-
-/**
- * This interface allows transformation of service registration information before it is pushed into the ZooKeeper
- * discovery system.
- * It can be useful for situations where a host name or port number needs to be changed in cases where the host running
- * the service is known differently from the outside to what the local Java process thinks it is.
- * Extra service properties can also be added to the registration which can be useful to refine the remote service
- * lookup process. <p/>
- *
- * DiscoveryPlugins use the OSGi WhiteBoard pattern. To add one to the system, register an instance under this interface
- * with the OSGi Service Registry. All registered DiscoveryPlugin instances are visited and given a chance to
- * process the information before it is pushed into ZooKeeper. <p/>
- *
- * Note that the changes made using this plugin do not modify the local service registration.
- *
- */
-public interface DiscoveryPlugin {
-
-    /**
-     * Process service registration information. Plugins can change this information before it is published into the
-     * ZooKeeper discovery system.
-     *
-     * @param mutableProperties A map of service registration properties. The map is mutable and any changes to the map
-     * will be reflected in the ZooKeeper registration.
-     * @param endpointKey The key under which the service is registered in ZooKeeper. This key typically has the
-     * following format: hostname#port##context. While the actual value of this key is not actually used by the
-     * system (people can use it as a hint to understand where the service is located), the value <i>must</i> be
-     * unique for all services of a given type.
-     * @return The <tt>endpointKey</tt> value to be used. If there is no need to change this simply return the value
-     * of the <tt>endpointKey</tt> parameter.
-     */
-    String process(Map<String, Object> mutableProperties, String endpointKey);
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java
deleted file mode 100644
index 9bcfe72..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.publish;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
-import org.apache.aries.rsa.discovery.endpoint.PropertiesMapper;
-import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.osgi.framework.BundleContext;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.util.tracker.ServiceTracker;
-import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType;
-import org.osgi.xmlns.rsa.v1_0.PropertyType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Listens for local Endpoints and publishes them to ZooKeeper.
- */
-public class PublishingEndpointListener implements EndpointListener {
-
-    private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class);
-
-    private final ZooKeeper zk;
-    private final ServiceTracker<DiscoveryPlugin, DiscoveryPlugin> discoveryPluginTracker;
-    private final List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>();
-    private boolean closed;
-
-    private final EndpointDescriptionParser endpointDescriptionParser;
-
-    public PublishingEndpointListener(ZooKeeper zk, BundleContext bctx) {
-        this.zk = zk;
-        discoveryPluginTracker = new ServiceTracker<DiscoveryPlugin, DiscoveryPlugin>(bctx, 
-            DiscoveryPlugin.class, null);
-        discoveryPluginTracker.open();
-        endpointDescriptionParser = new EndpointDescriptionParser();
-    }
-
-    public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
-        LOG.info("Local EndpointDescription added: {}", endpoint);
-
-        synchronized (endpoints) {
-            if (closed) {
-                return;
-            }
-            if (endpoints.contains(endpoint)) {
-                // TODO -> Should the published endpoint be updated here?
-                return;
-            }
-
-            try {
-                addEndpoint(endpoint);
-                endpoints.add(endpoint);
-            } catch (Exception ex) {
-                LOG.error("Exception while processing the addition of an endpoint.", ex);
-            }
-        }
-    }
-
-    private void addEndpoint(EndpointDescription endpoint) throws URISyntaxException, KeeperException,
-                                                                  InterruptedException, IOException {
-        Collection<String> interfaces = endpoint.getInterfaces();
-        String endpointKey = getKey(endpoint);
-        Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
-
-        // process plugins
-        Object[] plugins = discoveryPluginTracker.getServices();
-        if (plugins != null) {
-            for (Object plugin : plugins) {
-                if (plugin instanceof DiscoveryPlugin) {
-                    endpointKey = ((DiscoveryPlugin)plugin).process(props, endpointKey);
-                }
-            }
-        }
-
-        for (String name : interfaces) {
-            String path = Utils.getZooKeeperPath(name);
-            String fullPath = path + '/' + endpointKey;
-            LOG.info("Creating ZooKeeper node for service with path {}", fullPath);
-            createPath(path, zk);
-            List<PropertyType> propsOut = new PropertiesMapper().fromProps(props);
-            EndpointDescriptionType epd = new EndpointDescriptionType();
-            epd.getProperty().addAll(propsOut);
-            byte[] epData = endpointDescriptionParser.getData(epd);
-            createEphemeralNode(fullPath, epData);
-        }
-    }
-
-    private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException {
-        try {
-            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-        } catch (NodeExistsException nee) {
-            // this sometimes happens after a ZooKeeper node dies and the ephemeral node
-            // that belonged to the old session was not yet deleted. We need to make our
-            // session the owner of the node so it won't get deleted automatically -
-            // we do this by deleting and recreating it ourselves.
-            LOG.info("node for endpoint already exists, recreating: {}", fullPath);
-            try {
-                zk.delete(fullPath, -1);
-            } catch (NoNodeException nne) {
-                // it's a race condition, but as long as it got deleted - it's ok
-            }
-            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-        }
-    }
-
-    public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
-        LOG.info("Local EndpointDescription removed: {}", endpoint);
-
-        synchronized (endpoints) {
-            if (closed) {
-                return;
-            }
-            if (!endpoints.contains(endpoint)) {
-                return;
-            }
-
-            try {
-                removeEndpoint(endpoint);
-                endpoints.remove(endpoint);
-            } catch (Exception ex) {
-                LOG.error("Exception while processing the removal of an endpoint", ex);
-            }
-        }
-    }
-
-    private void removeEndpoint(EndpointDescription endpoint) throws UnknownHostException, URISyntaxException {
-        Collection<String> interfaces = endpoint.getInterfaces();
-        String endpointKey = getKey(endpoint);
-        for (String name : interfaces) {
-            String path = Utils.getZooKeeperPath(name);
-            String fullPath = path + '/' + endpointKey;
-            LOG.debug("Removing ZooKeeper node: {}", fullPath);
-            try {
-                zk.delete(fullPath, -1);
-            } catch (Exception ex) {
-                LOG.debug("Error while removing endpoint: {}", ex); // e.g. session expired
-            }
-        }
-    }
-
-    private static void createPath(String path, ZooKeeper zk) throws KeeperException, InterruptedException {
-        StringBuilder current = new StringBuilder();
-        List<String> parts = Utils.removeEmpty(Arrays.asList(path.split("/")));
-        for (String part : parts) {
-            current.append('/');
-            current.append(part);
-            try {
-                zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            } catch (NodeExistsException nee) {
-                // it's not the first node with this path to ever exist - that's normal
-            }
-        }
-    }
-
-    private static String getKey(EndpointDescription endpoint) throws URISyntaxException {
-        URI uri = new URI(endpoint.getId());
-        return new StringBuilder().append(uri.getHost()).append("#").append(uri.getPort())
-            .append("#").append(uri.getPath().replace('/', '#')).toString();
-    }
-
-    public void close() {
-        LOG.debug("closing - removing all endpoints");
-        synchronized (endpoints) {
-            closed = true;
-            for (EndpointDescription endpoint : endpoints) {
-                try {
-                    removeEndpoint(endpoint);
-                } catch (Exception ex) {
-                    LOG.error("Exception while removing endpoint during close", ex);
-                }
-            }
-            endpoints.clear();
-        }
-        discoveryPluginTracker.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
deleted file mode 100644
index 99a9849..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.publish;
-
-import java.util.ArrayList;
-import java.util.Dictionary;
-import java.util.Hashtable;
-import java.util.List;
-
-import org.apache.cxf.dosgi.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.zookeeper.ZooKeeper;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceFactory;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.service.remoteserviceadmin.RemoteConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Creates local EndpointListeners that publish to ZooKeeper.
- */
-public class PublishingEndpointListenerFactory implements ServiceFactory<PublishingEndpointListener> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListenerFactory.class);
-
-    private final BundleContext bctx;
-    private final ZooKeeper zk;
-    private final List<PublishingEndpointListener> listeners = new ArrayList<PublishingEndpointListener>();
-    private ServiceRegistration<?> serviceRegistration;
-
-    public PublishingEndpointListenerFactory(ZooKeeper zk, BundleContext bctx) {
-        this.bctx = bctx;
-        this.zk = zk;
-    }
-
-    public PublishingEndpointListener getService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr) {
-        LOG.debug("new EndpointListener from factory");
-        synchronized (listeners) {
-            PublishingEndpointListener pel = new PublishingEndpointListener(zk, bctx);
-            listeners.add(pel);
-            return pel;
-        }
-    }
-
-    public void ungetService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr, 
-                             PublishingEndpointListener pel) {
-        LOG.debug("remove EndpointListener");
-        synchronized (listeners) {
-            if (listeners.remove(pel)) {
-                pel.close();
-            }
-        }
-    }
-
-    public synchronized void start() {
-        Dictionary<String, String> props = new Hashtable<String, String>();
-        String uuid = bctx.getProperty(Constants.FRAMEWORK_UUID);
-        props.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, 
-                  String.format("(&(%s=*)(%s=%s))", Constants.OBJECTCLASS, 
-                                RemoteConstants.ENDPOINT_FRAMEWORK_UUID, uuid));
-        props.put(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID, "true");
-        serviceRegistration = bctx.registerService(EndpointListener.class.getName(), this, props);
-    }
-    
-    public synchronized void stop() {
-        if (serviceRegistration != null) {
-            serviceRegistration.unregister();
-            serviceRegistration = null;
-        }
-        synchronized (listeners) {
-            for (PublishingEndpointListener pel : listeners) {
-                pel.close();
-            }
-            listeners.clear();
-        }
-    }
-
-    /**
-     * Only for the test case!
-     */
-    protected List<PublishingEndpointListener> getListeners() {
-        synchronized (listeners) {
-            return listeners;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java
deleted file mode 100644
index 4d0a25f..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.subscribe;
-
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.util.tracker.ServiceTracker;
-
-/**
- * Tracks interest in EndpointListeners. Delegates to InterfaceMonitorManager to manage
- * interest in the scopes of each EndpointListener.
- */
-public class EndpointListenerTracker extends ServiceTracker<EndpointListener, EndpointListener> {
-    private final InterfaceMonitorManager imManager;
-
-    public EndpointListenerTracker(BundleContext bctx, InterfaceMonitorManager imManager) {
-        super(bctx, EndpointListener.class, null);
-        this.imManager = imManager;
-    }
-
-    @Override
-    public EndpointListener addingService(ServiceReference<EndpointListener> endpointListener) {
-        imManager.addInterest(endpointListener);
-        return null;
-    }
-
-    @Override
-    public void modifiedService(ServiceReference<EndpointListener> endpointListener, EndpointListener service) {
-        // called when an EndpointListener updates its service properties,
-        // e.g. when its interest scope is expanded/reduced
-        imManager.addInterest(endpointListener);
-    }
-
-    @Override
-    public void removedService(ServiceReference<EndpointListener> endpointListener, EndpointListener service) {
-        imManager.removeInterest(endpointListener);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java
deleted file mode 100644
index 3822b6e..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.subscribe;
-
-import java.io.ByteArrayInputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
-import org.apache.aries.rsa.discovery.endpoint.PropertiesMapper;
-import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils;
-import org.apache.zookeeper.AsyncCallback.StatCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Monitors ZooKeeper for changes in published endpoints.
- * <p>
- * Specifically, it monitors the node path associated with a given interface class,
- * whose data is a serialized version of an EndpointDescription, and notifies an
- * EndpointListener when changes are detected (which can then propagate the
- * notification to other EndpointListeners with a matching scope).
- * <p>
- * Note that the EndpointListener is used here as a decoupling interface for
- * convenience, and is not necessarily used according to its documented contract.
- */
-public class InterfaceMonitor implements Watcher, StatCallback {
-
-    private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitor.class);
-
-    private final String znode;
-    private final ZooKeeper zk;
-    private final EndpointListener endpointListener;
-    private final boolean recursive;
-    private volatile boolean closed;
-
-    // This map reference changes, so don't synchronize on it
-    private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>();
-
-    private EndpointDescriptionParser parser;
-
-    public InterfaceMonitor(ZooKeeper zk, String objClass, EndpointListener endpointListener, String scope) {
-        this.zk = zk;
-        this.znode = Utils.getZooKeeperPath(objClass);
-        this.recursive = objClass == null || objClass.isEmpty();
-        this.endpointListener = endpointListener;
-        this.parser = new EndpointDescriptionParser();
-        LOG.debug("Creating new InterfaceMonitor {} for scope [{}] and objectClass [{}]",
-                new Object[] {recursive ? "(recursive)" : "", scope, objClass});
-    }
-
-    /**
-     * Returns all endpoints that are currently known to this monitor.
-     *
-     * @return all endpoints that are currently known to this monitor
-     */
-    public synchronized List<EndpointDescription> getEndpoints() {
-        return new ArrayList<EndpointDescription>(nodes.values());
-    }
-
-    public void start() {
-        watch();
-    }
-
-    private void watch() {
-        LOG.debug("registering a ZooKeeper.exists({}) callback", znode);
-        zk.exists(znode, this, this, null);
-    }
-
-    /**
-     * Zookeeper Watcher interface callback.
-     */
-    public void process(WatchedEvent event) {
-        LOG.debug("ZooKeeper watcher callback on node {} for event {}", znode, event);
-        processDelta();
-    }
-
-    /**
-     * Zookeeper StatCallback interface callback.
-     */
-    @SuppressWarnings("deprecation")
-    public void processResult(int rc, String path, Object ctx, Stat stat) {
-        LOG.debug("ZooKeeper callback on node: {} code: {}", znode, rc);
-
-        switch (rc) {
-        case Code.Ok:
-        case Code.NoNode:
-            processDelta();
-            return;
-
-        case Code.SessionExpired:
-        case Code.NoAuth:
-        case Code.ConnectionLoss:
-            return;
-
-        default:
-            watch();
-        }
-    }
-
-    private void processDelta() {
-        if (closed) {
-            return;
-        }
-
-        if (zk.getState() != ZooKeeper.States.CONNECTED) {
-            LOG.debug("ZooKeeper connection was already closed! Not processing changed event.");
-            return;
-        }
-
-        try {
-            if (zk.exists(znode, false) != null) {
-                zk.getChildren(znode, this);
-                refreshNodes();
-            } else {
-                LOG.debug("znode {} doesn't exist -> not processing any changes", znode);
-            }
-        } catch (Exception e) {
-            if (zk.getState() != ZooKeeper.States.CONNECTED) {
-                LOG.debug("Error getting Zookeeper data: " + e); // e.g. session expired, handled by ZooKeeperDiscovery
-            } else {
-                LOG.error("Error getting ZooKeeper data.", e);
-            }
-        }
-    }
-
-    public synchronized void close() {
-        closed = true;
-        for (EndpointDescription endpoint : nodes.values()) {
-            endpointListener.endpointRemoved(endpoint, null);
-        }
-        nodes.clear();
-    }
-
-    private synchronized void refreshNodes() {
-        if (closed) {
-            return;
-        }
-        LOG.info("Processing change on node: {}", znode);
-
-        Map<String, EndpointDescription> newNodes = new HashMap<String, EndpointDescription>();
-        Map<String, EndpointDescription> prevNodes = new HashMap<String, EndpointDescription>(nodes);
-        processChildren(znode, newNodes, prevNodes);
-
-        // whatever is left in prevNodes now has been removed from Discovery
-        LOG.debug("processChildren done. Nodes that are missing now and need to be removed: {}", prevNodes.values());
-        for (EndpointDescription endpoint : prevNodes.values()) {
-            endpointListener.endpointRemoved(endpoint, null);
-        }
-        nodes = newNodes;
-    }
-
-    /**
-     * Iterates through all child nodes of the given node and tries to find
-     * endpoints. If the recursive flag is set it also traverses into the child
-     * nodes.
-     *
-     * @return true if an endpoint was found and if the node therefore needs to
-     *         be monitored for changes
-     */
-    private boolean processChildren(String zn, Map<String, EndpointDescription> newNodes,
-            Map<String, EndpointDescription> prevNodes) {
-        List<String> children;
-        try {
-            LOG.debug("Processing the children of {}", zn);
-            children = zk.getChildren(zn, false);
-
-            boolean foundANode = false;
-            for (String child : children) {
-                String childZNode = zn + '/' + child;
-                EndpointDescription endpoint = getEndpointDescriptionFromNode(childZNode);
-                if (endpoint != null) {
-                    EndpointDescription prevEndpoint = prevNodes.get(child);
-                    LOG.info("found new node " + zn + "/[" + child + "]   ( []->child )  props: "
-                            + endpoint.getProperties().values());
-                    newNodes.put(child, endpoint);
-                    prevNodes.remove(child);
-                    foundANode = true;
-                    LOG.debug("Properties: {}", endpoint.getProperties());
-                    if (prevEndpoint == null) {
-                        // This guy is new
-                        endpointListener.endpointAdded(endpoint, null);
-                    } else if (!prevEndpoint.getProperties().equals(endpoint.getProperties())) {
-                        // TODO
-                    }
-                }
-                if (recursive && processChildren(childZNode, newNodes, prevNodes)) {
-                    zk.getChildren(childZNode, this);
-                }
-            }
-
-            return foundANode;
-        } catch (KeeperException e) {
-            LOG.error("Problem processing ZooKeeper node", e);
-        } catch (InterruptedException e) {
-            LOG.error("Problem processing ZooKeeper node", e);
-        }
-        return false;
-    }
-
-    /**
-     * Retrieves data from the given node and parses it into an EndpointDescription.
-     *
-     * @param node a node path
-     * @return endpoint found in the node or null if no endpoint was found
-     */
-    private EndpointDescription getEndpointDescriptionFromNode(String node) {
-        try {
-            Stat stat = zk.exists(node, false);
-            if (stat == null || stat.getDataLength() <= 0) {
-                return null;
-            }
-            byte[] data = zk.getData(node, false, null);
-            LOG.debug("Got data for node: {}", node);
-
-            EndpointDescription endpoint = getFirstEnpointDescription(data);
-            if (endpoint != null) {
-                return endpoint;
-            }
-            LOG.warn("No Discovery information found for node: {}", node);
-        } catch (Exception e) {
-            LOG.error("Problem getting EndpointDescription from node " + node, e);
-        }
-        return null;
-    }
-
-    public EndpointDescription getFirstEnpointDescription(byte[] data) {
-        List<EndpointDescriptionType> elements = parser.getEndpointDescriptions(new ByteArrayInputStream(data));
-        if (elements.isEmpty()) {
-            return null;
-        }
-        Map<String, Object> props = new PropertiesMapper().toProps(elements.get(0).getProperty());
-        return new EndpointDescription(props);
-    }
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
deleted file mode 100644
index f44b5af..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.subscribe;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Dictionary;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.aries.rsa.util.StringPlus;
-import org.apache.cxf.dosgi.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils;
-import org.apache.zookeeper.ZooKeeper;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Filter;
-import org.osgi.framework.ServiceReference;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Manages the EndpointListeners and the scopes they are interested in.
- * For each scope with interested EndpointListeners an InterfaceMonitor is created.
- * The InterfaceMonitor calls back when it detects added or removed external Endpoints.
- * These events are then forwarded to all interested EndpointListeners.
- */
-public class InterfaceMonitorManager {
-    private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitorManager.class);
-    private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(".*\\(objectClass=([^)]+)\\).*");
-
-    private final BundleContext bctx;
-    private final ZooKeeper zk;
-    // map of EndpointListeners and the scopes they are interested in
-    private final Map<ServiceReference<EndpointListener>, List<String>> endpointListenerScopes =
-            new HashMap<ServiceReference<EndpointListener>, List<String>>();
-    // map of scopes and their interest data
-    private final Map<String, Interest> interests = new HashMap<String, Interest>();
-
-    protected static class Interest {
-        List<ServiceReference<EndpointListener>> endpointListeners = 
-            new CopyOnWriteArrayList<ServiceReference<EndpointListener>>();
-        InterfaceMonitor monitor;
-    }
-
-    public InterfaceMonitorManager(BundleContext bctx, ZooKeeper zk) {
-        this.bctx = bctx;
-        this.zk = zk;
-    }
-
-    public void addInterest(ServiceReference<EndpointListener> endpointListener) {
-        if (isOurOwnEndpointListener(endpointListener)) {
-            LOG.debug("Skipping our own EndpointListener");
-            return;
-        }
-
-        LOG.info("updating EndpointListener interests: {}", endpointListener);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("updated EndpointListener properties: {}", getProperties(endpointListener));
-        }
-        for (String scope : getScopes(endpointListener)) {
-            String objClass = getObjectClass(scope);
-            LOG.debug("Adding interest in scope {}, objectClass {}", scope, objClass);
-            addInterest(endpointListener, scope, objClass);
-        }
-    }
-
-    private static boolean isOurOwnEndpointListener(ServiceReference<EndpointListener> endpointListener) {
-        return Boolean.parseBoolean(String.valueOf(
-                endpointListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)));
-    }
-
-    public synchronized void addInterest(ServiceReference<EndpointListener> endpointListener, 
-                                         String scope, String objClass) {
-        // get or create interest for given scope and add listener to it
-        Interest interest = interests.get(scope);
-        if (interest == null) {
-            // create interest, add listener and start monitor
-            interest = new Interest();
-            interests.put(scope, interest);
-            interest.endpointListeners.add(endpointListener); // add it before monitor starts so we don't miss events
-            interest.monitor = createInterfaceMonitor(scope, objClass, interest);
-            interest.monitor.start();
-        } else {
-            // interest already exists, so just add listener to it
-            if (!interest.endpointListeners.contains(endpointListener)) {
-                interest.endpointListeners.add(endpointListener);
-            }
-            // notify listener of all known endpoints for given scope
-            // (as EndpointListener contract requires of all added/modified listeners)
-            for (EndpointDescription endpoint : interest.monitor.getEndpoints()) {
-                notifyListeners(endpoint, scope, true, Arrays.asList(endpointListener));
-            }
-        }
-
-        // add scope to listener's scopes list
-        List<String> scopes = endpointListenerScopes.get(endpointListener);
-        if (scopes == null) {
-            scopes = new ArrayList<String>(1);
-            endpointListenerScopes.put(endpointListener, scopes);
-        }
-        if (!scopes.contains(scope)) {
-            scopes.add(scope);
-        }
-    }
-
-    public synchronized void removeInterest(ServiceReference<EndpointListener> endpointListener) {
-        LOG.info("removing EndpointListener interests: {}", endpointListener);
-        List<String> scopes = endpointListenerScopes.get(endpointListener);
-        if (scopes == null) {
-            return;
-        }
-
-        for (String scope : scopes) {
-            Interest interest = interests.get(scope);
-            if (interest != null) {
-                interest.endpointListeners.remove(endpointListener);
-                if (interest.endpointListeners.isEmpty()) {
-                    interest.monitor.close();
-                    interests.remove(scope);
-                }
-            }
-        }
-        endpointListenerScopes.remove(endpointListener);
-    }
-
-    protected InterfaceMonitor createInterfaceMonitor(final String scope, String objClass, final Interest interest) {
-        // holding this object's lock in the callbacks can lead to a deadlock with InterfaceMonitor
-        EndpointListener endpointListener = new EndpointListener() {
-
-            public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
-                notifyListeners(endpoint, scope, false, interest.endpointListeners);
-            }
-
-            public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
-                notifyListeners(endpoint, scope, true, interest.endpointListeners);
-            }
-        };
-        return new InterfaceMonitor(zk, objClass, endpointListener, scope);
-    }
-
-    private void notifyListeners(EndpointDescription endpoint, String currentScope, boolean isAdded,
-            List<ServiceReference<EndpointListener>> endpointListeners) {
-        for (ServiceReference<EndpointListener> endpointListenerRef : endpointListeners) {
-            EndpointListener service = bctx.getService(endpointListenerRef);
-            try {
-                EndpointListener endpointListener = (EndpointListener)service;
-                LOG.trace("matching {} against {}", endpoint, currentScope);
-                if (matchFilter(bctx, currentScope, endpoint)) {
-                    LOG.debug("Matched {} against {}", endpoint, currentScope);
-                    notifyListener(endpoint, currentScope, isAdded, endpointListenerRef.getBundle(),
-                                   endpointListener);
-                }
-            } finally {
-                if (service != null) {
-                    bctx.ungetService(endpointListenerRef);
-                }
-            }
-        }
-    }
-    
-    private static boolean matchFilter(BundleContext bctx, String filter, EndpointDescription endpoint) {
-        if (filter == null) {
-            return false;
-        }
-    
-        try {
-            Filter f = bctx.createFilter(filter);
-            Dictionary<String, Object> dict = new Hashtable<String, Object>(endpoint.getProperties());
-            return f.match(dict);
-        } catch (Exception e) {
-            return false;
-        }
-    }
-
-
-    private void notifyListener(EndpointDescription endpoint, String currentScope, boolean isAdded,
-                                Bundle endpointListenerBundle, EndpointListener endpointListener) {
-        if (endpointListenerBundle == null) {
-            LOG.info("listening service was unregistered, ignoring");
-        } else if (isAdded) {
-            LOG.info("calling EndpointListener.endpointAdded: " + endpointListener + " from bundle "
-                    + endpointListenerBundle.getSymbolicName() + " for endpoint: " + endpoint);
-            endpointListener.endpointAdded(endpoint, currentScope);
-        } else {
-            LOG.info("calling EndpointListener.endpointRemoved: " + endpointListener + " from bundle "
-                    + endpointListenerBundle.getSymbolicName() + " for endpoint: " + endpoint);
-            endpointListener.endpointRemoved(endpoint, currentScope);
-        }
-    }
-
-    public synchronized void close() {
-        for (Interest interest : interests.values()) {
-            interest.monitor.close();
-        }
-        interests.clear();
-        endpointListenerScopes.clear();
-    }
-
-    /**
-     * Only for test case!
-     */
-    protected synchronized Map<String, Interest> getInterests() {
-        return interests;
-    }
-
-    /**
-     * Only for test case!
-     */
-    protected synchronized Map<ServiceReference<EndpointListener>, List<String>> getEndpointListenerScopes() {
-        return endpointListenerScopes;
-    }
-
-    protected List<String> getScopes(ServiceReference<?> sref) {
-        return Utils.removeEmpty(StringPlus.normalize(sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE)));
-    }
-    
-    public static String getObjectClass(String scope) {
-        Matcher m = OBJECTCLASS_PATTERN.matcher(scope);
-        return m.matches() ? m.group(1) : null;
-    }
-
-    /**
-     * Returns a service's properties as a map.
-     *
-     * @param serviceReference a service reference
-     * @return the service's properties as a map
-     */
-    public static Map<String, Object> getProperties(ServiceReference<?> serviceReference) {
-        String[] keys = serviceReference.getPropertyKeys();
-        Map<String, Object> props = new HashMap<String, Object>(keys.length);
-        for (String key : keys) {
-            Object val = serviceReference.getProperty(key);
-            props.put(key, val);
-        }
-        return props;
-    }
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java
deleted file mode 100644
index afd9c0a..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.util;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public final class Utils {
-
-    static final String PATH_PREFIX = "/osgi/service_registry";
-
-    private Utils() {
-        // never constructed
-    }
-
-    public static String getZooKeeperPath(String name) {
-        return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/');
-    }
-
-    /**
-     * Removes nulls and empty strings from the given string array.
-     *
-     * @param strings an array of strings
-     * @return a new array containing the non-null and non-empty
-     *         elements of the original array in the same order
-     */
-    public static List<String> removeEmpty(List<String> strings) {
-        List<String> result = new ArrayList<String>();
-        for (String s : strings) {
-            if (s != null && !s.isEmpty()) {
-                result.add(s);
-            }
-        }
-        return result;
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/resources/OSGI-INF/metatype/zookeeper.xml
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/resources/OSGI-INF/metatype/zookeeper.xml b/discovery/zookeeper/src/main/resources/OSGI-INF/metatype/zookeeper.xml
new file mode 100644
index 0000000..361fa1e
--- /dev/null
+++ b/discovery/zookeeper/src/main/resources/OSGI-INF/metatype/zookeeper.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<MetaData xmlns="http://www.osgi.org/xmlns/metadata/v1.0.0"
+	 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	 xsi:schemaLocation="
+		 http://www.osgi.org/xmlns/metadata/v1.0.0 http://www.osgi.org/xmlns/metatype/v1.1.0/metatype.xsd
+	 ">
+	 <OCD description="" name="Zookeeper server config" id="zookeeper.server">
+        <AD id="clientPort" required="false" type="String" default="2181" description=""/>
+	 	<AD id="tickTime" required="false" type="String" default="2000" description=""/>
+        <AD id="initLimit" required="false" type="String" default="10" description=""/>
+        <AD id="syncLimit" required="false" type="String" default="5" description=""/>
+	 </OCD>
+	 <Designate pid="org.apache.aries.rsa.discovery.zookeeper.server">
+	 	<Object ocdref="zookeeper.server"/>
+	 </Designate>
+</MetaData>
\ No newline at end of file