You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by am...@apache.org on 2013/05/29 21:32:46 UTC

svn commit: r1487605 - /cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/

Author: amichai
Date: Wed May 29 19:32:45 2013
New Revision: 1487605

URL: http://svn.apache.org/r1487605
Log:
Cleanup discovery.zookeeper package

Modified:
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java?rev=1487605&r1=1487604&r2=1487605&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java Wed May 29 19:32:45 2013
@@ -47,18 +47,17 @@ public class InterfaceMonitor implements
     private final boolean recursive;
     private boolean closed;
 
-    // This map is *only* accessed in the change() method
+    // This map reference changes, so don't synchronize on it
     private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>();
 
     public InterfaceMonitor(ZooKeeper zk, String intf, EndpointListener epListener, String scope, BundleContext bctx) {
         this.zookeeper = zk;
         this.znode = Util.getZooKeeperPath(intf);
-        this.recursive = intf == null || "".equals(intf);
+        this.recursive = intf == null || intf.isEmpty();
         this.epListener = epListener;
         if (LOG.isDebugEnabled()) {
-            String recursiveSt = recursive ? "recursive" : "";
-            LOG.debug("Creating new InterfaceMonitor " + recursiveSt + "for scope [" + scope
-                + "] and objectClass [" + intf + "] ");
+            LOG.debug("Creating new InterfaceMonitor " + (recursive ? "(recursive)" : "")
+                + " for scope [" + scope + "] and objectClass [" + intf + "]");
         }
     }
 
@@ -72,7 +71,7 @@ public class InterfaceMonitor implements
     }
 
     /**
-     * Zookeeper watcher
+     * Zookeeper Watcher interface callback.
      */
     public void process(WatchedEvent event) {
         LOG.debug("ZooKeeper watcher callback for event {}", event);
@@ -80,7 +79,7 @@ public class InterfaceMonitor implements
     }
 
     /**
-     * Zookeeper StatCallback
+     * Zookeeper StatCallback interface callback.
      */
     @SuppressWarnings("deprecation")
     public void processResult(int rc, String path, Object ctx, Stat stat) {
@@ -89,7 +88,8 @@ public class InterfaceMonitor implements
         switch (rc) {
         case Code.Ok:
         case Code.NoNode:
-            break;
+            processDelta();
+            return;
 
         case Code.SessionExpired:
         case Code.NoAuth:
@@ -98,10 +98,7 @@ public class InterfaceMonitor implements
 
         default:
             watch();
-            return;
         }
-
-        processDelta();
     }
 
     private void processDelta() {
@@ -117,12 +114,12 @@ public class InterfaceMonitor implements
         try {
             if (zookeeper.exists(znode, false) != null) {
                 zookeeper.getChildren(znode, this);
-                change();
+                refreshNodes();
             } else {
                 LOG.debug("znode {} doesn't exist -> not processing any changes", znode);
             }
-        } catch (Exception ke) {
-            LOG.error("Error getting ZooKeeper data.", ke);
+        } catch (Exception e) {
+            LOG.error("Error getting ZooKeeper data.", e);
         }
     }
 
@@ -133,8 +130,8 @@ public class InterfaceMonitor implements
         nodes.clear();
     }
 
-    public synchronized void change() {
-        LOG.info("Zookeeper callback on node: {}", znode);
+    private synchronized void refreshNodes() {
+        LOG.info("Processing change on node: {}", znode);
 
         Map<String, EndpointDescription> newNodes = new HashMap<String, EndpointDescription>();
         Map<String, EndpointDescription> prevNodes = nodes;
@@ -149,10 +146,10 @@ public class InterfaceMonitor implements
     }
 
     /**
-     * iterates through all child nodes of the given node and tries to find
+     * Iterates through all child nodes of the given node and tries to find
      * endpoints. If the recursive flag is set it also traverses into the child
      * nodes.
-     * 
+     *
      * @return true if an endpoint was found and if the node therefore needs to
      *         be monitored for changes
      */
@@ -199,10 +196,9 @@ public class InterfaceMonitor implements
     }
 
     /**
-     * Scan the node data for Endpoint information and publish it to the related
-     * service listeners
-     * 
-     * @param node
+     * Retrieves data from the given node and parses it into an EndpointDescription.
+     *
+     * @param node a node path
      * @return endpoint found in the node or null if no endpoint was found
      */
     private EndpointDescription getEndpointDescriptionFromNode(String node) {
@@ -212,16 +208,15 @@ public class InterfaceMonitor implements
                 return null;
             }
             byte[] data = zookeeper.getData(node, false, null);
-            LOG.debug("Child: {}", node);
+            LOG.debug("Got data for node: {}", node);
 
             List<Element> elements = LocalDiscoveryUtils.getElements(new ByteArrayInputStream(data));
             if (elements.size() > 0) {
                 return LocalDiscoveryUtils.getEndpointDescription(elements.get(0));
-            } else {
-                LOG.warn("No Discovery information found for node: {}", node);
             }
+            LOG.warn("No Discovery information found for node: {}", node);
         } catch (Exception e) {
-            LOG.error("Problem processing Zookeeper callback", e);
+            LOG.error("Problem getting EndpointDescription from node " + node, e);
         }
         return null;
     }

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java?rev=1487605&r1=1487604&r2=1487605&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java Wed May 29 19:32:45 2013
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.zookeeper.ZooKeeper;
 import org.osgi.framework.BundleContext;
@@ -62,7 +61,7 @@ public class InterfaceMonitorManager {
         this.zooKeeper = zooKeeper;
     }
     
-    void addInterest(ServiceReference sref, String scope, String objClass) {
+    public void addInterest(ServiceReference sref, String scope, String objClass) {
         synchronized (interestingScopes) {
             synchronized (handledEndpointlisteners) {
                 Interest interest = interestingScopes.get(scope);
@@ -108,7 +107,7 @@ public class InterfaceMonitorManager {
         return handledEndpointlisteners;
     }
     
-    protected InterfaceMonitor createInterfaceMonitor(String scope, String objClass, final Interest interest) {
+    private InterfaceMonitor createInterfaceMonitor(String scope, String objClass, final Interest interest) {
         EndpointListener epListener = new EndpointListener() {
             public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
                 notifyListeners(endpoint, false, interest.relatedServiceListeners);
@@ -131,7 +130,7 @@ public class InterfaceMonitorManager {
             Interest i = interestingScopes.get(scope);
             if (i != null) {
                 i.relatedServiceListeners.remove(sref);
-                if (i.relatedServiceListeners.size() == 0) {
+                if (i.relatedServiceListeners.isEmpty()) {
                     i.im.close();
                     interestingScopes.remove(scope);
                 }
@@ -168,10 +167,10 @@ public class InterfaceMonitorManager {
         }
     }
     
-    private boolean matches(String scope, EndpointDescription epd) {
+    private static boolean matches(String scope, EndpointDescription epd) {
         try {
             Filter f = FrameworkUtil.createFilter(scope);
-            Dictionary<String, Object> dict = mapToDictionary(epd.getProperties());
+            Dictionary<String, Object> dict = new Hashtable<String, Object>(epd.getProperties());
             return f.match(dict);
         } catch (InvalidSyntaxException e) {
             LOG.error("Scope [" + scope + "] resulted in an invalid filter!", e);
@@ -179,19 +178,11 @@ public class InterfaceMonitorManager {
         }
     }
 
-    private Dictionary<String, Object> mapToDictionary(Map<String, Object> map) {
-        Dictionary<String, Object> d = new Hashtable<String, Object>();
-        Set<Map.Entry<String, Object>> entries = map.entrySet();
-        for (Map.Entry<String, Object> entry : entries) {
-            d.put(entry.getKey(), entry.getValue());
-        }
-        return d;
-    }
-
     public void close() {
-        for (String scope : interestingScopes.keySet()) {
-            Interest interest = interestingScopes.get(scope);
+        for (Interest interest : interestingScopes.values()) {
             interest.im.close();
         }
+        interestingScopes.clear();
+        handledEndpointlisteners.clear();
     }
 }

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java?rev=1487605&r1=1487604&r2=1487605&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java Wed May 29 19:32:45 2013
@@ -111,7 +111,7 @@ public class PublishingEndpointListener 
 
                 endpoints.add(endpoint);
             } catch (Exception ex) {
-                LOG.error("Exception while processing the addition of a ServicePublication.", ex);
+                LOG.error("Exception while processing the addition of an endpoint.", ex);
             }
         }
 
@@ -133,7 +133,7 @@ public class PublishingEndpointListener 
                 removeEndpoint(endpoint);
                 endpoints.remove(endpoint);
             } catch (Exception ex) {
-                LOG.error("Exception while processing the removal of a ServicePublication.", ex);
+                LOG.error("Exception while processing the removal of an endpoint", ex);
             }
         }
 
@@ -191,13 +191,13 @@ public class PublishingEndpointListener 
     }
 
     public void close() {
-        LOG.debug("removing all service publications");
+        LOG.debug("closing - removing all endpoints");
         synchronized (endpoints) {
             for (EndpointDescription ed : endpoints) {
                 try {
                     removeEndpoint(ed);
                 } catch (Exception ex) {
-                    LOG.error("Exception while processing the removal of a ServicePublication.", ex);
+                    LOG.error("Exception while removing endpoint during close", ex);
                 }
             }
             endpoints.clear();

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java?rev=1487605&r1=1487604&r2=1487605&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java Wed May 29 19:32:45 2013
@@ -60,13 +60,11 @@ public class PublishingEndpointListenerF
         }
     }
 
-    public void ungetService(Bundle b, ServiceRegistration sr, Object s) {
+    public void ungetService(Bundle b, ServiceRegistration sr, Object service) {
         LOG.debug("remove EndpointListener");
         synchronized (listeners) {
-            if (listeners.contains(s)) {
-                PublishingEndpointListener epl = (PublishingEndpointListener)s;
-                epl.close();
-                listeners.remove(epl);
+            if (listeners.remove(service)) {
+                ((PublishingEndpointListener)service).close();
             }
         }
     }
@@ -88,6 +86,7 @@ public class PublishingEndpointListenerF
         for (PublishingEndpointListener epl : listeners) {
             epl.close();
         }
+        listeners.clear();
     }
 
     /**