You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2018/02/06 15:59:26 UTC

[1/2] aries-rsa git commit: [ARIES-1771] Support endpoint changes in zookeeper discovery

Repository: aries-rsa
Updated Branches:
  refs/heads/master b7597a4f1 -> 389bae151


[ARIES-1771] Support endpoint changes in zookeeper discovery


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/025516f2
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/025516f2
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/025516f2

Branch: refs/heads/master
Commit: 025516f2cbd1a361a588755db1b8961668ce0c8d
Parents: b7597a4
Author: Christian Schneider <cs...@adobe.com>
Authored: Tue Feb 6 16:54:28 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Tue Feb 6 16:54:28 2018 +0100

----------------------------------------------------------------------
 .../discovery/zookeeper/ZooKeeperDiscovery.java |   3 +-
 .../publish/PublishingEndpointListener.java     |  50 +++++++--
 .../PublishingEndpointListenerFactory.java      |   5 +-
 .../subscribe/EndpointListenerTracker.java      |  25 ++++-
 .../zookeeper/subscribe/InterfaceMonitor.java   |  17 ++-
 .../subscribe/InterfaceMonitorManager.java      | 106 +++++++++++--------
 .../PublishingEndpointListenerFactoryTest.java  |  61 ++++++-----
 .../subscribe/InterfaceMonitorTest.java         |   4 +
 8 files changed, 183 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
index 0e03722..584da35 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
@@ -33,7 +33,6 @@ import org.apache.zookeeper.ZooKeeper;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.cm.ConfigurationException;
 import org.osgi.service.cm.ManagedService;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
 import org.osgi.util.tracker.ServiceTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +46,7 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
     private final BundleContext bctx;
 
     private PublishingEndpointListenerFactory endpointListenerFactory;
-    private ServiceTracker<EndpointEventListener, EndpointEventListener> endpointListenerTracker;
+    private ServiceTracker<?, ?> endpointListenerTracker;
     private InterfaceMonitorManager imManager;
     private ZooKeeper zkClient;
     private boolean closed;

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
index 82387ba..d5fe7a6 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
@@ -38,10 +38,12 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.service.remoteserviceadmin.EndpointEvent;
 import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
 import org.osgi.util.tracker.ServiceTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +51,8 @@ import org.slf4j.LoggerFactory;
 /**
  * Listens for local Endpoints and publishes them to ZooKeeper.
  */
-public class PublishingEndpointListener implements EndpointEventListener {
+@SuppressWarnings("deprecation")
+public class PublishingEndpointListener implements EndpointEventListener, EndpointListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class);
 
@@ -79,13 +82,45 @@ public class PublishingEndpointListener implements EndpointEventListener {
             endpointRemoved(endpoint, filter);
             break;
         case EndpointEvent.MODIFIED:
-            endpointRemoved(endpoint, filter);
-            endpointAdded(endpoint, filter);
+            endpointModified(endpoint, filter);
             break;
         }
     }
     
-    private void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
+    private void endpointModified(EndpointDescription endpoint, String filter) {
+        try {
+            modifyEndpoint(endpoint);
+        } catch (Exception e) {
+            LOG.error("Error modifying endpoint data in zookeeper for endpoint {}", endpoint.getId(), e);
+        }
+    }
+
+    private void modifyEndpoint(EndpointDescription endpoint) throws URISyntaxException, KeeperException, InterruptedException {
+        Collection<String> interfaces = endpoint.getInterfaces();
+        String endpointKey = getKey(endpoint);
+        Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
+
+        // process plugins
+        Object[] plugins = discoveryPluginTracker.getServices();
+        if (plugins != null) {
+            for (Object plugin : plugins) {
+                if (plugin instanceof DiscoveryPlugin) {
+                    endpointKey = ((DiscoveryPlugin)plugin).process(props, endpointKey);
+                }
+            }
+        }
+        LOG.info("Changing endpoint in zookeeper: {}", endpoint);
+        for (String name : interfaces) {
+            String path = Utils.getZooKeeperPath(name);
+            String fullPath = path + '/' + endpointKey;
+            LOG.info("Changing ZooKeeper node for service with path {}", fullPath);
+            createPath(path, zk);
+            zk.setData(fullPath, getData(endpoint), -1);
+        }
+    }
+
+    @Override
+    public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
         synchronized (endpoints) {
             if (closed) {
                 return;
@@ -153,7 +188,8 @@ public class PublishingEndpointListener implements EndpointEventListener {
         }
     }
 
-    private void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
+    @Override
+    public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
         LOG.info("Local EndpointDescription removed: {}", endpoint);
 
         synchronized (endpoints) {
@@ -195,7 +231,9 @@ public class PublishingEndpointListener implements EndpointEventListener {
             current.append('/');
             current.append(part);
             try {
-                zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                if (zk.exists(current.toString(), false) == null) {
+                    zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                }
             } catch (NodeExistsException nee) {
                 // it's not the first node with this path to ever exist - that's normal
             }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
index 8a40b92..444f7bb 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
@@ -31,6 +31,7 @@ import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceFactory;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
 import org.osgi.service.remoteserviceadmin.RemoteConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Creates local EndpointListeners that publish to ZooKeeper.
  */
+@SuppressWarnings("deprecation")
 public class PublishingEndpointListenerFactory implements ServiceFactory<PublishingEndpointListener> {
 
     private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListenerFactory.class);
@@ -78,7 +80,8 @@ public class PublishingEndpointListenerFactory implements ServiceFactory<Publish
                   String.format("(&(%s=*)(%s=%s))", Constants.OBJECTCLASS, 
                                 RemoteConstants.ENDPOINT_FRAMEWORK_UUID, uuid));
         props.put(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID, "true");
-        serviceRegistration = bctx.registerService(EndpointEventListener.class.getName(), this, props);
+        String[] ifAr = {EndpointEventListener.class.getName(), EndpointListener.class.getName()};
+        serviceRegistration = bctx.registerService(ifAr, this, props);
     }
     
     public synchronized void stop() {

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
index 6e6ed1b..0ed1097 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
@@ -19,37 +19,52 @@
 package org.apache.aries.rsa.discovery.zookeeper.subscribe;
 
 import org.osgi.framework.BundleContext;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.InvalidSyntaxException;
 import org.osgi.framework.ServiceReference;
 import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
 import org.osgi.util.tracker.ServiceTracker;
 
 /**
  * Tracks interest in EndpointListeners. Delegates to InterfaceMonitorManager to manage
  * interest in the scopes of each EndpointListener.
  */
-public class EndpointListenerTracker extends ServiceTracker<EndpointEventListener, EndpointEventListener> {
+@SuppressWarnings({ "rawtypes", "deprecation", "unchecked" })
+public class EndpointListenerTracker extends ServiceTracker {
     private final InterfaceMonitorManager imManager;
 
     public EndpointListenerTracker(BundleContext bctx, InterfaceMonitorManager imManager) {
-        super(bctx, EndpointEventListener.class, null);
+        super(bctx, getfilter(), null);
         this.imManager = imManager;
     }
+    
+    private static Filter getfilter() {
+        String filterSt = String.format("(|(objectClass=%s)(objectClass=%s))", EndpointEventListener.class.getName(), 
+                EndpointListener.class.getName());
+        try {
+            return FrameworkUtil.createFilter(filterSt);
+        } catch (InvalidSyntaxException e) {
+            throw new IllegalArgumentException(e.getMessage(), e);
+        }
+    }
 
     @Override
-    public EndpointEventListener addingService(ServiceReference<EndpointEventListener> endpointListener) {
+    public Object addingService(ServiceReference endpointListener) {
         imManager.addInterest(endpointListener);
         return null;
     }
 
     @Override
-    public void modifiedService(ServiceReference<EndpointEventListener> endpointListener, EndpointEventListener service) {
+    public void modifiedService(ServiceReference endpointListener, Object service) {
         // called when an EndpointListener updates its service properties,
         // e.g. when its interest scope is expanded/reduced
         imManager.addInterest(endpointListener);
     }
 
     @Override
-    public void removedService(ServiceReference<EndpointEventListener> endpointListener, EndpointEventListener service) {
+    public void removedService(ServiceReference endpointListener, Object service) {
         imManager.removeInterest(endpointListener);
     }
 

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
index 6972989..7078bb8 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
@@ -91,6 +91,13 @@ public class InterfaceMonitor implements Watcher, StatCallback {
     private void watch() {
         LOG.debug("registering a ZooKeeper.exists({}) callback", znode);
         zk.exists(znode, this, this, null);
+        zk.getData(znode, this, new DataCallback() {
+            
+            @Override
+            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+                processDelta();
+            }
+        }, null);
     }
 
     /**
@@ -199,18 +206,22 @@ public class InterfaceMonitor implements Watcher, StatCallback {
                 EndpointDescription endpoint = getEndpointDescriptionFromNode(childZNode);
                 if (endpoint != null) {
                     EndpointDescription prevEndpoint = prevNodes.get(child);
-                    LOG.info("found new node " + zn + "/[" + child + "]   ( []->child )  props: "
-                            + endpoint.getProperties().values());
+                    
                     newNodes.put(child, endpoint);
                     prevNodes.remove(child);
                     foundANode = true;
                     LOG.debug("Properties: {}", endpoint.getProperties());
                     if (prevEndpoint == null) {
                         // This guy is new
+                        LOG.info("found new node " + zn + "/[" + child + "]   ( []->child )  props: "
+                                + endpoint.getProperties().values());
                         EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint);
                         endpointListener.endpointChanged(event, null);
                     } else if (!prevEndpoint.getProperties().equals(endpoint.getProperties())) {
-                        // TODO
+                        LOG.info("Found changed node " + zn + "/[" + child + "]   ( []->child )  props: "
+                                + endpoint.getProperties().values());
+                        EndpointEvent event = new EndpointEvent(EndpointEvent.MODIFIED, endpoint);
+                        endpointListener.endpointChanged(event, null);
                     }
                 }
                 if (recursive && processChildren(childZNode, newNodes, prevNodes)) {

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
index 26e4462..7d6e4ae 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
@@ -33,13 +33,13 @@ import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
 import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
 import org.apache.aries.rsa.util.StringPlus;
 import org.apache.zookeeper.ZooKeeper;
-import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Filter;
 import org.osgi.framework.ServiceReference;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.service.remoteserviceadmin.EndpointEvent;
 import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
  * The InterfaceMonitor calls back when it detects added or removed external Endpoints.
  * These events are then forwarded to all interested EndpointEventListeners.
  */
+@SuppressWarnings({"deprecation", "rawtypes"})
 public class InterfaceMonitorManager {
     private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitorManager.class);
     private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(".*\\(objectClass=([^)]+)\\).*");
@@ -56,14 +57,14 @@ public class InterfaceMonitorManager {
     private final BundleContext bctx;
     private final ZooKeeper zk;
     // map of EndpointEventListeners and the scopes they are interested in
-    private final Map<ServiceReference<EndpointEventListener>, List<String>> EndpointEventListenerScopes =
-            new HashMap<ServiceReference<EndpointEventListener>, List<String>>();
+    private final Map<ServiceReference, List<String>> epListenerScopes =
+            new HashMap<ServiceReference, List<String>>();
     // map of scopes and their interest data
     private final Map<String, Interest> interests = new HashMap<String, Interest>();
 
     protected static class Interest {
-        List<ServiceReference<EndpointEventListener>> EndpointEventListeners = 
-            new CopyOnWriteArrayList<ServiceReference<EndpointEventListener>>();
+        List<ServiceReference> epListeners = 
+            new CopyOnWriteArrayList<ServiceReference>();
         InterfaceMonitor monitor;
     }
 
@@ -72,26 +73,26 @@ public class InterfaceMonitorManager {
         this.zk = zk;
     }
 
-    public void addInterest(ServiceReference<EndpointEventListener> EndpointEventListener) {
-        if (isOurOwnEndpointEventListener(EndpointEventListener)) {
+    public void addInterest(ServiceReference<?> eplistener) {
+        if (isOurOwnEndpointEventListener(eplistener)) {
             LOG.debug("Skipping our own EndpointEventListener");
             return;
         }
-        List<String> scopes = getScopes(EndpointEventListener);
+        List<String> scopes = getScopes(eplistener);
         LOG.debug("adding Interests: {}", scopes);
         
         for (String scope : scopes) {
             String objClass = getObjectClass(scope);
-            addInterest(EndpointEventListener, scope, objClass);
+            addInterest(eplistener, scope, objClass);
         }
     }
 
-    private static boolean isOurOwnEndpointEventListener(ServiceReference<EndpointEventListener> EndpointEventListener) {
+    private static boolean isOurOwnEndpointEventListener(ServiceReference<?> EndpointEventListener) {
         return Boolean.parseBoolean(String.valueOf(
                 EndpointEventListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)));
     }
 
-    public synchronized void addInterest(ServiceReference<EndpointEventListener> EndpointEventListener, 
+    public synchronized void addInterest(ServiceReference epListener, 
                                          String scope, String objClass) {
         // get or create interest for given scope and add listener to it
         Interest interest = interests.get(scope);
@@ -99,27 +100,27 @@ public class InterfaceMonitorManager {
             // create interest, add listener and start monitor
             interest = new Interest();
             interests.put(scope, interest);
-            interest.EndpointEventListeners.add(EndpointEventListener); // add it before monitor starts so we don't miss events
+            interest.epListeners.add(epListener); // add it before monitor starts so we don't miss events
             interest.monitor = createInterfaceMonitor(scope, objClass, interest);
             interest.monitor.start();
         } else {
             // interest already exists, so just add listener to it
-            if (!interest.EndpointEventListeners.contains(EndpointEventListener)) {
-                interest.EndpointEventListeners.add(EndpointEventListener);
+            if (!interest.epListeners.contains(epListener)) {
+                interest.epListeners.add(epListener);
             }
             // notify listener of all known endpoints for given scope
             // (as EndpointEventListener contract requires of all added/modified listeners)
             for (EndpointDescription endpoint : interest.monitor.getEndpoints()) {
                 EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint);
-                notifyListeners(event, scope, Arrays.asList(EndpointEventListener));
+                notifyListeners(event, scope, Arrays.asList(epListener));
             }
         }
 
         // add scope to listener's scopes list
-        List<String> scopes = EndpointEventListenerScopes.get(EndpointEventListener);
+        List<String> scopes = epListenerScopes.get(epListener);
         if (scopes == null) {
             scopes = new ArrayList<String>(1);
-            EndpointEventListenerScopes.put(EndpointEventListener, scopes);
+            epListenerScopes.put(epListener, scopes);
         }
         if (!scopes.contains(scope)) {
             scopes.add(scope);
@@ -128,7 +129,7 @@ public class InterfaceMonitorManager {
 
     public synchronized void removeInterest(ServiceReference<EndpointEventListener> EndpointEventListener) {
         LOG.info("removing EndpointEventListener interests: {}", EndpointEventListener);
-        List<String> scopes = EndpointEventListenerScopes.get(EndpointEventListener);
+        List<String> scopes = epListenerScopes.get(EndpointEventListener);
         if (scopes == null) {
             return;
         }
@@ -136,14 +137,14 @@ public class InterfaceMonitorManager {
         for (String scope : scopes) {
             Interest interest = interests.get(scope);
             if (interest != null) {
-                interest.EndpointEventListeners.remove(EndpointEventListener);
-                if (interest.EndpointEventListeners.isEmpty()) {
+                interest.epListeners.remove(EndpointEventListener);
+                if (interest.epListeners.isEmpty()) {
                     interest.monitor.close();
                     interests.remove(scope);
                 }
             }
         }
-        EndpointEventListenerScopes.remove(EndpointEventListener);
+        epListenerScopes.remove(EndpointEventListener);
     }
 
     protected InterfaceMonitor createInterfaceMonitor(final String scope, String objClass, final Interest interest) {
@@ -152,30 +153,37 @@ public class InterfaceMonitorManager {
 
             @Override
             public void endpointChanged(EndpointEvent event, String filter) {
-                notifyListeners(event, scope, interest.EndpointEventListeners);
+                notifyListeners(event, scope, interest.epListeners);
             }
         };
         return new InterfaceMonitor(zk, objClass, listener, scope);
     }
 
     private void notifyListeners(EndpointEvent event, String currentScope,
-            List<ServiceReference<EndpointEventListener>> EndpointEventListeners) {
+            List<ServiceReference> epListeners) {
         EndpointDescription endpoint = event.getEndpoint();
-        for (ServiceReference<EndpointEventListener> EndpointEventListenerRef : EndpointEventListeners) {
-            EndpointEventListener service = bctx.getService(EndpointEventListenerRef);
+        for (ServiceReference<?> epListenerRef : epListeners) {
+            if (epListenerRef.getBundle() == null) {
+                LOG.info("listening service was unregistered, ignoring");
+            }
+            Object service = bctx.getService(epListenerRef);
+            LOG.trace("matching {} against {}", endpoint, currentScope);
+            if (matchFilter(bctx, currentScope, endpoint)) {
+                LOG.debug("Matched {} against {}", endpoint, currentScope);
             try {
-                EndpointEventListener EndpointEventListener = (EndpointEventListener)service;
-                LOG.trace("matching {} against {}", endpoint, currentScope);
-                if (matchFilter(bctx, currentScope, endpoint)) {
-                    LOG.debug("Matched {} against {}", endpoint, currentScope);
-                    notifyListener(event, currentScope, EndpointEventListenerRef.getBundle(),
-                                   EndpointEventListener);
+                if (service instanceof EndpointEventListener) {
+                    EndpointEventListener epeListener = (EndpointEventListener)service;
+                    notifyListener(event, currentScope, epeListener);
+                } else if (service instanceof EndpointListener) {
+                    EndpointListener epListener = (EndpointListener)service;
+                    notifyListener(event, currentScope, epListener);
                 }
             } finally {
                 if (service != null) {
-                    bctx.ungetService(EndpointEventListenerRef);
+                    bctx.ungetService(epListenerRef);
                 }
             }
+            }
         }
     }
     
@@ -194,14 +202,28 @@ public class InterfaceMonitorManager {
     }
 
 
-    private void notifyListener(EndpointEvent event, String currentScope,
-                                Bundle listenerBundle, EndpointEventListener listener) {
+    private void notifyListener(EndpointEvent event, String currentScope, EndpointEventListener listener) {
         EndpointDescription endpoint = event.getEndpoint();
-        if (listenerBundle == null) {
-            LOG.info("listening service was unregistered, ignoring");
-        } else {
-            LOG.info("Calling endpointchanged from bundle {} for endpoint {} ", listenerBundle.getSymbolicName(), endpoint);
-            listener.endpointChanged(event, currentScope);
+        LOG.info("Calling endpointchanged on class {} for filter {}, type {}, endpoint {} ", listener, currentScope, endpoint);
+        listener.endpointChanged(event, currentScope);
+    }
+    
+    private void notifyListener(EndpointEvent event, String currentScope, EndpointListener listener) {
+        EndpointDescription endpoint = event.getEndpoint();
+        LOG.info("Calling old listener on class {} for filter {}, type {}, endpoint {} ", listener, currentScope, endpoint);
+        switch (event.getType()) {
+        case EndpointEvent.ADDED:
+            listener.endpointAdded(endpoint, currentScope);
+            break;
+
+        case EndpointEvent.MODIFIED:
+            listener.endpointAdded(endpoint, currentScope);
+            listener.endpointRemoved(endpoint, currentScope);
+            break;
+
+        case EndpointEvent.REMOVED:
+            listener.endpointRemoved(endpoint, currentScope);
+            break;
         }
     }
 
@@ -210,7 +232,7 @@ public class InterfaceMonitorManager {
             interest.monitor.close();
         }
         interests.clear();
-        EndpointEventListenerScopes.clear();
+        epListenerScopes.clear();
     }
 
     /**
@@ -223,8 +245,8 @@ public class InterfaceMonitorManager {
     /**
      * Only for test case!
      */
-    protected synchronized Map<ServiceReference<EndpointEventListener>, List<String>> getEndpointListenerScopes() {
-        return EndpointEventListenerScopes;
+    protected synchronized Map<ServiceReference, List<String>> getEndpointListenerScopes() {
+        return epListenerScopes;
     }
 
     protected List<String> getScopes(ServiceReference<?> sref) {

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java
index 777c11c..381ec9d 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java
@@ -18,58 +18,50 @@
  */
 package org.apache.aries.rsa.discovery.zookeeper.publish;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.util.Dictionary;
 import java.util.List;
 
 import org.apache.zookeeper.ZooKeeper;
 import org.easymock.EasyMock;
 import org.easymock.IMocksControl;
+import org.junit.Before;
+import org.junit.Test;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
 
-import junit.framework.TestCase;
+@SuppressWarnings("deprecation")
+public class PublishingEndpointListenerFactoryTest {
 
-public class PublishingEndpointListenerFactoryTest extends TestCase {
+    private IMocksControl c;
+    private BundleContext ctx;
+    private ZooKeeper zk;
 
-    @SuppressWarnings("unchecked")
+    @Before
+    public void before() {
+        c = EasyMock.createNiceControl();
+        zk = c.createMock(ZooKeeper.class);
+        ctx = createBundleContext();
+    }
+    
+    @Test
     public void testScope() {
-        IMocksControl c = EasyMock.createNiceControl();
-
-        BundleContext ctx = c.createMock(BundleContext.class);
-        ZooKeeper zk = c.createMock(ZooKeeper.class);
-        @SuppressWarnings("rawtypes")
-        ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
-
         PublishingEndpointListenerFactory eplf = new PublishingEndpointListenerFactory(zk, ctx);
 
-        EasyMock.expect(ctx.registerService(EasyMock.eq(EndpointEventListener.class.getName()), EasyMock.eq(eplf),
-                                            (Dictionary<String, String>)EasyMock.anyObject())).andReturn(sreg).once();
-
-        EasyMock.expect(ctx.getProperty(EasyMock.eq("org.osgi.framework.uuid"))).andReturn("myUUID").anyTimes();
-
         c.replay();
         eplf.start();
         c.verify();
 
     }
 
-    @SuppressWarnings("unchecked")
+    @Test
     public void testServiceFactory() {
-        IMocksControl c = EasyMock.createNiceControl();
-
-        BundleContext ctx = c.createMock(BundleContext.class);
-        ZooKeeper zk = c.createMock(ZooKeeper.class);
-        @SuppressWarnings("rawtypes")
-        ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
-
         PublishingEndpointListenerFactory eplf = new PublishingEndpointListenerFactory(zk, ctx);
 
-        EasyMock.expect(ctx.registerService(EasyMock.eq(EndpointEventListener.class.getName()), EasyMock.eq(eplf),
-                                (Dictionary<String, String>)EasyMock.anyObject())).andReturn(sreg).once();
-
-        EasyMock.expect(ctx.getProperty(EasyMock.eq("org.osgi.framework.uuid"))).andReturn("myUUID").anyTimes();
-
         PublishingEndpointListener eli = c.createMock(PublishingEndpointListener.class);
         eli.close();
         EasyMock.expectLastCall().once();
@@ -78,7 +70,6 @@ public class PublishingEndpointListenerFactoryTest extends TestCase {
         eplf.start();
 
         PublishingEndpointListener service = eplf.getService(null, null);
-        assertNotNull(service);
         assertTrue(service instanceof EndpointEventListener);
 
         List<PublishingEndpointListener> listeners = eplf.getListeners();
@@ -97,4 +88,16 @@ public class PublishingEndpointListenerFactoryTest extends TestCase {
 
         c.verify();
     }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private BundleContext createBundleContext() {
+        BundleContext ctx = c.createMock(BundleContext.class);
+        ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
+        String[] ifAr = {EndpointEventListener.class.getName(), EndpointListener.class.getName()};
+        EasyMock.expect(ctx.registerService(EasyMock.aryEq(ifAr), EasyMock.anyObject(),
+                                            (Dictionary<String, String>)EasyMock.anyObject())).andReturn(sreg).once();
+    
+        EasyMock.expect(ctx.getProperty(EasyMock.eq("org.osgi.framework.uuid"))).andReturn("myUUID").anyTimes();
+        return ctx;
+    }
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
index e2ecece..53ddbc4 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
@@ -20,6 +20,7 @@ package org.apache.aries.rsa.discovery.zookeeper.subscribe;
 
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
 
 import java.util.Collections;
 
@@ -29,6 +30,7 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.data.Stat;
 import org.easymock.EasyMock;
 import org.easymock.IMocksControl;
@@ -52,6 +54,8 @@ public class InterfaceMonitorTest extends TestCase {
         InterfaceMonitor im = new InterfaceMonitor(zk, interf, endpointListener, scope);
         zk.exists(eq(node), eq(im), eq(im), EasyMock.anyObject());
         EasyMock.expectLastCall().once();
+        zk.getData(eq(node), eq(im), EasyMock.anyObject(DataCallback.class), EasyMock.anyObject());
+        expectLastCall();
 
         expect(zk.exists(eq(node), eq(false))).andReturn(new Stat()).anyTimes();
         expect(zk.getChildren(eq(node), eq(false))).andReturn(Collections.<String> emptyList()).once();


[2/2] aries-rsa git commit: Avoid connection refused exceptions during tests when discovery client is faster than zookeeper server

Posted by cs...@apache.org.
Avoid connection refused exceptions during tests when discovery client is faster than zookeeper server


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/389bae15
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/389bae15
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/389bae15

Branch: refs/heads/master
Commit: 389bae1517d421621cc96bb2c75f8219c82f3a73
Parents: 025516f
Author: Christian Schneider <cs...@adobe.com>
Authored: Tue Feb 6 16:59:12 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Tue Feb 6 16:59:12 2018 +0100

----------------------------------------------------------------------
 .../discovery/zookeeper/ZooKeeperDiscovery.java | 39 ++++++++++++++++----
 1 file changed, 32 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/389bae15/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
index 584da35..13dadad 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
@@ -19,6 +19,7 @@
 package org.apache.aries.rsa.discovery.zookeeper;
 
 import java.io.IOException;
+import java.net.Socket;
 import java.util.Dictionary;
 import java.util.Enumeration;
 import java.util.HashMap;
@@ -58,7 +59,7 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
         this.bctx = bctx;
     }
 
-    public synchronized void updated(Dictionary<String, ?> configuration) throws ConfigurationException {
+    public synchronized void updated(final Dictionary<String, ?> configuration) throws ConfigurationException {
         LOG.debug("Received configuration update for Zookeeper Discovery: {}", configuration);
         // make changes only if config actually changed, to prevent unnecessary ZooKeeper reconnections
         if (!toMap(configuration).equals(toMap(curConfiguration))) {
@@ -66,13 +67,18 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
             curConfiguration = configuration;
             // config is null if it doesn't exist, is being deleted or has not yet been loaded
             // in which case we just stop running
-            if (!closed && configuration != null) {
-                try {
-                    createZookeeper(configuration);
-                } catch (IOException e) {
-                    throw new ConfigurationException(null, "Error starting zookeeper client", e);
-                }
+            if (closed || configuration == null) {
+                return;
             }
+            new Thread(new Runnable() {
+                public void run() {
+                    try {
+                        createZookeeper(configuration);
+                    } catch (IOException e) {
+                        LOG.error("Error starting zookeeper client", e);
+                    }
+                }
+            }).start();
         }
     }
 
@@ -155,8 +161,27 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
         String host = (String)getWithDefault(config, "zookeeper.host", "localhost");
         String port = (String)getWithDefault(config, "zookeeper.port", "2181");
         int timeout = Integer.parseInt((String)getWithDefault(config, "zookeeper.timeout", "3000"));
+        waitPort(host, Integer.parseInt(port));
         zkClient = createZooKeeper(host, port, timeout);
     }
+
+    private void waitPort(String host, int port) {
+        long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() - start < 2000) {
+            try (Socket socket = new Socket(host, port)) {
+                return;
+            } catch (IOException e) {
+                safeSleep();
+            }
+        }
+    }
+
+    private void safeSleep() {
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e1) {
+        }
+    }
     
     public Object getWithDefault(Dictionary<String, ?> config, String key, Object defaultValue) {
         Object value = config.get(key);