You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by am...@apache.org on 2013/07/03 02:23:46 UTC
svn commit: r1499160 - in
/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src:
main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/
main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/
test/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/
Author: amichai
Date: Wed Jul 3 00:23:45 2013
New Revision: 1499160
URL: http://svn.apache.org/r1499160
Log:
Organize InterfaceMonitorManager
Modified:
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTrackerCustomizer.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java
Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTrackerCustomizer.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTrackerCustomizer.java?rev=1499160&r1=1499159&r2=1499160&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTrackerCustomizer.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTrackerCustomizer.java Wed Jul 3 00:23:45 2013
@@ -40,41 +40,37 @@ public class EndpointListenerTrackerCust
}
public Object addingService(ServiceReference sref) {
- handleEndpointListener(sref);
+ updateListenerScopes(sref);
return sref;
}
public void modifiedService(ServiceReference sref, Object service) {
- handleEndpointListener(sref);
+ // called when an EndpointListener updates its service properties,
+ // e.g. when its interest scope is expanded/reduced
+ updateListenerScopes(sref);
}
public void removedService(ServiceReference sref, Object service) {
- LOG.info("removedService: {}", sref);
+ LOG.info("removing EndpointListener interests: {}", sref);
imManager.removeInterest(sref);
}
- private void handleEndpointListener(ServiceReference sref) {
+ private void updateListenerScopes(ServiceReference sref) {
if (isOurOwnEndpointListener(sref)) {
- LOG.debug("Skipping our own endpointListener");
+ LOG.debug("Skipping our own EndpointListener");
return;
}
+ LOG.info("updating EndpointListener interests: {}", sref);
if (LOG.isDebugEnabled()) {
- for (String key : sref.getPropertyKeys()) {
- LOG.debug("modifiedService: property: " + key + " => " + sref.getProperty(key));
- }
+ LOG.debug("updated EndpointListener properties: {}", Utils.getProperties(sref));
}
- for (String scope : Utils.getScopes(sref)) {
- String objClass = Utils.getObjectClass(scope);
- LOG.debug("Adding interest in scope {}, objectClass {}", scope, objClass);
- imManager.addInterest(sref, scope, objClass);
- }
+ imManager.addInterest(sref);
}
private static boolean isOurOwnEndpointListener(ServiceReference sref) {
return Boolean.parseBoolean(String.valueOf(
sref.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)));
}
-
}
Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java?rev=1499160&r1=1499159&r2=1499160&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java Wed Jul 3 00:23:45 2013
@@ -43,6 +43,9 @@ import org.slf4j.LoggerFactory;
* whose data is a serialized version of an EndpointDescription, and notifies an
* EndpointListener when changes are detected (which can then propagate the
* notification to other EndpointListeners with a matching scope).
+ * <p>
+ * Note that the EndpointListener is used here as a decoupling interface for
+ * convenience, and is not necessarily used according to its documented contract.
*/
public class InterfaceMonitor implements Watcher, StatCallback {
@@ -50,20 +53,20 @@ public class InterfaceMonitor implements
private final String znode;
private final ZooKeeper zookeeper;
- private final EndpointListener epListener;
+ private final EndpointListener listener;
private final boolean recursive;
private volatile boolean closed;
// This map reference changes, so don't synchronize on it
private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>();
- public InterfaceMonitor(ZooKeeper zk, String intf, EndpointListener epListener, String scope) {
+ public InterfaceMonitor(ZooKeeper zk, String objClass, EndpointListener listener, String scope) {
this.zookeeper = zk;
- this.znode = Utils.getZooKeeperPath(intf);
- this.recursive = intf == null || intf.isEmpty();
- this.epListener = epListener;
+ this.znode = Utils.getZooKeeperPath(objClass);
+ this.recursive = objClass == null || objClass.isEmpty();
+ this.listener = listener;
LOG.debug("Creating new InterfaceMonitor {} for scope [{}] and objectClass [{}]",
- new Object[] {recursive ? "(recursive)" : "", scope, intf});
+ new Object[] {recursive ? "(recursive)" : "", scope, objClass});
}
public void start() {
@@ -131,7 +134,7 @@ public class InterfaceMonitor implements
public synchronized void close() {
closed = true;
for (EndpointDescription epd : nodes.values()) {
- epListener.endpointRemoved(epd, null);
+ listener.endpointRemoved(epd, null);
}
nodes.clear();
}
@@ -149,7 +152,7 @@ public class InterfaceMonitor implements
// whatever is left in prevNodes now has been removed from Discovery
LOG.debug("processChildren done. Nodes that are missing now and need to be removed: {}", prevNodes.values());
for (EndpointDescription epd : prevNodes.values()) {
- epListener.endpointRemoved(epd, null);
+ listener.endpointRemoved(epd, null);
}
nodes = newNodes;
}
@@ -183,7 +186,7 @@ public class InterfaceMonitor implements
LOG.debug("Properties: {}", epd.getProperties());
if (prevEpd == null) {
// This guy is new
- epListener.endpointAdded(epd, null);
+ listener.endpointAdded(epd, null);
} else if (!prevEpd.getProperties().equals(epd.getProperties())) {
// TODO
}
Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java?rev=1499160&r1=1499159&r2=1499160&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java Wed Jul 3 00:23:45 2013
@@ -24,7 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
-import org.apache.cxf.dosgi.discovery.local.util.Utils;
+import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils;
import org.apache.zookeeper.ZooKeeper;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
@@ -34,24 +34,27 @@ import org.osgi.service.remoteserviceadm
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.cxf.dosgi.discovery.local.util.Utils.matchFilter;
+
/**
* Manages the EndpointListeners and the scopes they are interested in.
* For each scope with interested EndpointListeners an InterfaceMonitor is created.
* The InterfaceMonitor calls back when it detects added or removed external Endpoints.
- * These events are then forwarded to all interested EndpointListeners
+ * These events are then forwarded to all interested EndpointListeners.
*/
public class InterfaceMonitorManager {
private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitorManager.class);
- private final ZooKeeper zooKeeper;
- private final Map<ServiceReference, List<String> /* scopes of the epl */> handledEndpointListeners
- = new HashMap<ServiceReference, List<String>>();
- private final Map<String /* scope */, Interest> interestingScopes = new HashMap<String, Interest>();
private final BundleContext bctx;
+ private final ZooKeeper zooKeeper;
+ // map of EndpointListeners and the scopes they are interested in
+ private final Map<ServiceReference, List<String>> listenerScopes = new HashMap<ServiceReference, List<String>>();
+ // map of scopes and their interest data
+ private final Map<String, Interest> interests = new HashMap<String, Interest>();
protected static class Interest {
- List<ServiceReference> relatedServiceListeners = new CopyOnWriteArrayList<ServiceReference>();
+ List<ServiceReference> listeners = new CopyOnWriteArrayList<ServiceReference>();
InterfaceMonitor im;
}
@@ -60,90 +63,83 @@ public class InterfaceMonitorManager {
this.zooKeeper = zooKeeper;
}
+ public void addInterest(ServiceReference sref) {
+ for (String scope : Utils.getScopes(sref)) {
+ String objClass = Utils.getObjectClass(scope);
+ LOG.debug("Adding interest in scope {}, objectClass {}", scope, objClass);
+ addInterest(sref, scope, objClass);
+ }
+ }
+
public synchronized void addInterest(ServiceReference sref, String scope, String objClass) {
- Interest interest = interestingScopes.get(scope);
+ // get or create interest for given scope and add listener to it
+ Interest interest = interests.get(scope);
if (interest == null) {
+ // create interest, add listener and start monitor
interest = new Interest();
- interestingScopes.put(scope, interest);
- }
-
- if (!interest.relatedServiceListeners.contains(sref)) {
- interest.relatedServiceListeners.add(sref);
- }
-
- if (interest.im == null) {
+ interests.put(scope, interest);
+ interest.listeners.add(sref); // add it before monitor starts so we don't miss events
interest.im = createInterfaceMonitor(scope, objClass, interest);
interest.im.start();
+ } else if (!interest.listeners.contains(sref)) {
+ // interest already exists, so just add listener to it
+ interest.listeners.add(sref);
}
- List<String> handledScopes = handledEndpointListeners.get(sref);
- if (handledScopes == null) {
- handledScopes = new ArrayList<String>(1);
- handledEndpointListeners.put(sref, handledScopes);
+ // add scope to listener's scopes list
+ List<String> scopes = listenerScopes.get(sref);
+ if (scopes == null) {
+ scopes = new ArrayList<String>(1);
+ listenerScopes.put(sref, scopes);
}
-
- if (!handledScopes.contains(scope)) {
- handledScopes.add(scope);
+ if (!scopes.contains(scope)) {
+ scopes.add(scope);
}
}
- /**
- * Only for test case!
- */
- protected synchronized Map<String, Interest> getInterestingScopes() {
- return interestingScopes;
- }
+ public synchronized void removeInterest(ServiceReference sref) {
+ List<String> scopes = listenerScopes.get(sref);
+ if (scopes == null) {
+ return;
+ }
- /**
- * Only for test case!
- */
- protected synchronized Map<ServiceReference, List<String>> getHandledEndpointListeners() {
- return handledEndpointListeners;
+ for (String scope : scopes) {
+ Interest interest = interests.get(scope);
+ if (interest != null) {
+ interest.listeners.remove(sref);
+ if (interest.listeners.isEmpty()) {
+ interest.im.close();
+ interests.remove(scope);
+ }
+ }
+ }
+ listenerScopes.remove(sref);
}
private InterfaceMonitor createInterfaceMonitor(final String scope, String objClass, final Interest interest) {
// holding this object's lock in the callbacks can lead to a deadlock with InterfaceMonitor
- EndpointListener epListener = new EndpointListener() {
+ EndpointListener listener = new EndpointListener() {
public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
- notifyListeners(endpoint, scope, false, interest.relatedServiceListeners);
+ notifyListeners(endpoint, scope, false, interest.listeners);
}
public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
- notifyListeners(endpoint, scope, true, interest.relatedServiceListeners);
+ notifyListeners(endpoint, scope, true, interest.listeners);
}
};
- return new InterfaceMonitor(zooKeeper, objClass, epListener, scope);
- }
-
- public synchronized void removeInterest(ServiceReference sref) {
- List<String> handledScopes = handledEndpointListeners.get(sref);
- if (handledScopes == null) {
- return;
- }
-
- for (String scope : handledScopes) {
- Interest i = interestingScopes.get(scope);
- if (i != null) {
- i.relatedServiceListeners.remove(sref);
- if (i.relatedServiceListeners.isEmpty()) {
- i.im.close();
- interestingScopes.remove(scope);
- }
- }
- }
- handledEndpointListeners.remove(sref);
+ return new InterfaceMonitor(zooKeeper, objClass, listener, scope);
}
private void notifyListeners(EndpointDescription epd, String currentScope, boolean isAdded,
- List<ServiceReference> relatedServiceListeners) {
- for (ServiceReference sref : relatedServiceListeners) {
+ List<ServiceReference> listeners) {
+ for (ServiceReference sref : listeners) {
Object service = bctx.getService(sref);
try {
if (service instanceof EndpointListener) {
EndpointListener epl = (EndpointListener) service;
LOG.trace("matching {} against {}", epd, currentScope);
- if (Utils.matchFilter(bctx, currentScope, epd)) {
+ if (matchFilter(bctx, currentScope, epd)) {
LOG.debug("Matched {} against {}", epd, currentScope);
notifyListener(epd, currentScope, isAdded, sref.getBundle(), epl);
}
@@ -172,10 +168,24 @@ public class InterfaceMonitorManager {
}
public synchronized void close() {
- for (Interest interest : interestingScopes.values()) {
+ for (Interest interest : interests.values()) {
interest.im.close();
}
- interestingScopes.clear();
- handledEndpointListeners.clear();
+ interests.clear();
+ listenerScopes.clear();
+ }
+
+ /**
+ * Only for test case!
+ */
+ protected synchronized Map<String, Interest> getInterests() {
+ return interests;
+ }
+
+ /**
+ * Only for test case!
+ */
+ protected synchronized Map<ServiceReference, List<String>> getListenerScopes() {
+ return listenerScopes;
}
}
Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java?rev=1499160&r1=1499159&r2=1499160&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java Wed Jul 3 00:23:45 2013
@@ -21,6 +21,8 @@ package org.apache.cxf.dosgi.discovery.z
import java.util.Arrays;
import java.util.Collection;
import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -121,4 +123,20 @@ public final class Utils {
Matcher m = OBJECTCLASS_PATTERN.matcher(scope);
return m.matches() ? m.group(1) : null;
}
+
+ /**
+ * Returns a service's properties as a map.
+ *
+ * @param serviceReference a service reference
+ * @return the service's properties as a map
+ */
+ public static Map<String, Object> getProperties(ServiceReference serviceReference) {
+ String[] keys = serviceReference.getPropertyKeys();
+ Map<String, Object> props = new HashMap<String, Object>(keys.length);
+ for (String key : keys) {
+ Object val = serviceReference.getProperty(key);
+ props.put(key, val);
+ }
+ return props;
+ }
}
Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java?rev=1499160&r1=1499159&r2=1499160&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java Wed Jul 3 00:23:45 2013
@@ -93,52 +93,52 @@ public class InterfaceMonitorManagerTest
// sref has no scope -> nothing should happen
- assertEquals(0, eltc.getHandledEndpointListeners().size());
- assertEquals(0, eltc.getInterestingScopes().size());
+ assertEquals(0, eltc.getListenerScopes().size());
+ assertEquals(0, eltc.getInterests().size());
//p.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, );
eltc.addInterest(sref, "(objectClass=mine)", "mine");
- assertEquals(1, eltc.getHandledEndpointListeners().size());
- assertEquals(1, eltc.getHandledEndpointListeners().get(sref).size());
- assertEquals("(objectClass=mine)", eltc.getHandledEndpointListeners().get(sref).get(0));
- assertEquals(1, eltc.getInterestingScopes().size());
+ assertEquals(1, eltc.getListenerScopes().size());
+ assertEquals(1, eltc.getListenerScopes().get(sref).size());
+ assertEquals("(objectClass=mine)", eltc.getListenerScopes().get(sref).get(0));
+ assertEquals(1, eltc.getInterests().size());
eltc.addInterest(sref, "(objectClass=mine)", "mine");
- assertEquals(1, eltc.getHandledEndpointListeners().size());
- assertEquals(1, eltc.getHandledEndpointListeners().get(sref).size());
- assertEquals("(objectClass=mine)", eltc.getHandledEndpointListeners().get(sref).get(0));
- assertEquals(1, eltc.getInterestingScopes().size());
+ assertEquals(1, eltc.getListenerScopes().size());
+ assertEquals(1, eltc.getListenerScopes().get(sref).size());
+ assertEquals("(objectClass=mine)", eltc.getListenerScopes().get(sref).get(0));
+ assertEquals(1, eltc.getInterests().size());
eltc.addInterest(sref2, "(objectClass=mine)", "mine");
- assertEquals(2, eltc.getHandledEndpointListeners().size());
- assertEquals(1, eltc.getHandledEndpointListeners().get(sref).size());
- assertEquals(1, eltc.getHandledEndpointListeners().get(sref2).size());
- assertEquals("(objectClass=mine)", eltc.getHandledEndpointListeners().get(sref).get(0));
- assertEquals("(objectClass=mine)", eltc.getHandledEndpointListeners().get(sref2).get(0));
- assertEquals(1, eltc.getInterestingScopes().size());
+ assertEquals(2, eltc.getListenerScopes().size());
+ assertEquals(1, eltc.getListenerScopes().get(sref).size());
+ assertEquals(1, eltc.getListenerScopes().get(sref2).size());
+ assertEquals("(objectClass=mine)", eltc.getListenerScopes().get(sref).get(0));
+ assertEquals("(objectClass=mine)", eltc.getListenerScopes().get(sref2).get(0));
+ assertEquals(1, eltc.getInterests().size());
eltc.removeInterest(sref);
- assertEquals(1, eltc.getHandledEndpointListeners().size());
- assertEquals(1, eltc.getHandledEndpointListeners().get(sref2).size());
- assertEquals("(objectClass=mine)", eltc.getHandledEndpointListeners().get(sref2).get(0));
- assertEquals(1, eltc.getInterestingScopes().size());
+ assertEquals(1, eltc.getListenerScopes().size());
+ assertEquals(1, eltc.getListenerScopes().get(sref2).size());
+ assertEquals("(objectClass=mine)", eltc.getListenerScopes().get(sref2).get(0));
+ assertEquals(1, eltc.getInterests().size());
eltc.removeInterest(sref);
- assertEquals(1, eltc.getHandledEndpointListeners().size());
- assertEquals(1, eltc.getHandledEndpointListeners().get(sref2).size());
- assertEquals("(objectClass=mine)", eltc.getHandledEndpointListeners().get(sref2).get(0));
- assertEquals(1, eltc.getInterestingScopes().size());
+ assertEquals(1, eltc.getListenerScopes().size());
+ assertEquals(1, eltc.getListenerScopes().get(sref2).size());
+ assertEquals("(objectClass=mine)", eltc.getListenerScopes().get(sref2).get(0));
+ assertEquals(1, eltc.getInterests().size());
eltc.removeInterest(sref2);
- assertEquals(0, eltc.getHandledEndpointListeners().size());
- assertEquals(0, eltc.getInterestingScopes().size());
+ assertEquals(0, eltc.getListenerScopes().size());
+ assertEquals(0, eltc.getInterests().size());
c.verify();
for (IMocksControl control : controls) {