You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by am...@apache.org on 2013/07/03 02:23:46 UTC

svn commit: r1499160 - in /cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src: main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/ main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/ test/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/

Author: amichai
Date: Wed Jul  3 00:23:45 2013
New Revision: 1499160

URL: http://svn.apache.org/r1499160
Log:
Organize InterfaceMonitorManager

Modified:
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTrackerCustomizer.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTrackerCustomizer.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTrackerCustomizer.java?rev=1499160&r1=1499159&r2=1499160&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTrackerCustomizer.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTrackerCustomizer.java Wed Jul  3 00:23:45 2013
@@ -40,41 +40,37 @@ public class EndpointListenerTrackerCust
     }
 
     public Object addingService(ServiceReference sref) {
-        handleEndpointListener(sref);
+        updateListenerScopes(sref);
         return sref;
     }
 
     public void modifiedService(ServiceReference sref, Object service) {
-        handleEndpointListener(sref);
+        // called when an EndpointListener updates its service properties,
+        // e.g. when its interest scope is expanded/reduced
+        updateListenerScopes(sref);
     }
 
     public void removedService(ServiceReference sref, Object service) {
-        LOG.info("removedService: {}", sref);
+        LOG.info("removing EndpointListener interests: {}", sref);
         imManager.removeInterest(sref);
     }
 
-    private void handleEndpointListener(ServiceReference sref) {
+    private void updateListenerScopes(ServiceReference sref) {
         if (isOurOwnEndpointListener(sref)) {
-            LOG.debug("Skipping our own endpointListener");
+            LOG.debug("Skipping our own EndpointListener");
             return;
         }
 
+        LOG.info("updating EndpointListener interests: {}", sref);
         if (LOG.isDebugEnabled()) {
-            for (String key : sref.getPropertyKeys()) {
-                LOG.debug("modifiedService: property: " + key + " => " + sref.getProperty(key));
-            }
+            LOG.debug("updated EndpointListener properties: {}", Utils.getProperties(sref));
         }
 
-        for (String scope : Utils.getScopes(sref)) {
-            String objClass = Utils.getObjectClass(scope);
-            LOG.debug("Adding interest in scope {}, objectClass {}", scope, objClass);
-            imManager.addInterest(sref, scope, objClass);
-        }
+        imManager.addInterest(sref);
     }
 
     private static boolean isOurOwnEndpointListener(ServiceReference sref) {
         return Boolean.parseBoolean(String.valueOf(
                 sref.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)));
     }
-
 }

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java?rev=1499160&r1=1499159&r2=1499160&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java Wed Jul  3 00:23:45 2013
@@ -43,6 +43,9 @@ import org.slf4j.LoggerFactory;
  * whose data is a serialized version of an EndpointDescription, and notifies an
  * EndpointListener when changes are detected (which can then propagate the
  * notification to other EndpointListeners with a matching scope).
+ * <p>
+ * Note that the EndpointListener is used here as a decoupling interface for
+ * convenience, and is not necessarily used according to its documented contract.
  */
 public class InterfaceMonitor implements Watcher, StatCallback {
 
@@ -50,20 +53,20 @@ public class InterfaceMonitor implements
 
     private final String znode;
     private final ZooKeeper zookeeper;
-    private final EndpointListener epListener;
+    private final EndpointListener listener;
     private final boolean recursive;
     private volatile boolean closed;
 
     // This map reference changes, so don't synchronize on it
     private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>();
 
-    public InterfaceMonitor(ZooKeeper zk, String intf, EndpointListener epListener, String scope) {
+    public InterfaceMonitor(ZooKeeper zk, String objClass, EndpointListener listener, String scope) {
         this.zookeeper = zk;
-        this.znode = Utils.getZooKeeperPath(intf);
-        this.recursive = intf == null || intf.isEmpty();
-        this.epListener = epListener;
+        this.znode = Utils.getZooKeeperPath(objClass);
+        this.recursive = objClass == null || objClass.isEmpty();
+        this.listener = listener;
         LOG.debug("Creating new InterfaceMonitor {} for scope [{}] and objectClass [{}]",
-                new Object[] {recursive ? "(recursive)" : "", scope, intf});
+                new Object[] {recursive ? "(recursive)" : "", scope, objClass});
     }
 
     public void start() {
@@ -131,7 +134,7 @@ public class InterfaceMonitor implements
     public synchronized void close() {
         closed = true;
         for (EndpointDescription epd : nodes.values()) {
-            epListener.endpointRemoved(epd, null);
+            listener.endpointRemoved(epd, null);
         }
         nodes.clear();
     }
@@ -149,7 +152,7 @@ public class InterfaceMonitor implements
         // whatever is left in prevNodes now has been removed from Discovery
         LOG.debug("processChildren done. Nodes that are missing now and need to be removed: {}", prevNodes.values());
         for (EndpointDescription epd : prevNodes.values()) {
-            epListener.endpointRemoved(epd, null);
+            listener.endpointRemoved(epd, null);
         }
         nodes = newNodes;
     }
@@ -183,7 +186,7 @@ public class InterfaceMonitor implements
                     LOG.debug("Properties: {}", epd.getProperties());
                     if (prevEpd == null) {
                         // This guy is new
-                        epListener.endpointAdded(epd, null);
+                        listener.endpointAdded(epd, null);
                     } else if (!prevEpd.getProperties().equals(epd.getProperties())) {
                         // TODO
                     }

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java?rev=1499160&r1=1499159&r2=1499160&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java Wed Jul  3 00:23:45 2013
@@ -24,7 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-import org.apache.cxf.dosgi.discovery.local.util.Utils;
+import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils;
 import org.apache.zookeeper.ZooKeeper;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
@@ -34,24 +34,27 @@ import org.osgi.service.remoteserviceadm
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.cxf.dosgi.discovery.local.util.Utils.matchFilter;
+
 /**
  * Manages the EndpointListeners and the scopes they are interested in.
  * For each scope with interested EndpointListeners an InterfaceMonitor is created.
  * The InterfaceMonitor calls back when it detects added or removed external Endpoints.
- * These events are then forwarded to all interested EndpointListeners
+ * These events are then forwarded to all interested EndpointListeners.
  */
 public class InterfaceMonitorManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitorManager.class);
 
-    private final ZooKeeper zooKeeper;
-    private final Map<ServiceReference, List<String> /* scopes of the epl */> handledEndpointListeners
-        = new HashMap<ServiceReference, List<String>>();
-    private final Map<String /* scope */, Interest> interestingScopes = new HashMap<String, Interest>();
     private final BundleContext bctx;
+    private final ZooKeeper zooKeeper;
+    // map of EndpointListeners and the scopes they are interested in
+    private final Map<ServiceReference, List<String>> listenerScopes = new HashMap<ServiceReference, List<String>>();
+    // map of scopes and their interest data
+    private final Map<String, Interest> interests = new HashMap<String, Interest>();
 
     protected static class Interest {
-        List<ServiceReference> relatedServiceListeners = new CopyOnWriteArrayList<ServiceReference>();
+        List<ServiceReference> listeners = new CopyOnWriteArrayList<ServiceReference>();
         InterfaceMonitor im;
     }
 
@@ -60,90 +63,83 @@ public class InterfaceMonitorManager {
         this.zooKeeper = zooKeeper;
     }
 
+    public void addInterest(ServiceReference sref) {
+        for (String scope : Utils.getScopes(sref)) {
+            String objClass = Utils.getObjectClass(scope);
+            LOG.debug("Adding interest in scope {}, objectClass {}", scope, objClass);
+            addInterest(sref, scope, objClass);
+        }
+    }
+
     public synchronized void addInterest(ServiceReference sref, String scope, String objClass) {
-        Interest interest = interestingScopes.get(scope);
+        // get or create interest for given scope and add listener to it
+        Interest interest = interests.get(scope);
         if (interest == null) {
+            // create interest, add listener and start monitor
             interest = new Interest();
-            interestingScopes.put(scope, interest);
-        }
-
-        if (!interest.relatedServiceListeners.contains(sref)) {
-            interest.relatedServiceListeners.add(sref);
-        }
-
-        if (interest.im == null) {
+            interests.put(scope, interest);
+            interest.listeners.add(sref); // add it before monitor starts so we don't miss events
             interest.im = createInterfaceMonitor(scope, objClass, interest);
             interest.im.start();
+        } else if (!interest.listeners.contains(sref)) {
+            // interest already exists, so just add listener to it
+            interest.listeners.add(sref);
         }
 
-        List<String> handledScopes = handledEndpointListeners.get(sref);
-        if (handledScopes == null) {
-            handledScopes = new ArrayList<String>(1);
-            handledEndpointListeners.put(sref, handledScopes);
+        // add scope to listener's scopes list
+        List<String> scopes = listenerScopes.get(sref);
+        if (scopes == null) {
+            scopes = new ArrayList<String>(1);
+            listenerScopes.put(sref, scopes);
         }
-
-        if (!handledScopes.contains(scope)) {
-            handledScopes.add(scope);
+        if (!scopes.contains(scope)) {
+            scopes.add(scope);
         }
     }
 
-    /**
-     * Only for test case!
-     */
-    protected synchronized Map<String, Interest> getInterestingScopes() {
-        return interestingScopes;
-    }
+    public synchronized void removeInterest(ServiceReference sref) {
+        List<String> scopes = listenerScopes.get(sref);
+        if (scopes == null) {
+            return;
+        }
 
-    /**
-     * Only for test case!
-     */
-    protected synchronized Map<ServiceReference, List<String>> getHandledEndpointListeners() {
-        return handledEndpointListeners;
+        for (String scope : scopes) {
+            Interest interest = interests.get(scope);
+            if (interest != null) {
+                interest.listeners.remove(sref);
+                if (interest.listeners.isEmpty()) {
+                    interest.im.close();
+                    interests.remove(scope);
+                }
+            }
+        }
+        listenerScopes.remove(sref);
     }
 
     private InterfaceMonitor createInterfaceMonitor(final String scope, String objClass, final Interest interest) {
         // holding this object's lock in the callbacks can lead to a deadlock with InterfaceMonitor
-        EndpointListener epListener = new EndpointListener() {
+        EndpointListener listener = new EndpointListener() {
 
             public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
-                notifyListeners(endpoint, scope, false, interest.relatedServiceListeners);
+                notifyListeners(endpoint, scope, false, interest.listeners);
             }
 
             public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
-                notifyListeners(endpoint, scope, true, interest.relatedServiceListeners);
+                notifyListeners(endpoint, scope, true, interest.listeners);
             }
         };
-        return new InterfaceMonitor(zooKeeper, objClass, epListener, scope);
-    }
-
-    public synchronized void removeInterest(ServiceReference sref) {
-        List<String> handledScopes = handledEndpointListeners.get(sref);
-        if (handledScopes == null) {
-            return;
-        }
-
-        for (String scope : handledScopes) {
-            Interest i = interestingScopes.get(scope);
-            if (i != null) {
-                i.relatedServiceListeners.remove(sref);
-                if (i.relatedServiceListeners.isEmpty()) {
-                    i.im.close();
-                    interestingScopes.remove(scope);
-                }
-            }
-        }
-        handledEndpointListeners.remove(sref);
+        return new InterfaceMonitor(zooKeeper, objClass, listener, scope);
     }
 
     private void notifyListeners(EndpointDescription epd, String currentScope, boolean isAdded,
-            List<ServiceReference> relatedServiceListeners) {
-        for (ServiceReference sref : relatedServiceListeners) {
+            List<ServiceReference> listeners) {
+        for (ServiceReference sref : listeners) {
             Object service = bctx.getService(sref);
             try {
                 if (service instanceof EndpointListener) {
                     EndpointListener epl = (EndpointListener) service;
                     LOG.trace("matching {} against {}", epd, currentScope);
-                    if (Utils.matchFilter(bctx, currentScope, epd)) {
+                    if (matchFilter(bctx, currentScope, epd)) {
                         LOG.debug("Matched {} against {}", epd, currentScope);
                         notifyListener(epd, currentScope, isAdded, sref.getBundle(), epl);
                     }
@@ -172,10 +168,24 @@ public class InterfaceMonitorManager {
     }
 
     public synchronized void close() {
-        for (Interest interest : interestingScopes.values()) {
+        for (Interest interest : interests.values()) {
             interest.im.close();
         }
-        interestingScopes.clear();
-        handledEndpointListeners.clear();
+        interests.clear();
+        listenerScopes.clear();
+    }
+
+    /**
+     * Only for test case!
+     */
+    protected synchronized Map<String, Interest> getInterests() {
+        return interests;
+    }
+
+    /**
+     * Only for test case!
+     */
+    protected synchronized Map<ServiceReference, List<String>> getListenerScopes() {
+        return listenerScopes;
     }
 }

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java?rev=1499160&r1=1499159&r2=1499160&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java Wed Jul  3 00:23:45 2013
@@ -21,6 +21,8 @@ package org.apache.cxf.dosgi.discovery.z
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -121,4 +123,20 @@ public final class Utils {
         Matcher m = OBJECTCLASS_PATTERN.matcher(scope);
         return m.matches() ? m.group(1) : null;
     }
+
+    /**
+     * Returns a service's properties as a map.
+     *
+     * @param serviceReference a service reference
+     * @return the service's properties as a map
+     */
+    public static Map<String, Object> getProperties(ServiceReference serviceReference) {
+        String[] keys = serviceReference.getPropertyKeys();
+        Map<String, Object> props = new HashMap<String, Object>(keys.length);
+        for (String key : keys) {
+            Object val = serviceReference.getProperty(key);
+            props.put(key, val);
+        }
+        return props;
+    }
 }

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java?rev=1499160&r1=1499159&r2=1499160&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java Wed Jul  3 00:23:45 2013
@@ -93,52 +93,52 @@ public class InterfaceMonitorManagerTest
 
         // sref has no scope -> nothing should happen
 
-        assertEquals(0, eltc.getHandledEndpointListeners().size());
-        assertEquals(0, eltc.getInterestingScopes().size());
+        assertEquals(0, eltc.getListenerScopes().size());
+        assertEquals(0, eltc.getInterests().size());
 
         //p.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, );
 
         eltc.addInterest(sref, "(objectClass=mine)", "mine");
 
-        assertEquals(1, eltc.getHandledEndpointListeners().size());
-        assertEquals(1, eltc.getHandledEndpointListeners().get(sref).size());
-        assertEquals("(objectClass=mine)", eltc.getHandledEndpointListeners().get(sref).get(0));
-        assertEquals(1, eltc.getInterestingScopes().size());
+        assertEquals(1, eltc.getListenerScopes().size());
+        assertEquals(1, eltc.getListenerScopes().get(sref).size());
+        assertEquals("(objectClass=mine)", eltc.getListenerScopes().get(sref).get(0));
+        assertEquals(1, eltc.getInterests().size());
 
         eltc.addInterest(sref, "(objectClass=mine)", "mine");
 
-        assertEquals(1, eltc.getHandledEndpointListeners().size());
-        assertEquals(1, eltc.getHandledEndpointListeners().get(sref).size());
-        assertEquals("(objectClass=mine)", eltc.getHandledEndpointListeners().get(sref).get(0));
-        assertEquals(1, eltc.getInterestingScopes().size());
+        assertEquals(1, eltc.getListenerScopes().size());
+        assertEquals(1, eltc.getListenerScopes().get(sref).size());
+        assertEquals("(objectClass=mine)", eltc.getListenerScopes().get(sref).get(0));
+        assertEquals(1, eltc.getInterests().size());
 
         eltc.addInterest(sref2, "(objectClass=mine)", "mine");
 
-        assertEquals(2, eltc.getHandledEndpointListeners().size());
-        assertEquals(1, eltc.getHandledEndpointListeners().get(sref).size());
-        assertEquals(1, eltc.getHandledEndpointListeners().get(sref2).size());
-        assertEquals("(objectClass=mine)", eltc.getHandledEndpointListeners().get(sref).get(0));
-        assertEquals("(objectClass=mine)", eltc.getHandledEndpointListeners().get(sref2).get(0));
-        assertEquals(1, eltc.getInterestingScopes().size());
+        assertEquals(2, eltc.getListenerScopes().size());
+        assertEquals(1, eltc.getListenerScopes().get(sref).size());
+        assertEquals(1, eltc.getListenerScopes().get(sref2).size());
+        assertEquals("(objectClass=mine)", eltc.getListenerScopes().get(sref).get(0));
+        assertEquals("(objectClass=mine)", eltc.getListenerScopes().get(sref2).get(0));
+        assertEquals(1, eltc.getInterests().size());
 
         eltc.removeInterest(sref);
 
-        assertEquals(1, eltc.getHandledEndpointListeners().size());
-        assertEquals(1, eltc.getHandledEndpointListeners().get(sref2).size());
-        assertEquals("(objectClass=mine)", eltc.getHandledEndpointListeners().get(sref2).get(0));
-        assertEquals(1, eltc.getInterestingScopes().size());
+        assertEquals(1, eltc.getListenerScopes().size());
+        assertEquals(1, eltc.getListenerScopes().get(sref2).size());
+        assertEquals("(objectClass=mine)", eltc.getListenerScopes().get(sref2).get(0));
+        assertEquals(1, eltc.getInterests().size());
 
         eltc.removeInterest(sref);
 
-        assertEquals(1, eltc.getHandledEndpointListeners().size());
-        assertEquals(1, eltc.getHandledEndpointListeners().get(sref2).size());
-        assertEquals("(objectClass=mine)", eltc.getHandledEndpointListeners().get(sref2).get(0));
-        assertEquals(1, eltc.getInterestingScopes().size());
+        assertEquals(1, eltc.getListenerScopes().size());
+        assertEquals(1, eltc.getListenerScopes().get(sref2).size());
+        assertEquals("(objectClass=mine)", eltc.getListenerScopes().get(sref2).get(0));
+        assertEquals(1, eltc.getInterests().size());
 
         eltc.removeInterest(sref2);
 
-        assertEquals(0, eltc.getHandledEndpointListeners().size());
-        assertEquals(0, eltc.getInterestingScopes().size());
+        assertEquals(0, eltc.getListenerScopes().size());
+        assertEquals(0, eltc.getInterests().size());
 
         c.verify();
         for (IMocksControl control : controls) {