You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by am...@apache.org on 2013/05/29 21:32:46 UTC
svn commit: r1487605 -
/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/
Author: amichai
Date: Wed May 29 19:32:45 2013
New Revision: 1487605
URL: http://svn.apache.org/r1487605
Log:
Cleanup discovery.zookeeper package
Modified:
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java
Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java?rev=1487605&r1=1487604&r2=1487605&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java Wed May 29 19:32:45 2013
@@ -47,18 +47,17 @@ public class InterfaceMonitor implements
private final boolean recursive;
private boolean closed;
- // This map is *only* accessed in the change() method
+ // This map reference changes, so don't synchronize on it
private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>();
public InterfaceMonitor(ZooKeeper zk, String intf, EndpointListener epListener, String scope, BundleContext bctx) {
this.zookeeper = zk;
this.znode = Util.getZooKeeperPath(intf);
- this.recursive = intf == null || "".equals(intf);
+ this.recursive = intf == null || intf.isEmpty();
this.epListener = epListener;
if (LOG.isDebugEnabled()) {
- String recursiveSt = recursive ? "recursive" : "";
- LOG.debug("Creating new InterfaceMonitor " + recursiveSt + "for scope [" + scope
- + "] and objectClass [" + intf + "] ");
+ LOG.debug("Creating new InterfaceMonitor " + (recursive ? "(recursive)" : "")
+ + " for scope [" + scope + "] and objectClass [" + intf + "]");
}
}
@@ -72,7 +71,7 @@ public class InterfaceMonitor implements
}
/**
- * Zookeeper watcher
+ * Zookeeper Watcher interface callback.
*/
public void process(WatchedEvent event) {
LOG.debug("ZooKeeper watcher callback for event {}", event);
@@ -80,7 +79,7 @@ public class InterfaceMonitor implements
}
/**
- * Zookeeper StatCallback
+ * Zookeeper StatCallback interface callback.
*/
@SuppressWarnings("deprecation")
public void processResult(int rc, String path, Object ctx, Stat stat) {
@@ -89,7 +88,8 @@ public class InterfaceMonitor implements
switch (rc) {
case Code.Ok:
case Code.NoNode:
- break;
+ processDelta();
+ return;
case Code.SessionExpired:
case Code.NoAuth:
@@ -98,10 +98,7 @@ public class InterfaceMonitor implements
default:
watch();
- return;
}
-
- processDelta();
}
private void processDelta() {
@@ -117,12 +114,12 @@ public class InterfaceMonitor implements
try {
if (zookeeper.exists(znode, false) != null) {
zookeeper.getChildren(znode, this);
- change();
+ refreshNodes();
} else {
LOG.debug("znode {} doesn't exist -> not processing any changes", znode);
}
- } catch (Exception ke) {
- LOG.error("Error getting ZooKeeper data.", ke);
+ } catch (Exception e) {
+ LOG.error("Error getting ZooKeeper data.", e);
}
}
@@ -133,8 +130,8 @@ public class InterfaceMonitor implements
nodes.clear();
}
- public synchronized void change() {
- LOG.info("Zookeeper callback on node: {}", znode);
+ private synchronized void refreshNodes() {
+ LOG.info("Processing change on node: {}", znode);
Map<String, EndpointDescription> newNodes = new HashMap<String, EndpointDescription>();
Map<String, EndpointDescription> prevNodes = nodes;
@@ -149,10 +146,10 @@ public class InterfaceMonitor implements
}
/**
- * iterates through all child nodes of the given node and tries to find
+ * Iterates through all child nodes of the given node and tries to find
* endpoints. If the recursive flag is set it also traverses into the child
* nodes.
- *
+ *
* @return true if an endpoint was found and if the node therefore needs to
* be monitored for changes
*/
@@ -199,10 +196,9 @@ public class InterfaceMonitor implements
}
/**
- * Scan the node data for Endpoint information and publish it to the related
- * service listeners
- *
- * @param node
+ * Retrieves data from the given node and parses it into an EndpointDescription.
+ *
+ * @param node a node path
* @return endpoint found in the node or null if no endpoint was found
*/
private EndpointDescription getEndpointDescriptionFromNode(String node) {
@@ -212,16 +208,15 @@ public class InterfaceMonitor implements
return null;
}
byte[] data = zookeeper.getData(node, false, null);
- LOG.debug("Child: {}", node);
+ LOG.debug("Got data for node: {}", node);
List<Element> elements = LocalDiscoveryUtils.getElements(new ByteArrayInputStream(data));
if (elements.size() > 0) {
return LocalDiscoveryUtils.getEndpointDescription(elements.get(0));
- } else {
- LOG.warn("No Discovery information found for node: {}", node);
}
+ LOG.warn("No Discovery information found for node: {}", node);
} catch (Exception e) {
- LOG.error("Problem processing Zookeeper callback", e);
+ LOG.error("Problem getting EndpointDescription from node " + node, e);
}
return null;
}
Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java?rev=1487605&r1=1487604&r2=1487605&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java Wed May 29 19:32:45 2013
@@ -24,7 +24,6 @@ import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.zookeeper.ZooKeeper;
import org.osgi.framework.BundleContext;
@@ -62,7 +61,7 @@ public class InterfaceMonitorManager {
this.zooKeeper = zooKeeper;
}
- void addInterest(ServiceReference sref, String scope, String objClass) {
+ public void addInterest(ServiceReference sref, String scope, String objClass) {
synchronized (interestingScopes) {
synchronized (handledEndpointlisteners) {
Interest interest = interestingScopes.get(scope);
@@ -108,7 +107,7 @@ public class InterfaceMonitorManager {
return handledEndpointlisteners;
}
- protected InterfaceMonitor createInterfaceMonitor(String scope, String objClass, final Interest interest) {
+ private InterfaceMonitor createInterfaceMonitor(String scope, String objClass, final Interest interest) {
EndpointListener epListener = new EndpointListener() {
public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
notifyListeners(endpoint, false, interest.relatedServiceListeners);
@@ -131,7 +130,7 @@ public class InterfaceMonitorManager {
Interest i = interestingScopes.get(scope);
if (i != null) {
i.relatedServiceListeners.remove(sref);
- if (i.relatedServiceListeners.size() == 0) {
+ if (i.relatedServiceListeners.isEmpty()) {
i.im.close();
interestingScopes.remove(scope);
}
@@ -168,10 +167,10 @@ public class InterfaceMonitorManager {
}
}
- private boolean matches(String scope, EndpointDescription epd) {
+ private static boolean matches(String scope, EndpointDescription epd) {
try {
Filter f = FrameworkUtil.createFilter(scope);
- Dictionary<String, Object> dict = mapToDictionary(epd.getProperties());
+ Dictionary<String, Object> dict = new Hashtable<String, Object>(epd.getProperties());
return f.match(dict);
} catch (InvalidSyntaxException e) {
LOG.error("Scope [" + scope + "] resulted in an invalid filter!", e);
@@ -179,19 +178,11 @@ public class InterfaceMonitorManager {
}
}
- private Dictionary<String, Object> mapToDictionary(Map<String, Object> map) {
- Dictionary<String, Object> d = new Hashtable<String, Object>();
- Set<Map.Entry<String, Object>> entries = map.entrySet();
- for (Map.Entry<String, Object> entry : entries) {
- d.put(entry.getKey(), entry.getValue());
- }
- return d;
- }
-
public void close() {
- for (String scope : interestingScopes.keySet()) {
- Interest interest = interestingScopes.get(scope);
+ for (Interest interest : interestingScopes.values()) {
interest.im.close();
}
+ interestingScopes.clear();
+ handledEndpointlisteners.clear();
}
}
Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java?rev=1487605&r1=1487604&r2=1487605&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java Wed May 29 19:32:45 2013
@@ -111,7 +111,7 @@ public class PublishingEndpointListener
endpoints.add(endpoint);
} catch (Exception ex) {
- LOG.error("Exception while processing the addition of a ServicePublication.", ex);
+ LOG.error("Exception while processing the addition of an endpoint.", ex);
}
}
@@ -133,7 +133,7 @@ public class PublishingEndpointListener
removeEndpoint(endpoint);
endpoints.remove(endpoint);
} catch (Exception ex) {
- LOG.error("Exception while processing the removal of a ServicePublication.", ex);
+ LOG.error("Exception while processing the removal of an endpoint", ex);
}
}
@@ -191,13 +191,13 @@ public class PublishingEndpointListener
}
public void close() {
- LOG.debug("removing all service publications");
+ LOG.debug("closing - removing all endpoints");
synchronized (endpoints) {
for (EndpointDescription ed : endpoints) {
try {
removeEndpoint(ed);
} catch (Exception ex) {
- LOG.error("Exception while processing the removal of a ServicePublication.", ex);
+ LOG.error("Exception while removing endpoint during close", ex);
}
}
endpoints.clear();
Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java?rev=1487605&r1=1487604&r2=1487605&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java Wed May 29 19:32:45 2013
@@ -60,13 +60,11 @@ public class PublishingEndpointListenerF
}
}
- public void ungetService(Bundle b, ServiceRegistration sr, Object s) {
+ public void ungetService(Bundle b, ServiceRegistration sr, Object service) {
LOG.debug("remove EndpointListener");
synchronized (listeners) {
- if (listeners.contains(s)) {
- PublishingEndpointListener epl = (PublishingEndpointListener)s;
- epl.close();
- listeners.remove(epl);
+ if (listeners.remove(service)) {
+ ((PublishingEndpointListener)service).close();
}
}
}
@@ -88,6 +86,7 @@ public class PublishingEndpointListenerF
for (PublishingEndpointListener epl : listeners) {
epl.close();
}
+ listeners.clear();
}
/**