You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by cs...@apache.org on 2012/11/02 16:02:02 UTC

svn commit: r1404989 - in /cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src: main/java/org/apache/cxf/dosgi/discovery/zookeeper/ test/java/org/apache/cxf/dosgi/discovery/zookeeper/

Author: cschneider
Date: Fri Nov  2 15:02:01 2012
New Revision: 1404989

URL: http://svn.apache.org/viewvc?rev=1404989&view=rev
Log:
DOSGI-138 DOSGI-123 DOSGI-70 Refactoring of discovery, added reconnect and republishing after connection loss to zookeeper

Added:
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java   (with props)
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java
      - copied, changed from r1402482, cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImpl.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java
      - copied, changed from r1402482, cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactory.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManagerTest.java
      - copied, changed from r1402482, cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizerTest.java
Removed:
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/DataMonitorListener.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactory.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImpl.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceDataMonitorListenerImpl.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/ActivatorTest.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizerTest.java
Modified:
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizer.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Util.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactoryTest.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImplTest.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorTest.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/UtilTest.java

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java?rev=1404989&r1=1404988&r2=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java Fri Nov  2 15:02:01 2012
@@ -18,92 +18,26 @@
  */
 package org.apache.cxf.dosgi.discovery.zookeeper;
 
-import java.io.IOException;
 import java.util.Dictionary;
-import java.util.Enumeration;
 import java.util.Hashtable;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.cm.ConfigurationException;
 import org.osgi.service.cm.ManagedService;
 
-public class Activator implements BundleActivator, ManagedService {
-    private static final Logger LOG = Logger.getLogger(Activator.class.getName());
-
+public class Activator implements BundleActivator {
     private ZooKeeperDiscovery zkd;
-    private Dictionary zkProperties;
-    private BundleContext bctx;
-    ServiceRegistration cmReg;
 
     public synchronized void start(BundleContext bc) throws Exception {
-        bctx = bc;
-        zkProperties = getCMDefaults();
-        zkd = createZooKeeperDiscovery();
-
-        cmReg = bc.registerService(ManagedService.class.getName(), this, zkProperties);
+        zkd = new ZooKeeperDiscovery(bc);
+        Dictionary<String, String> props = new Hashtable<String, String>();
+        props.put(Constants.SERVICE_PID, "org.apache.cxf.dosgi.discovery.zookeeper");
+        bc.registerService(ManagedService.class.getName(), zkd, props);
     }
 
     public synchronized void stop(BundleContext bc) throws Exception {
-        cmReg.unregister();
         zkd.stop();
     }
 
-    public synchronized void updated(Dictionary configuration) throws ConfigurationException {
-        if (LOG.isLoggable(Level.FINE))
-            LOG.fine("Received configuration update for Zookeeper Discovery: " + configuration);
-        if (configuration == null)
-            return;
-
-        Dictionary effective = getCMDefaults();
-        // apply all values on top of the defaults
-        for (Enumeration e = configuration.keys(); e.hasMoreElements();) {
-            Object key = e.nextElement();
-            if (key != null) {
-                Object val = configuration.get(key);
-                effective.put(key, val);
-            }
-        }
-
-        if (zkProperties.equals(effective)) {
-            LOG.info("Update called, but actual settings haven't changed ...");
-            return;
-        } else if (LOG.isLoggable(Level.INFO)) {
-            LOG.info("Updating configuration for Zookeeper Discovery: " + configuration);
-        }
-
-        zkProperties = effective;
-        cmReg.setProperties(zkProperties);
-
-        synchronized (this) {
-            zkd.stop();
-            zkd = createZooKeeperDiscovery();
-        }
-
-        // call start in any case
-        try {
-            zkd.start();
-        } catch (IOException e) {
-            LOG.log(Level.SEVERE, "Failed to start the Zookeeper Discovery component.", e);
-        }
-
-    }
-
-    private Dictionary getCMDefaults() {
-        Dictionary props = new Hashtable();
-        props.put("zookeeper.timeout", "3000");
-        props.put("zookeeper.port", "2181");
-        props.put(Constants.SERVICE_PID, "org.apache.cxf.dosgi.discovery.zookeeper");
-        return props;
-    }
-
-    // for testing
-    protected ZooKeeperDiscovery createZooKeeperDiscovery() {
-        return new ZooKeeperDiscovery(bctx, zkProperties);
-    }
-
 }

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizer.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizer.java?rev=1404989&r1=1404988&r2=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizer.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizer.java Fri Nov  2 15:02:01 2012
@@ -18,12 +18,6 @@
  */
 package org.apache.cxf.dosgi.discovery.zookeeper;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.regex.Matcher;
@@ -31,165 +25,69 @@ import java.util.regex.Pattern;
 
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.service.remoteserviceadmin.RemoteConstants;
 import org.osgi.util.tracker.ServiceTrackerCustomizer;
 
+/**
+ * Tracks interest in EndpointListeners. Delegates to InterfaceMonitorManager to manage 
+ * interest in the scopes of each EndpointListener
+ */
 public class EndpointListenerTrackerCustomizer implements ServiceTrackerCustomizer {
-
     private static final Logger LOG = Logger.getLogger(EndpointListenerTrackerCustomizer.class.getName());
-    private ZooKeeperDiscovery zooKeeperDiscovery;
     private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(".*\\(objectClass=([^)]+)\\).*");
 
-    private Map<String /* scope */, Interest> interestingScopes = new HashMap<String, Interest>();
-    private Map<ServiceReference, List<String> /* scopes of the epl */> handledEndpointlisteners = new HashMap<ServiceReference, List<String>>();
+    private InterfaceMonitorManager imManager;
 
-    private BundleContext bctx;
-
-    protected static class Interest {
-        List<ServiceReference> relatedServiceListeners = new ArrayList<ServiceReference>(1);
-        InterfaceMonitor im;
-    }
-
-    public EndpointListenerTrackerCustomizer(ZooKeeperDiscovery zooKeeperDiscovery, BundleContext bc) {
-        this.zooKeeperDiscovery = zooKeeperDiscovery;
-        bctx = bc;
+    public EndpointListenerTrackerCustomizer(BundleContext bc, InterfaceMonitorManager imManager) {
+        this.imManager = imManager;
     }
 
     public Object addingService(ServiceReference sref) {
-        LOG.info("addingService: " + sref);
         handleEndpointListener(sref);
         return sref;
     }
 
     public void modifiedService(ServiceReference sref, Object service) {
-        LOG.info("modifiedService: " + sref);
         handleEndpointListener(sref);
     }
 
     private void handleEndpointListener(ServiceReference sref) {
+        if (isOurOwnEndpointListener(sref)) {
+            LOG.finest("Skipping our own endpointListener");
+            return;
+        }
+
         if (LOG.isLoggable(Level.FINEST)) {
             for (String key : sref.getPropertyKeys()) {
                 LOG.finest("modifiedService: property: " + key + " => " + sref.getProperty(key));
             }
         }
-
-        if (Boolean.parseBoolean(String.valueOf(
-                sref.getProperty(EndpointListenerFactory.DISCOVERY_ZOOKEEPER_ID)))) {
-            LOG.finest("found my own endpointListener ... skipping it");
-            return;
-        }
         
         String[] scopes = Util.getScopes(sref);
-        
-        if (LOG.isLoggable(Level.FINE)) {
-            LOG.fine("trying to discover services for scopes[" + scopes.length + "]: ");
-            for (String scope : scopes) {
-                LOG.fine("Scope: "+scope);
-            }
-        }
-        if (scopes.length > 0) {
-            for (String scope : scopes) {
-                LOG.fine("***********  Handling scope: " + scope);
-                if("".equals(scope) || scope == null){
-                    LOG.warning("skipping empty scope from EndpointListener from " + sref.getBundle().getSymbolicName());
-                    continue;
-                }
-                
-                String objClass = getObjectClass(scope);
-                LOG.fine("***********  objectClass: " + objClass);
-
-                synchronized (interestingScopes) {
-                    synchronized (handledEndpointlisteners) {
-                        Interest interest = interestingScopes.get(scope);
-                        if (interest == null) {
-                            interest = new Interest();
-                            interestingScopes.put(scope, interest);
-                        }
-
-                        
-                        if (!interest.relatedServiceListeners.contains(sref)) {
-                            interest.relatedServiceListeners.add(sref);
-                        }
-
-                        if (interest.im != null) {
-                            // close old Monitor
-                            interest.im.close();
-                            interest.im = null;
-                        }
-                        
-                        InterfaceMonitor dm = createInterfaceMonitor(scope, objClass, interest);
-                        dm.start();
-                        interest.im = dm;
-
-                        List<String> handledScopes = handledEndpointlisteners.get(sref);
-                        if (handledScopes == null) {
-                            handledScopes = new ArrayList<String>(1);
-                            handledEndpointlisteners.put(sref, handledScopes);
-                        }
-
-                        if (!handledScopes.contains(scope))
-                            handledScopes.add(scope);
-
-                    }
-                }
-
+        for (String scope : scopes) {
+            String objClass = getObjectClass(scope);
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.fine("Adding interest in scope: " + scope + " objectClass: " + objClass);
             }
+            imManager.addInterest(sref, scope, objClass);
         }
     }
 
+    private boolean isOurOwnEndpointListener(ServiceReference sref) {
+        return Boolean.parseBoolean(String.valueOf(
+                sref.getProperty(PublishingEndpointListenerFactory.DISCOVERY_ZOOKEEPER_ID)));
+    }
 
     private String getObjectClass(String scope) {
+        if (scope == null) {
+            return null;
+        }
         Matcher m = OBJECTCLASS_PATTERN.matcher(scope);
-        if (m.matches())
-            return m.group(1);
-        return null;
+        return m.matches() ? m.group(1) : null;
     }
 
     public void removedService(ServiceReference sref, Object service) {
         LOG.info("removedService: " + sref);
-
-        List<String> handledScopes = handledEndpointlisteners.get(sref);
-        if (handledScopes != null) {
-            for (String scope : handledScopes) {
-                Interest i = interestingScopes.get(scope);
-                if (i != null) {
-                    i.relatedServiceListeners.remove(sref);
-                    if (i.relatedServiceListeners.size() == 0) {
-                        i.im.close();
-                        interestingScopes.remove(scope);
-                    }
-                }
-            }
-            handledEndpointlisteners.remove(sref);
-        }
-
+        imManager.removeInterest(sref);
     }
 
-    
-
-    /**
-     * Only for test case !
-     * */
-    protected Map<String, Interest> getInterestingScopes() {
-        return interestingScopes;
-    }
-
-    /**
-     * Only for test case !
-     * */
-    protected Map<ServiceReference, List<String>>  getHandledEndpointlisteners() {
-        return handledEndpointlisteners;
-    }
-
-    
-    /**
-     * Only for test case !
-     * */
-    protected InterfaceMonitor createInterfaceMonitor(String scope, String objClass, Interest interest) {
-        InterfaceMonitor dm = new InterfaceMonitor(zooKeeperDiscovery.getZookeeper(),
-                                                   objClass, interest, scope, bctx);
-        return dm;
-    }
 }

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java?rev=1404989&r1=1404988&r2=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java Fri Nov  2 15:02:01 2012
@@ -18,48 +18,68 @@
   */
 package org.apache.cxf.dosgi.discovery.zookeeper;
 
+import java.io.ByteArrayInputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.cxf.dosgi.discovery.local.LocalDiscoveryUtils;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.AsyncCallback.StatCallback;
-import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.data.Stat;
+import org.jdom.Element;
 import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
 
 public class InterfaceMonitor implements Watcher, StatCallback {
     private static final Logger LOG = Logger.getLogger(InterfaceMonitor.class.getName());
 
-    InterfaceDataMonitorListenerImpl listener;
-    final String znode;
-    final ZooKeeper zookeeper;
-    
+    private final String znode;
+    private final ZooKeeper zookeeper;
+    private final EndpointListener epListener;
+    private final boolean recursive;
     private boolean closed = false;
+    
+    // This map is *only* accessed in the change() method
+    private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>();
 
-    public InterfaceMonitor(ZooKeeper zk, String intf, EndpointListenerTrackerCustomizer.Interest zkd, String scope, BundleContext bctx) {
-        LOG.fine("Creating new InterfaceMonitor for scope ["+scope+"] and objectClass ["+intf+"] ");
-        listener = createInterfaceDataMonitorListener(zk, intf, zkd, scope, bctx);
-        zookeeper = zk;
-        znode = Util.getZooKeeperPath(intf);
+    public InterfaceMonitor(ZooKeeper zk, String intf, EndpointListener epListener, String scope, BundleContext bctx) {
+        LOG.fine("Creating new InterfaceMonitor for scope [" + scope + "] and objectClass [" + intf + "] ");
+        this.zookeeper = zk;
+        this.znode = Util.getZooKeeperPath(intf);
+        this.recursive = (intf == null || "".equals(intf));
+        this.epListener = epListener;
+        LOG.fine("InterfaceDataMonitorListenerImpl is recursive: " + recursive);
     }
     
     public void start() {
-        process();
+        watch();
     }
     
-    private void process() {
+    private void watch() {
         LOG.finest("registering a zookeeper.exists(" + znode+") callback");
         zookeeper.exists(znode, this, this, null);
     }
 
+    /**
+     * Zookeeper watcher
+     */
     public void process(WatchedEvent event) {
         LOG.finer("ZooKeeper watcher callback " + event);
         processDelta();
     }
     
+    /**
+     * Zookeeper StatCallback 
+     */
+    @SuppressWarnings("deprecation")
     public void processResult(int rc, String path, Object ctx, Stat stat) {
 
         LOG.finer("ZooKeeper callback on node: " + znode + "   code: " + rc );
@@ -75,7 +95,7 @@ public class InterfaceMonitor implements
             return;
         
         default:
-            process();
+            watch();
             return;
         }
         
@@ -83,9 +103,11 @@ public class InterfaceMonitor implements
     }
 
     private void processDelta() {
-        if(closed) return;
+        if (closed) {
+            return;
+        }
         
-        if(zookeeper.getState() != ZooKeeper.States.CONNECTED){
+        if (zookeeper.getState() != ZooKeeper.States.CONNECTED){
             LOG.info("zookeeper connection was already closed! Not processing changed event.");
             return;
         }
@@ -93,7 +115,7 @@ public class InterfaceMonitor implements
         try {
             if (zookeeper.exists(znode, false) != null) {
                 zookeeper.getChildren(znode, this);
-                listener.change();
+                change();
             }else{
                 LOG.fine(znode+" doesn't exist -> not processing any changes");
             }
@@ -102,22 +124,101 @@ public class InterfaceMonitor implements
         }
     }
 
-    public void inform(ServiceReference sref) {
-       listener.inform(sref);
-    }
-
     public void close() {
-        // TODO !!!     
-        closed = true;
+        for (EndpointDescription epd : nodes.values()) {
+            epListener.endpointRemoved(epd, null);
+        }
+        nodes.clear();
     }
     
+    public synchronized void change() {
+        LOG.info("Zookeeper callback on node: " + znode);
+
+        Map<String, EndpointDescription> newNodes = new HashMap<String, EndpointDescription>();
+        Map<String, EndpointDescription> prevNodes = nodes;
+        processChildren(znode, newNodes, prevNodes);
+
+        // whatever is left in prevNodes now has been removed from Discovery
+        LOG.fine("processChildren done. Nodes that are missing now and need to be removed: "
+                 + prevNodes.values());
+        for (EndpointDescription epd : prevNodes.values()) {
+            epListener.endpointRemoved(epd, null);
+        }
+        nodes = newNodes;
+    }
+
     /**
-     * Only for thest case
-     * @return 
-     * */
-    protected InterfaceDataMonitorListenerImpl createInterfaceDataMonitorListener(ZooKeeper zk, String intf,
-                                                      EndpointListenerTrackerCustomizer.Interest zkd,
-                                                      String scope, BundleContext bctx) {
-        return new InterfaceDataMonitorListenerImpl(zk, intf, zkd,scope,bctx,this);
+     * iterates through all child nodes of the given node and tries to find endpoints. If the recursive flag
+     * is set it also traverses into the child nodes.
+     * 
+     * @return true if an endpoint was found and if the node therefore needs to be monitored for changes
+     */
+    private boolean processChildren(String znode, Map<String, EndpointDescription> newNodes,
+                                    Map<String, EndpointDescription> prevNodes) {
+        List<String> children;
+        try {
+            LOG.fine("Processing the children of " + znode);
+            children = zookeeper.getChildren(znode, false);
+
+            boolean foundANode = false;
+            for (String child : children) {
+                String childZNode = znode + '/' + child;
+                EndpointDescription epd = getEndpointDescriptionFromNode(childZNode);
+                if (epd != null) {
+                    EndpointDescription prevEpd = prevNodes.get(child);
+                    LOG.info("found new node " + znode + "/[" + child + "]   ( []->child )  props: "
+                             + epd.getProperties().values());
+                    newNodes.put(child, epd);
+                    prevNodes.remove(child);
+                    foundANode = true;
+                    LOG.finest("Properties: " + epd.getProperties());
+                    if (prevEpd == null) {
+                        // This guy is new
+                        epListener.endpointAdded(epd, null);
+                    } else if (!prevEpd.getProperties().equals(epd.getProperties())) {
+                        // TODO
+                    }
+                }
+                if (recursive) {
+                    if (processChildren(childZNode, newNodes, prevNodes)) {
+                        zookeeper.getChildren(childZNode, this);
+                    }
+                }
+            }
+
+            return foundANode;
+        } catch (KeeperException e) {
+            LOG.log(Level.SEVERE, "Problem processing Zookeeper node: " + e.getMessage(), e);
+        } catch (InterruptedException e) {
+            LOG.log(Level.SEVERE, "Problem processing Zookeeper node: " + e.getMessage(), e);
+        }
+        return false;
+    }
+
+    /**
+     * Scan the node data for Endpoint information and publish it to the related service listeners
+     * 
+     * @param node 
+     * @return endpoint found in the node or null if no endpoint was found
+     */ 
+    private EndpointDescription getEndpointDescriptionFromNode(String node) {
+        try {
+            Stat s = zookeeper.exists(node, false);
+            if (s.getDataLength() <= 0) {
+                return null;
+            }
+            byte[] data = zookeeper.getData(node, false, null);
+            LOG.fine("Child: " + node);
+
+            List<Element> elements = LocalDiscoveryUtils.getElements(new ByteArrayInputStream(data));
+            if (elements.size() > 0) {
+                return LocalDiscoveryUtils.getEndpointDescription(elements.get(0));
+            } else {
+                LOG.warning("No Discovery information found for node: " + node);
+            }
+        } catch (Exception e) {
+            LOG.log(Level.SEVERE, "Problem processing Zookeeper callback: " + e.getMessage(), e);
+        }
+        return null;
     }
 }

Added: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java?rev=1404989&view=auto
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java (added)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java Fri Nov  2 15:02:01 2012
@@ -0,0 +1,182 @@
+package org.apache.cxf.dosgi.discovery.zookeeper;
+
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+
+/**
+ * Manages the EndpointListeners and the scopes they are interested in.
+ * For each scope with interested EndpointListeners an InterfaceMonitor is created.
+ * The InterfaceMonitor calls back when it detects added or removed external Endpoints.
+ * These events are then forwarded to all interested EndpointListeners
+ */
+public class InterfaceMonitorManager {
+    private static final Logger LOG = Logger.getLogger(InterfaceMonitorManager.class.getName());
+    
+    private final ZooKeeper zooKeeper;
+    private final Map<ServiceReference, List<String> /* scopes of the epl */> handledEndpointlisteners = new HashMap<ServiceReference, List<String>>();
+    private final Map<String /* scope */, Interest> interestingScopes = new HashMap<String, Interest>();
+    private final BundleContext bctx;
+
+    protected static class Interest {
+        List<ServiceReference> relatedServiceListeners = new ArrayList<ServiceReference>(1);
+        InterfaceMonitor im;
+    }
+    
+    public InterfaceMonitorManager(BundleContext bctx, ZooKeeper zooKeeper) {
+        this.bctx = bctx;
+        this.zooKeeper = zooKeeper;
+    }
+    
+    void addInterest(ServiceReference sref, String scope, String objClass) {
+        synchronized (interestingScopes) {
+            synchronized (handledEndpointlisteners) {
+                Interest interest = interestingScopes.get(scope);
+                if (interest == null) {
+                    interest = new Interest();
+                    interestingScopes.put(scope, interest);
+                }
+                
+                if (!interest.relatedServiceListeners.contains(sref)) {
+                    interest.relatedServiceListeners.add(sref);
+                }
+
+                if (interest.im != null) {
+                    // close old Monitor
+                    interest.im.close();
+                    interest.im = null;
+                }
+                
+                InterfaceMonitor dm = createInterfaceMonitor(scope, objClass, interest);
+                dm.start();
+                interest.im = dm;
+
+                List<String> handledScopes = handledEndpointlisteners.get(sref);
+                if (handledScopes == null) {
+                    handledScopes = new ArrayList<String>(1);
+                    handledEndpointlisteners.put(sref, handledScopes);
+                }
+
+                if (!handledScopes.contains(scope))
+                    handledScopes.add(scope);
+
+            }
+        }
+    }
+    
+    /**
+     * Only for test case !
+     * */
+    protected Map<String, Interest> getInterestingScopes() {
+        return interestingScopes;
+    }
+
+    /**
+     * Only for test case !
+     * */
+    protected Map<ServiceReference, List<String>>  getHandledEndpointlisteners() {
+        return handledEndpointlisteners;
+    }
+    
+    protected InterfaceMonitor createInterfaceMonitor(String scope, String objClass, final Interest interest) {
+        EndpointListener epListener = new EndpointListener() {
+            public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
+                notifyListeners(endpoint, false, interest.relatedServiceListeners);
+            }
+            
+            public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
+                notifyListeners(endpoint, true, interest.relatedServiceListeners);
+            }
+        };
+        return new InterfaceMonitor(zooKeeper, objClass, epListener, scope, bctx);
+    }
+
+    public void removeInterest(ServiceReference sref) {
+        List<String> handledScopes = handledEndpointlisteners.get(sref);
+        if (handledScopes == null) {
+               return;
+        }
+
+        for (String scope : handledScopes) {
+            Interest i = interestingScopes.get(scope);
+            if (i != null) {
+                i.relatedServiceListeners.remove(sref);
+                if (i.relatedServiceListeners.size() == 0) {
+                    i.im.close();
+                    interestingScopes.remove(scope);
+                }
+            }
+        }
+        handledEndpointlisteners.remove(sref);
+    }
+    
+    private void notifyListeners(EndpointDescription epd, boolean isAdded,
+            List<ServiceReference> relatedServiceListeners) {
+        for (ServiceReference sref : relatedServiceListeners) {
+            Object service = bctx.getService(sref);
+            if (service == null || !(service instanceof EndpointListener)) {
+                continue;
+            }
+            EndpointListener epl = (EndpointListener) service;
+            String[] scopes = Util.getScopes(sref);
+            for (final String currentScope : scopes) {
+                LOG.fine("matching " + epd + " against " + currentScope);
+                if (matches(currentScope, epd)) {
+                    LOG.fine("Matched " + epd + "against " + currentScope);
+                    if (isAdded) {
+                        LOG.info("calling EndpointListener.endpointAdded: " + epl + "from bundle "
+                                + sref.getBundle().getSymbolicName() + " for endpoint: " + epd);
+                        epl.endpointAdded(epd, currentScope);
+                    } else {
+                        LOG.info("calling EndpointListener.endpointRemoved: " + epl + "from bundle "
+                                + sref.getBundle().getSymbolicName() + " for endpoint: " + epd);
+                        epl.endpointRemoved(epd, currentScope);
+                    }
+                    break;
+                }
+            }
+        }
+    }
+    
+    private boolean matches(String scope, EndpointDescription epd) {
+        try {
+            Filter f = FrameworkUtil.createFilter(scope);
+            Dictionary<String, Object> dict = mapToDictionary(epd.getProperties());
+            return f.match(dict);
+        } catch (InvalidSyntaxException e) {
+            LOG.log(Level.SEVERE, "Currentscope [" + scope + "] resulted in" + " a bad filter!", e);
+            return false;
+        }
+    }
+
+    private Dictionary<String, Object> mapToDictionary(Map<String, Object> map) {
+        Dictionary<String, Object> d = new Hashtable<String, Object>();
+        Set<Map.Entry<String, Object>> entries = map.entrySet();
+        for (Map.Entry<String, Object> entry : entries) {
+            d.put(entry.getKey(), entry.getValue());
+        }
+        return d;
+    }
+
+    public void close() {
+        for (String scope : interestingScopes.keySet()) {
+            Interest interest = interestingScopes.get(scope);
+            interest.im.close();
+        }
+    }
+}

Propchange: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java (from r1402482, cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImpl.java)
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java?p2=cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java&p1=cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImpl.java&r1=1402482&r2=1404989&rev=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImpl.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java Fri Nov  2 15:02:01 2012
@@ -42,17 +42,20 @@ import org.osgi.service.remoteserviceadm
 import org.osgi.service.remoteserviceadmin.EndpointListener;
 import org.osgi.util.tracker.ServiceTracker;
 
-public class EndpointListenerImpl implements EndpointListener {
-    private final Logger LOG = Logger.getLogger(EndpointListenerImpl.class.getName());
+/**
+ * Listens for local Endpoints and publishes them to Zookeeper
+ */
+public class PublishingEndpointListener implements EndpointListener {
+    private final Logger LOG = Logger.getLogger(PublishingEndpointListener.class.getName());
 
-    private final ZooKeeperDiscovery discovery;
+    private final ZooKeeper zookeeper;
     private final List<DiscoveryPlugin> discoveryPlugins = new CopyOnWriteArrayList<DiscoveryPlugin>();
     private final ServiceTracker discoveryPluginTracker;
     private final List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>();
     private boolean closed = false;
 
-    public EndpointListenerImpl(ZooKeeperDiscovery zooKeeperDiscovery, BundleContext bctx) {
-        discovery = zooKeeperDiscovery;
+    public PublishingEndpointListener(ZooKeeper zooKeeper, BundleContext bctx) {
+        this.zookeeper = zooKeeper;
 
         discoveryPluginTracker = new ServiceTracker(bctx, DiscoveryPlugin.class.getName(), null) {
             @Override
@@ -73,12 +76,8 @@ public class EndpointListenerImpl implem
         discoveryPluginTracker.open();
     }
 
-    private ZooKeeper getZooKeeper() {
-        return discovery.getZookeeper();
-    }
-
     public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
-        LOG.info("endpointDescription added: " + endpoint);
+        LOG.info("Local endpointDescription added: " + endpoint);
 
         if (closed)
             return;
@@ -95,7 +94,6 @@ public class EndpointListenerImpl implem
                 Collection<String> interfaces = endpoint.getInterfaces();
                 String endpointKey = getKey(endpoint.getId());
 
-                ZooKeeper zk = getZooKeeper();
                 for (String name : interfaces) {
                     Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
                     for (DiscoveryPlugin plugin : discoveryPlugins) {
@@ -103,11 +101,11 @@ public class EndpointListenerImpl implem
                     }
 
                     String path = Util.getZooKeeperPath(name);
-                    ensurePath(path, zk);
+                    ensurePath(path, zookeeper);
 
                     String fullPath = path + '/' + endpointKey;
                     LOG.fine("Creating ZooKeeper node: " + fullPath);
-                    zk.create(fullPath, getData(props), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+                    zookeeper.create(fullPath, getData(props), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                 }
 
                 endpoints.add(endpoint);
@@ -119,7 +117,7 @@ public class EndpointListenerImpl implem
     }
 
     public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
-        LOG.info("endpointDescription removed: " + endpoint);
+        LOG.info("Local endpointDescription removed: " + endpoint);
 
         if (closed)
             return;
@@ -145,12 +143,11 @@ public class EndpointListenerImpl implem
         Collection<String> interfaces = endpoint.getInterfaces();
         String endpointKey = getKey(endpoint.getId());
 
-        ZooKeeper zk = getZooKeeper();
         for (String name : interfaces) {
             String path = Util.getZooKeeperPath(name);
             String fullPath = path + '/' + endpointKey;
             LOG.fine("Removing ZooKeeper node: " + fullPath);
-            zk.delete(fullPath, -1);
+            zookeeper.delete(fullPath, -1);
         }
     }
 

Copied: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java (from r1402482, cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactory.java)
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java?p2=cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java&p1=cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactory.java&r1=1402482&r2=1404989&rev=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactory.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java Fri Nov  2 15:02:01 2012
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.logging.Logger;
 
+import org.apache.zookeeper.ZooKeeper;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Constants;
@@ -31,25 +32,27 @@ import org.osgi.framework.ServiceRegistr
 import org.osgi.service.remoteserviceadmin.EndpointListener;
 import org.osgi.service.remoteserviceadmin.RemoteConstants;
 
-public class EndpointListenerFactory implements ServiceFactory {
-
+/**
+ * Creates local Endpointlisteners that publish to Zookeeper 
+ */
+public class PublishingEndpointListenerFactory implements ServiceFactory {
     public static final String DISCOVERY_ZOOKEEPER_ID = "org.apache.cxf.dosgi.discovery.zookeeper";
-    
-    private Logger LOG = Logger.getLogger(EndpointListenerFactory.class.getName());
+    private static final Logger LOG = Logger.getLogger(PublishingEndpointListenerFactory.class.getName());
+
     private BundleContext bctx;
-    private ZooKeeperDiscovery discovery;
-    private List<EndpointListenerImpl> listeners = new ArrayList<EndpointListenerImpl>();
+    private ZooKeeper zookeeper;
+    private List<PublishingEndpointListener> listeners = new ArrayList<PublishingEndpointListener>();
     private ServiceRegistration serviceRegistartion;
 
-    public EndpointListenerFactory(ZooKeeperDiscovery zooKeeperDiscovery, BundleContext bctx) {
+    public PublishingEndpointListenerFactory(ZooKeeper zooKeeper, BundleContext bctx) {
         this.bctx = bctx;
-        discovery = zooKeeperDiscovery;
+        this.zookeeper = zooKeeper;
     }
 
     public Object getService(Bundle b, ServiceRegistration sr) {
         LOG.fine("new EndpointListener from factory");
         synchronized (listeners) {
-            EndpointListenerImpl epl = new EndpointListenerImpl(discovery, bctx);
+            PublishingEndpointListener epl = new PublishingEndpointListener(zookeeper, bctx);
             listeners.add(epl);
             return epl;
         }
@@ -59,7 +62,7 @@ public class EndpointListenerFactory imp
         LOG.fine("remove EndpointListener");
         synchronized (listeners) {
             if (listeners.contains(s)) {
-                EndpointListenerImpl epl = (EndpointListenerImpl)s;
+                PublishingEndpointListener epl = (PublishingEndpointListener)s;
                 epl.close();
                 listeners.remove(epl);
             }
@@ -67,22 +70,18 @@ public class EndpointListenerFactory imp
     }
 
     public synchronized void start() {
-        serviceRegistartion = bctx.registerService(EndpointListener.class.getName(), this, null);
-        updateServiceRegistration();
-    }
-
-    private void updateServiceRegistration() {
         Properties props = new Properties();
         props.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, "(&(" + Constants.OBJECTCLASS + "=*)("+RemoteConstants.ENDPOINT_FRAMEWORK_UUID+"="+Util.getUUID(bctx)+"))");
         props.put(DISCOVERY_ZOOKEEPER_ID, "true");
-        serviceRegistartion.setProperties(props);
+        serviceRegistartion = bctx.registerService(EndpointListener.class.getName(), this, props);
     }
 
     public synchronized void stop() {
-        if (serviceRegistartion != null)
+        if (serviceRegistartion != null) {
             serviceRegistartion.unregister();
+        }
         
-        for (EndpointListenerImpl epl : listeners) {
+        for (PublishingEndpointListener epl : listeners) {
             epl.close();
         }
     }
@@ -90,7 +89,7 @@ public class EndpointListenerFactory imp
     /**
      * only for the test case !
      */
-    protected List<EndpointListenerImpl> getListeners(){
+    protected List<PublishingEndpointListener> getListeners(){
         return listeners;
     }
     

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Util.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Util.java?rev=1404989&r1=1404988&r2=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Util.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Util.java Fri Nov  2 15:02:01 2012
@@ -65,6 +65,7 @@ public class Util {
         }
 
         if (property instanceof Collection) {
+            @SuppressWarnings("rawtypes")
             Collection col = (Collection)property;
             // System.out.println("Collection: size "+col.size());
             String[] ret = new String[col.size()];
@@ -83,8 +84,9 @@ public class Util {
         String[] scopes = Util.getStringPlusProperty(sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE));
         ArrayList<String> normalizedScopes = new ArrayList<String>(scopes.length);
         for (String scope : scopes) {
-            if(scope!=null && !"".equals(scope))
+            if (scope != null && !"".equals(scope)) {
                 normalizedScopes.add(scope);
+            }
         }
         return normalizedScopes.toArray(new String[normalizedScopes.size()]);
     }

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java?rev=1404989&r1=1404988&r2=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java Fri Nov  2 15:02:01 2012
@@ -20,111 +20,98 @@ package org.apache.cxf.dosgi.discovery.z
 
 import java.io.IOException;
 import java.util.Dictionary;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedService;
 import org.osgi.service.remoteserviceadmin.EndpointListener;
 import org.osgi.util.tracker.ServiceTracker;
 
-public class ZooKeeperDiscovery implements Watcher {
+public class ZooKeeperDiscovery implements Watcher, ManagedService {
 
     private static final Logger LOG = Logger.getLogger(ZooKeeperDiscovery.class.getName());
 
-    private boolean started = false;
-    
-    private BundleContext bctx;
-    private ZooKeeper zooKeeper;
-    private Dictionary properties = null;
+    private final BundleContext bctx;
 
-    private EndpointListenerFactory endpointListenerFactory;
+    private PublishingEndpointListenerFactory endpointListenerFactory;
     private ServiceTracker endpointListenerTracker;
 
-    private String zkHost;
-    private String zkPort;
-    private int zkTimeout;
+    private InterfaceMonitorManager imManager;
 
-    public ZooKeeperDiscovery(BundleContext bctx, Dictionary initialProps) {
-        this.bctx = bctx;
-        endpointListenerFactory = new EndpointListenerFactory(this, bctx);
-        properties = initialProps;
+    private ZooKeeper zooKeeper;
 
-        endpointListenerTracker = new ServiceTracker(bctx, EndpointListener.class.getName(),
-                                                     new EndpointListenerTrackerCustomizer(this, bctx));
-    }
+    @SuppressWarnings("rawtypes")
+    private Dictionary curConfiguration;
 
-    public synchronized void start() throws IOException, ConfigurationException {
-        if(started) return;
-        started = true;
-        createZooKeeper(properties);
-        
-        // Wait up to 10 seconds for the connection to be established and only register 
-        // the listeners once the connection is established
-        int loops = 100;
-        
-        while(loops>0){
-            if(zooKeeper.getState()==ZooKeeper.States.CONNECTED){
-                break;
-            }
-            --loops;
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {}
-        }
-        
-        if(zooKeeper.getState()!=ZooKeeper.States.CONNECTED){
-            throw new IOException("Connection to ZookeeperServer failed !");
-        }
-        
-        endpointListenerFactory.start();
-        endpointListenerTracker.open();
-    }
-
-    public synchronized void stop() {
-        if(!started) return;
-        started = false;
-        endpointListenerFactory.stop();
-        endpointListenerTracker.close();
+    public ZooKeeperDiscovery(BundleContext bctx) {
+        this.bctx = bctx;
+        this.curConfiguration = null;
     }
 
+    @SuppressWarnings("rawtypes")
+    public synchronized void updated(Dictionary configuration) throws ConfigurationException {
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("Received configuration update for Zookeeper Discovery: " + configuration);
+        }
 
-    private void createZooKeeper(Dictionary props) throws IOException, ConfigurationException {
-        zkHost = getProp(props, "zookeeper.host");
-        zkPort = getProp(props, "zookeeper.port");
-        zkTimeout = Integer.parseInt(getProp(props, "zookeeper.timeout", "3000"));
+        synchronized (this) {
+            stop();
+        }
 
-        zooKeeper = createZooKeeper();
+        if (configuration == null) {
+            return;
+        }
+        curConfiguration = configuration;
+        try {
+            zooKeeper = createZooKeeper(configuration);
+        } catch (IOException e) {
+            LOG.log(Level.SEVERE, "Failed to start the Zookeeper Discovery component.", e);
+        }
     }
 
-    // separated for testing
-    ZooKeeper createZooKeeper() throws IOException {
-        return new ZooKeeper(zkHost + ":" + zkPort, zkTimeout, this);
+    private void startModules() {
+        endpointListenerFactory = new PublishingEndpointListenerFactory(zooKeeper, bctx);
+        endpointListenerFactory.start();
+        imManager = new InterfaceMonitorManager(bctx, zooKeeper);
+        EndpointListenerTrackerCustomizer customizer = new EndpointListenerTrackerCustomizer(bctx, imManager);
+        endpointListenerTracker = new ServiceTracker(bctx, EndpointListener.class.getName(), customizer);
+        endpointListenerTracker.open();
     }
 
-    private <T> boolean hasChanged(T orig, T nw) {
-        if (orig == nw) {
-            return false;
+    public synchronized void stop() {
+        if (endpointListenerFactory != null) {
+            endpointListenerFactory.stop();
         }
-
-        if (orig == null) {
-            return true;
+        if (imManager != null) {
+            imManager.close();
+        }
+        if (endpointListenerTracker != null) {
+            endpointListenerTracker.close();
+        }
+        if (zooKeeper != null) {
+            try {
+                zooKeeper.close();
+            } catch (InterruptedException e) {
+                LOG.log(Level.SEVERE, "Error closing zookeeper", e);
+            }
         }
-
-        return !orig.equals(nw);
     }
 
-    private static String getProp(Dictionary props, String key) throws ConfigurationException {
-        String val = getProp(props, key, null);
-        if (val != null) {
-            return val;
-        } else {
-            throw new ConfigurationException(key, "The property " + key + " requires a value");
-        }
+    @SuppressWarnings("rawtypes")
+    private ZooKeeper createZooKeeper(Dictionary props) throws IOException {
+        String zkHost = getProp(props, "zookeeper.host", "localhost");
+        String zkPort = getProp(props, "zookeeper.port", "2181");
+        int zkTimeout = Integer.parseInt(getProp(props, "zookeeper.timeout", "3000"));
+        return new ZooKeeper(zkHost + ":" + zkPort, zkTimeout, this);
     }
 
+    @SuppressWarnings("rawtypes")
     private static String getProp(Dictionary props, String key, String def) {
         Object val = props.get(key);
         String rv;
@@ -139,13 +126,20 @@ public class ZooKeeperDiscovery implemen
     }
 
     /* Callback for ZooKeeper */
-    public void process(WatchedEvent arg0) {
-        // TODO Auto-generated method stub
-
-    }
-
-    protected ZooKeeper getZookeeper() {
-        return zooKeeper;
+    public void process(WatchedEvent event) {
+        KeeperState state = event.getState();
+        if (state == KeeperState.SyncConnected) {
+            LOG.info("Connection to zookeeper established");
+            startModules();
+        }
+        if (state == KeeperState.Expired) {
+            LOG.info("Connection to zookeeper expired. Trying to create a new connection");
+            stop();
+            try {
+                zooKeeper = createZooKeeper(curConfiguration);
+            } catch (IOException e) {
+                LOG.log(Level.SEVERE, "Failed to start the Zookeeper Discovery component.", e);
+            }
+        }
     }
-
 }

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactoryTest.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactoryTest.java?rev=1404989&r1=1404988&r2=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactoryTest.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactoryTest.java Fri Nov  2 15:02:01 2012
@@ -23,14 +23,12 @@ import java.util.Properties;
 
 import junit.framework.TestCase;
 
-import org.easymock.IAnswer;
+import org.apache.zookeeper.ZooKeeper;
 import org.easymock.classextension.EasyMock;
 import org.easymock.classextension.IMocksControl;
 import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.service.remoteserviceadmin.RemoteConstants;
 
 public class EndpointListenerFactoryTest extends TestCase {
 
@@ -38,42 +36,21 @@ public class EndpointListenerFactoryTest
         IMocksControl c = EasyMock.createNiceControl();
 
         BundleContext ctx = c.createMock(BundleContext.class);
-        ZooKeeperDiscovery zkd = c.createMock(ZooKeeperDiscovery.class);
+        ZooKeeper zk = c.createMock(ZooKeeper.class);
         ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
         
-        EndpointListenerFactory eplf = new EndpointListenerFactory(zkd, ctx);
+        PublishingEndpointListenerFactory eplf = new PublishingEndpointListenerFactory(zk, ctx);
 
         EasyMock.expect(
                         ctx.registerService(EasyMock.eq(EndpointListener.class.getName()), EasyMock.eq(eplf),
                                             (Properties)EasyMock.anyObject())).andReturn(sreg).once();
-
-        
-        sreg.setProperties((Properties)EasyMock.anyObject());
-        EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
-
-            public Object answer() throws Throwable {
-                Properties p = (Properties)EasyMock.getCurrentArguments()[0];
-                assertNotNull(p);
-                String scope = (String)p.get(EndpointListener.ENDPOINT_LISTENER_SCOPE);
-                assertNotNull(scope);
-                assertEquals("(&(" + Constants.OBJECTCLASS + "=*)(" + RemoteConstants.ENDPOINT_FRAMEWORK_UUID
-                             + "=myUUID))", scope);
-                return null;
-            }
-        }).once();
-
         
         EasyMock.expect(ctx.getProperty(EasyMock.eq("org.osgi.framework.uuid"))).andReturn("myUUID")
             .anyTimes();
-
-        
-        
         
         c.replay();
         eplf.start();
         c.verify();
-
-        
         
         c.reset();
         sreg.unregister();
@@ -89,37 +66,20 @@ public class EndpointListenerFactoryTest
         IMocksControl c = EasyMock.createNiceControl();
         
         BundleContext ctx = c.createMock(BundleContext.class);
-        ZooKeeperDiscovery zkd = c.createMock(ZooKeeperDiscovery.class);
+        ZooKeeper zk = c.createMock(ZooKeeper.class);
         ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
         
-        EndpointListenerFactory eplf = new EndpointListenerFactory(zkd, ctx);
+        PublishingEndpointListenerFactory eplf = new PublishingEndpointListenerFactory(zk, ctx);
 
         EasyMock.expect(
                         ctx.registerService(EasyMock.eq(EndpointListener.class.getName()), EasyMock.eq(eplf),
                                             (Properties)EasyMock.anyObject())).andReturn(sreg).once();
 
         
-        sreg.setProperties((Properties)EasyMock.anyObject());
-        EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
-
-            public Object answer() throws Throwable {
-                Properties p = (Properties)EasyMock.getCurrentArguments()[0];
-                assertNotNull(p);
-                String scope = (String)p.get(EndpointListener.ENDPOINT_LISTENER_SCOPE);
-                assertNotNull(scope);
-                assertEquals("(&(" + Constants.OBJECTCLASS + "=*)(" + RemoteConstants.ENDPOINT_FRAMEWORK_UUID
-                             + "=myUUID))", scope);
-                return null;
-            }
-        }).once();
-
-        
         EasyMock.expect(ctx.getProperty(EasyMock.eq("org.osgi.framework.uuid"))).andReturn("myUUID")
             .anyTimes();
 
-        
-
-        EndpointListenerImpl eli = c.createMock(EndpointListenerImpl.class);
+        PublishingEndpointListener eli = c.createMock(PublishingEndpointListener.class);
         eli.close();
         EasyMock.expectLastCall().once();
         
@@ -131,7 +91,7 @@ public class EndpointListenerFactoryTest
         assertNotNull(service);
         assertTrue(service instanceof EndpointListener);
 
-        List<EndpointListenerImpl> listeners = eplf.getListeners();
+        List<PublishingEndpointListener> listeners = eplf.getListeners();
         assertEquals(1, listeners.size());
         assertEquals(service, listeners.get(0));
         

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImplTest.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImplTest.java?rev=1404989&r1=1404988&r2=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImplTest.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImplTest.java Fri Nov  2 15:02:01 2012
@@ -48,13 +48,8 @@ public class EndpointListenerImplTest ex
         IMocksControl c = EasyMock.createNiceControl();
 
         BundleContext ctx = c.createMock(BundleContext.class);
-        ZooKeeperDiscovery zkd = c.createMock(ZooKeeperDiscovery.class);
-
         ZooKeeper zk = c.createMock(ZooKeeper.class);
 
-        EasyMock.expect(zkd.getZookeeper()).andReturn(zk).anyTimes();
-
-
         String path = "/osgi/service_registry/myClass/google.de#80##test";
         EasyMock.expect(
                         zk.create(EasyMock.eq(path),
@@ -66,7 +61,7 @@ public class EndpointListenerImplTest ex
 
         c.replay();
 
-        EndpointListenerImpl eli = new EndpointListenerImpl(zkd, ctx);
+        PublishingEndpointListener eli = new PublishingEndpointListener(zk, ctx);
 
         Map<String, Object> props = new HashMap<String, Object>();
         props.put(Constants.OBJECTCLASS, new String[] {
@@ -134,18 +129,12 @@ public class EndpointListenerImplTest ex
         String expectedFullPath = "/osgi/service_registry/org/foo/myClass/some.machine#9876##test";
         EasyMock.expect(zk.create(
                 EasyMock.eq(expectedFullPath),
-                EasyMock.aryEq(EndpointListenerImpl.getData(expectedProps)),
+                EasyMock.aryEq(PublishingEndpointListener.getData(expectedProps)),
                 EasyMock.eq(Ids.OPEN_ACL_UNSAFE),
                 EasyMock.eq(CreateMode.EPHEMERAL))).andReturn("");
         EasyMock.replay(zk);
-        ZooKeeperDiscovery zkd = new ZooKeeperDiscovery(ctx, null) {
-            @Override
-            protected ZooKeeper getZookeeper() {
-                return zk;
-            }
-        };
 
-        EndpointListenerImpl eli = new EndpointListenerImpl(zkd, ctx);
+        PublishingEndpointListener eli = new PublishingEndpointListener(zk, ctx);
 
         List<EndpointDescription> endpointList = getEndpointsList(eli);
         assertEquals("Precondition", 0, endpointList.size());
@@ -157,7 +146,7 @@ public class EndpointListenerImplTest ex
 
 
     @SuppressWarnings("unchecked")
-    private List<EndpointDescription> getEndpointsList(EndpointListenerImpl eli) throws Exception {
+    private List<EndpointDescription> getEndpointsList(PublishingEndpointListener eli) throws Exception {
         Field field = eli.getClass().getDeclaredField("endpoints");
         field.setAccessible(true);
         return (List<EndpointDescription>) field.get(eli);
@@ -168,13 +157,8 @@ public class EndpointListenerImplTest ex
         IMocksControl c = EasyMock.createNiceControl();
 
         BundleContext ctx = c.createMock(BundleContext.class);
-        ZooKeeperDiscovery zkd = c.createMock(ZooKeeperDiscovery.class);
-
         ZooKeeper zk = c.createMock(ZooKeeper.class);
 
-        EasyMock.expect(zkd.getZookeeper()).andReturn(zk).anyTimes();
-
-
         String path = "/osgi/service_registry/myClass/google.de#80##test";
         EasyMock.expect(
                         zk.create(EasyMock.eq(path),
@@ -186,7 +170,7 @@ public class EndpointListenerImplTest ex
 
         c.replay();
 
-        EndpointListenerImpl eli = new EndpointListenerImpl(zkd, ctx);
+        PublishingEndpointListener eli = new PublishingEndpointListener(zk, ctx);
 
         Map<String, Object> props = new HashMap<String, Object>();
         props.put(Constants.OBJECTCLASS, new String[] {
@@ -208,7 +192,7 @@ public class EndpointListenerImplTest ex
 
     public void testGetKey() throws Exception {
         assertEquals("somehost#9090##org#example#TestEndpoint",
-            EndpointListenerImpl.getKey("http://somehost:9090/org/example/TestEndpoint"));
+            PublishingEndpointListener.getKey("http://somehost:9090/org/example/TestEndpoint"));
     }
 
 }

Copied: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManagerTest.java (from r1402482, cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizerTest.java)
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManagerTest.java?p2=cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManagerTest.java&p1=cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizerTest.java&r1=1402482&r2=1404989&rev=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizerTest.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManagerTest.java Fri Nov  2 15:02:01 2012
@@ -23,29 +23,27 @@ import java.util.Properties;
 
 import junit.framework.TestCase;
 
-import org.apache.cxf.dosgi.discovery.zookeeper.EndpointListenerTrackerCustomizer.Interest;
+import org.apache.zookeeper.ZooKeeper;
 import org.easymock.IAnswer;
 import org.easymock.classextension.EasyMock;
 import org.easymock.classextension.IMocksControl;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
 
-public class EndpointListenerTrackerCustomizerTest extends TestCase{
+public class InterfaceMonitorManagerTest extends TestCase {
     
     public void testEndpointListenerTrackerCustomizer(){
         
         IMocksControl c = EasyMock.createNiceControl();
         
         BundleContext ctx = c.createMock(BundleContext.class);
-        ZooKeeperDiscovery zkd = c.createMock(ZooKeeperDiscovery.class);
+        ZooKeeper zk = c.createMock(ZooKeeper.class);
         
         ServiceReference sref = c.createMock(ServiceReference.class);
         ServiceReference sref2 = c.createMock(ServiceReference.class);
         
         final Properties p = new Properties(); 
         
-        
         EasyMock.expect(sref.getPropertyKeys()).andAnswer(new IAnswer<String[]>() {
             public String[] answer() throws Throwable {
                 return p.keySet().toArray(new String[p.keySet().size()]);
@@ -75,8 +73,8 @@ public class EndpointListenerTrackerCust
         
         final ArrayList<IMocksControl> controls = new ArrayList<IMocksControl>();
         
-        EndpointListenerTrackerCustomizer eltc = new EndpointListenerTrackerCustomizer(zkd,ctx){
-            protected InterfaceMonitor createInterfaceMonitor(String scope, String objClass, Interest interest){
+        InterfaceMonitorManager eltc = new InterfaceMonitorManager(ctx, zk){
+            protected InterfaceMonitor createInterfaceMonitor(String scope, String objClass, Interest interest) {
                 IMocksControl lc = EasyMock.createNiceControl();
                 InterfaceMonitor im = lc.createMock(InterfaceMonitor.class);
                 im.start();
@@ -91,14 +89,14 @@ public class EndpointListenerTrackerCust
         
         c.replay();
         
-        eltc.addingService(sref); // sref has no scope -> nothing should happen
+        // sref has no scope -> nothing should happen
         
         assertEquals(0, eltc.getHandledEndpointlisteners().size());
         assertEquals(0, eltc.getInterestingScopes().size());
         
-        p.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, "(objectClass=mine)");
+        //p.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, );
         
-        eltc.addingService(sref); 
+        eltc.addInterest(sref, "(objectClass=mine)", "mine"); 
         
         assertEquals(1, eltc.getHandledEndpointlisteners().size());
         assertEquals(1, eltc.getHandledEndpointlisteners().get(sref).size());
@@ -106,14 +104,14 @@ public class EndpointListenerTrackerCust
         assertEquals(1, eltc.getInterestingScopes().size());
         
         
-        eltc.addingService(sref); 
+        eltc.addInterest(sref, "(objectClass=mine)", "mine");
         
         assertEquals(1, eltc.getHandledEndpointlisteners().size());
         assertEquals(1, eltc.getHandledEndpointlisteners().get(sref).size());
         assertEquals("(objectClass=mine)", eltc.getHandledEndpointlisteners().get(sref).get(0));
         assertEquals(1, eltc.getInterestingScopes().size());
         
-        eltc.addingService(sref2); 
+        eltc.addInterest(sref2, "(objectClass=mine)", "mine");
         
         assertEquals(2, eltc.getHandledEndpointlisteners().size());
         assertEquals(1, eltc.getHandledEndpointlisteners().get(sref).size());
@@ -123,22 +121,21 @@ public class EndpointListenerTrackerCust
         assertEquals(1, eltc.getInterestingScopes().size());
         
         
-        eltc.removedService(sref, null);
+        eltc.removeInterest(sref);
         
         assertEquals(1, eltc.getHandledEndpointlisteners().size());
         assertEquals(1, eltc.getHandledEndpointlisteners().get(sref2).size());
         assertEquals("(objectClass=mine)", eltc.getHandledEndpointlisteners().get(sref2).get(0));
         assertEquals(1, eltc.getInterestingScopes().size());
         
-        eltc.removedService(sref, null);
+        eltc.removeInterest(sref);
         
         assertEquals(1, eltc.getHandledEndpointlisteners().size());
         assertEquals(1, eltc.getHandledEndpointlisteners().get(sref2).size());
         assertEquals("(objectClass=mine)", eltc.getHandledEndpointlisteners().get(sref2).get(0));
         assertEquals(1, eltc.getInterestingScopes().size());
         
-        
-        eltc.removedService(sref2, null);
+        eltc.removeInterest(sref2);
         
         assertEquals(0, eltc.getHandledEndpointlisteners().size());
         assertEquals(0, eltc.getInterestingScopes().size());

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorTest.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorTest.java?rev=1404989&r1=1404988&r2=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorTest.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorTest.java Fri Nov  2 15:02:01 2012
@@ -18,74 +18,52 @@
  */
 package org.apache.cxf.dosgi.discovery.zookeeper;
 
-import java.util.Collection;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+
 import java.util.Collections;
 
 import junit.framework.TestCase;
 
-import org.apache.cxf.dosgi.discovery.zookeeper.EndpointListenerTrackerCustomizer.Interest;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.easymock.classextension.EasyMock;
 import org.easymock.classextension.IMocksControl;
 import org.osgi.framework.BundleContext;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
 
 public class InterfaceMonitorTest extends TestCase {
 
     public void testInterfaceMonitor() throws KeeperException, InterruptedException {
 
-        IMocksControl c = EasyMock.createNiceControl();
+        IMocksControl c = EasyMock.createControl();
 
         BundleContext ctx = c.createMock(BundleContext.class);
-        ZooKeeperDiscovery zkd = c.createMock(ZooKeeperDiscovery.class);
-
         ZooKeeper zk = c.createMock(ZooKeeper.class);
-        EasyMock.expect(zk.getState()).andReturn(ZooKeeper.States.CONNECTED).anyTimes();
-
-        EasyMock.expect(zkd.getZookeeper()).andReturn(zk).anyTimes();
-
-        EndpointListenerTrackerCustomizer.Interest interest = new EndpointListenerTrackerCustomizer.Interest();
+        expect(zk.getState()).andReturn(ZooKeeper.States.CONNECTED).anyTimes();
 
         String scope = "(myProp=test)";
         String interf = "es.schaaf.test";
         String node = Util.getZooKeeperPath(interf);
 
-        final InterfaceDataMonitorListenerImpl idmli = c.createMock(InterfaceDataMonitorListenerImpl.class);
-
-        InterfaceMonitor im = new InterfaceMonitor(zk, interf, interest, scope, ctx) {
-            @Override
-            protected InterfaceDataMonitorListenerImpl createInterfaceDataMonitorListener(ZooKeeper zk,
-                                                                                          String intf,
-                                                                                          Interest zkd,
-                                                                                          String scope,
-                                                                                          BundleContext bctx) {
-                return idmli;
-            }
-        };
-
-        idmli.change();
-        EasyMock.expectLastCall().once();
-
-        zk.exists(EasyMock.eq(node), EasyMock.eq(im), EasyMock.eq(im), EasyMock.anyObject());
+        EndpointListener epListener = c.createMock(EndpointListener.class);
+        InterfaceMonitor im = new InterfaceMonitor(zk, interf, epListener, scope, ctx);
+        zk.exists(eq(node), eq(im), eq(im), EasyMock.anyObject());
         EasyMock.expectLastCall().once();
 
-        EasyMock.expect(zk.exists(EasyMock.eq(node), EasyMock.eq(false))).andReturn(new Stat()).anyTimes();
-
-        EasyMock.expect(zk.getChildren(EasyMock.eq(node), EasyMock.eq(im))).andReturn(Collections.EMPTY_LIST)
-            .once();
+        expect(zk.exists(eq(node), eq(false))).andReturn(new Stat()).anyTimes();
+        expect(zk.getChildren(eq(node), eq(false))).andReturn(Collections.<String> emptyList()).once();
+        expect(zk.getChildren(eq(node), eq(im))).andReturn(Collections.<String> emptyList()).once();
 
         c.replay();
-
         im.start();
-
         // simulate a zk callback
         WatchedEvent we = new WatchedEvent(EventType.NodeCreated, KeeperState.SyncConnected, node);
         im.process(we);
-
         c.verify();
     }
 }

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/UtilTest.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/UtilTest.java?rev=1404989&r1=1404988&r2=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/UtilTest.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/UtilTest.java Fri Nov  2 15:02:01 2012
@@ -64,6 +64,7 @@ public class UtilTest extends TestCase {
     }
     
     
+    @SuppressWarnings("unchecked")
     public void testGetStringPlusProperty() {
         Object in = "MyString";
         String[] out = Util.getStringPlusProperty(in);