You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2018/02/06 15:59:26 UTC
[1/2] aries-rsa git commit: [ARIES-1771] Support endpoint changes in
zookeeper discovery
Repository: aries-rsa
Updated Branches:
refs/heads/master b7597a4f1 -> 389bae151
[ARIES-1771] Support endpoint changes in zookeeper discovery
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/025516f2
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/025516f2
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/025516f2
Branch: refs/heads/master
Commit: 025516f2cbd1a361a588755db1b8961668ce0c8d
Parents: b7597a4
Author: Christian Schneider <cs...@adobe.com>
Authored: Tue Feb 6 16:54:28 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Tue Feb 6 16:54:28 2018 +0100
----------------------------------------------------------------------
.../discovery/zookeeper/ZooKeeperDiscovery.java | 3 +-
.../publish/PublishingEndpointListener.java | 50 +++++++--
.../PublishingEndpointListenerFactory.java | 5 +-
.../subscribe/EndpointListenerTracker.java | 25 ++++-
.../zookeeper/subscribe/InterfaceMonitor.java | 17 ++-
.../subscribe/InterfaceMonitorManager.java | 106 +++++++++++--------
.../PublishingEndpointListenerFactoryTest.java | 61 ++++++-----
.../subscribe/InterfaceMonitorTest.java | 4 +
8 files changed, 183 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
index 0e03722..584da35 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
@@ -33,7 +33,6 @@ import org.apache.zookeeper.ZooKeeper;
import org.osgi.framework.BundleContext;
import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.cm.ManagedService;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
import org.osgi.util.tracker.ServiceTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +46,7 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
private final BundleContext bctx;
private PublishingEndpointListenerFactory endpointListenerFactory;
- private ServiceTracker<EndpointEventListener, EndpointEventListener> endpointListenerTracker;
+ private ServiceTracker<?, ?> endpointListenerTracker;
private InterfaceMonitorManager imManager;
private ZooKeeper zkClient;
private boolean closed;
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
index 82387ba..d5fe7a6 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
@@ -38,10 +38,12 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
import org.osgi.framework.BundleContext;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.service.remoteserviceadmin.EndpointEvent;
import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.osgi.util.tracker.ServiceTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +51,8 @@ import org.slf4j.LoggerFactory;
/**
* Listens for local Endpoints and publishes them to ZooKeeper.
*/
-public class PublishingEndpointListener implements EndpointEventListener {
+@SuppressWarnings("deprecation")
+public class PublishingEndpointListener implements EndpointEventListener, EndpointListener {
private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class);
@@ -79,13 +82,45 @@ public class PublishingEndpointListener implements EndpointEventListener {
endpointRemoved(endpoint, filter);
break;
case EndpointEvent.MODIFIED:
- endpointRemoved(endpoint, filter);
- endpointAdded(endpoint, filter);
+ endpointModified(endpoint, filter);
break;
}
}
- private void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
+ private void endpointModified(EndpointDescription endpoint, String filter) {
+ try {
+ modifyEndpoint(endpoint);
+ } catch (Exception e) {
+ LOG.error("Error modifying endpoint data in zookeeper for endpoint {}", endpoint.getId(), e);
+ }
+ }
+
+ private void modifyEndpoint(EndpointDescription endpoint) throws URISyntaxException, KeeperException, InterruptedException {
+ Collection<String> interfaces = endpoint.getInterfaces();
+ String endpointKey = getKey(endpoint);
+ Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
+
+ // process plugins
+ Object[] plugins = discoveryPluginTracker.getServices();
+ if (plugins != null) {
+ for (Object plugin : plugins) {
+ if (plugin instanceof DiscoveryPlugin) {
+ endpointKey = ((DiscoveryPlugin)plugin).process(props, endpointKey);
+ }
+ }
+ }
+ LOG.info("Changing endpoint in zookeeper: {}", endpoint);
+ for (String name : interfaces) {
+ String path = Utils.getZooKeeperPath(name);
+ String fullPath = path + '/' + endpointKey;
+ LOG.info("Changing ZooKeeper node for service with path {}", fullPath);
+ createPath(path, zk);
+ zk.setData(fullPath, getData(endpoint), -1);
+ }
+ }
+
+ @Override
+ public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
synchronized (endpoints) {
if (closed) {
return;
@@ -153,7 +188,8 @@ public class PublishingEndpointListener implements EndpointEventListener {
}
}
- private void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
+ @Override
+ public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
LOG.info("Local EndpointDescription removed: {}", endpoint);
synchronized (endpoints) {
@@ -195,7 +231,9 @@ public class PublishingEndpointListener implements EndpointEventListener {
current.append('/');
current.append(part);
try {
- zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ if (zk.exists(current.toString(), false) == null) {
+ zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
} catch (NodeExistsException nee) {
// it's not the first node with this path to ever exist - that's normal
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
index 8a40b92..444f7bb 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
@@ -31,6 +31,7 @@ import org.osgi.framework.Constants;
import org.osgi.framework.ServiceFactory;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.osgi.service.remoteserviceadmin.RemoteConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
/**
* Creates local EndpointListeners that publish to ZooKeeper.
*/
+@SuppressWarnings("deprecation")
public class PublishingEndpointListenerFactory implements ServiceFactory<PublishingEndpointListener> {
private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListenerFactory.class);
@@ -78,7 +80,8 @@ public class PublishingEndpointListenerFactory implements ServiceFactory<Publish
String.format("(&(%s=*)(%s=%s))", Constants.OBJECTCLASS,
RemoteConstants.ENDPOINT_FRAMEWORK_UUID, uuid));
props.put(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID, "true");
- serviceRegistration = bctx.registerService(EndpointEventListener.class.getName(), this, props);
+ String[] ifAr = {EndpointEventListener.class.getName(), EndpointListener.class.getName()};
+ serviceRegistration = bctx.registerService(ifAr, this, props);
}
public synchronized void stop() {
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
index 6e6ed1b..0ed1097 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
@@ -19,37 +19,52 @@
package org.apache.aries.rsa.discovery.zookeeper.subscribe;
import org.osgi.framework.BundleContext;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;
import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.osgi.util.tracker.ServiceTracker;
/**
* Tracks interest in EndpointListeners. Delegates to InterfaceMonitorManager to manage
* interest in the scopes of each EndpointListener.
*/
-public class EndpointListenerTracker extends ServiceTracker<EndpointEventListener, EndpointEventListener> {
+@SuppressWarnings({ "rawtypes", "deprecation", "unchecked" })
+public class EndpointListenerTracker extends ServiceTracker {
private final InterfaceMonitorManager imManager;
public EndpointListenerTracker(BundleContext bctx, InterfaceMonitorManager imManager) {
- super(bctx, EndpointEventListener.class, null);
+ super(bctx, getfilter(), null);
this.imManager = imManager;
}
+
+ private static Filter getfilter() {
+ String filterSt = String.format("(|(objectClass=%s)(objectClass=%s))", EndpointEventListener.class.getName(),
+ EndpointListener.class.getName());
+ try {
+ return FrameworkUtil.createFilter(filterSt);
+ } catch (InvalidSyntaxException e) {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ }
+ }
@Override
- public EndpointEventListener addingService(ServiceReference<EndpointEventListener> endpointListener) {
+ public Object addingService(ServiceReference endpointListener) {
imManager.addInterest(endpointListener);
return null;
}
@Override
- public void modifiedService(ServiceReference<EndpointEventListener> endpointListener, EndpointEventListener service) {
+ public void modifiedService(ServiceReference endpointListener, Object service) {
// called when an EndpointListener updates its service properties,
// e.g. when its interest scope is expanded/reduced
imManager.addInterest(endpointListener);
}
@Override
- public void removedService(ServiceReference<EndpointEventListener> endpointListener, EndpointEventListener service) {
+ public void removedService(ServiceReference endpointListener, Object service) {
imManager.removeInterest(endpointListener);
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
index 6972989..7078bb8 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
@@ -91,6 +91,13 @@ public class InterfaceMonitor implements Watcher, StatCallback {
private void watch() {
LOG.debug("registering a ZooKeeper.exists({}) callback", znode);
zk.exists(znode, this, this, null);
+ zk.getData(znode, this, new DataCallback() {
+
+ @Override
+ public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+ processDelta();
+ }
+ }, null);
}
/**
@@ -199,18 +206,22 @@ public class InterfaceMonitor implements Watcher, StatCallback {
EndpointDescription endpoint = getEndpointDescriptionFromNode(childZNode);
if (endpoint != null) {
EndpointDescription prevEndpoint = prevNodes.get(child);
- LOG.info("found new node " + zn + "/[" + child + "] ( []->child ) props: "
- + endpoint.getProperties().values());
+
newNodes.put(child, endpoint);
prevNodes.remove(child);
foundANode = true;
LOG.debug("Properties: {}", endpoint.getProperties());
if (prevEndpoint == null) {
// This guy is new
+ LOG.info("found new node " + zn + "/[" + child + "] ( []->child ) props: "
+ + endpoint.getProperties().values());
EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint);
endpointListener.endpointChanged(event, null);
} else if (!prevEndpoint.getProperties().equals(endpoint.getProperties())) {
- // TODO
+ LOG.info("Found changed node " + zn + "/[" + child + "] ( []->child ) props: "
+ + endpoint.getProperties().values());
+ EndpointEvent event = new EndpointEvent(EndpointEvent.MODIFIED, endpoint);
+ endpointListener.endpointChanged(event, null);
}
}
if (recursive && processChildren(childZNode, newNodes, prevNodes)) {
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
index 26e4462..7d6e4ae 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
@@ -33,13 +33,13 @@ import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
import org.apache.aries.rsa.util.StringPlus;
import org.apache.zookeeper.ZooKeeper;
-import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Filter;
import org.osgi.framework.ServiceReference;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.service.remoteserviceadmin.EndpointEvent;
import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
* The InterfaceMonitor calls back when it detects added or removed external Endpoints.
* These events are then forwarded to all interested EndpointEventListeners.
*/
+@SuppressWarnings({"deprecation", "rawtypes"})
public class InterfaceMonitorManager {
private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitorManager.class);
private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(".*\\(objectClass=([^)]+)\\).*");
@@ -56,14 +57,14 @@ public class InterfaceMonitorManager {
private final BundleContext bctx;
private final ZooKeeper zk;
// map of EndpointEventListeners and the scopes they are interested in
- private final Map<ServiceReference<EndpointEventListener>, List<String>> EndpointEventListenerScopes =
- new HashMap<ServiceReference<EndpointEventListener>, List<String>>();
+ private final Map<ServiceReference, List<String>> epListenerScopes =
+ new HashMap<ServiceReference, List<String>>();
// map of scopes and their interest data
private final Map<String, Interest> interests = new HashMap<String, Interest>();
protected static class Interest {
- List<ServiceReference<EndpointEventListener>> EndpointEventListeners =
- new CopyOnWriteArrayList<ServiceReference<EndpointEventListener>>();
+ List<ServiceReference> epListeners =
+ new CopyOnWriteArrayList<ServiceReference>();
InterfaceMonitor monitor;
}
@@ -72,26 +73,26 @@ public class InterfaceMonitorManager {
this.zk = zk;
}
- public void addInterest(ServiceReference<EndpointEventListener> EndpointEventListener) {
- if (isOurOwnEndpointEventListener(EndpointEventListener)) {
+ public void addInterest(ServiceReference<?> eplistener) {
+ if (isOurOwnEndpointEventListener(eplistener)) {
LOG.debug("Skipping our own EndpointEventListener");
return;
}
- List<String> scopes = getScopes(EndpointEventListener);
+ List<String> scopes = getScopes(eplistener);
LOG.debug("adding Interests: {}", scopes);
for (String scope : scopes) {
String objClass = getObjectClass(scope);
- addInterest(EndpointEventListener, scope, objClass);
+ addInterest(eplistener, scope, objClass);
}
}
- private static boolean isOurOwnEndpointEventListener(ServiceReference<EndpointEventListener> EndpointEventListener) {
+ private static boolean isOurOwnEndpointEventListener(ServiceReference<?> EndpointEventListener) {
return Boolean.parseBoolean(String.valueOf(
EndpointEventListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)));
}
- public synchronized void addInterest(ServiceReference<EndpointEventListener> EndpointEventListener,
+ public synchronized void addInterest(ServiceReference epListener,
String scope, String objClass) {
// get or create interest for given scope and add listener to it
Interest interest = interests.get(scope);
@@ -99,27 +100,27 @@ public class InterfaceMonitorManager {
// create interest, add listener and start monitor
interest = new Interest();
interests.put(scope, interest);
- interest.EndpointEventListeners.add(EndpointEventListener); // add it before monitor starts so we don't miss events
+ interest.epListeners.add(epListener); // add it before monitor starts so we don't miss events
interest.monitor = createInterfaceMonitor(scope, objClass, interest);
interest.monitor.start();
} else {
// interest already exists, so just add listener to it
- if (!interest.EndpointEventListeners.contains(EndpointEventListener)) {
- interest.EndpointEventListeners.add(EndpointEventListener);
+ if (!interest.epListeners.contains(epListener)) {
+ interest.epListeners.add(epListener);
}
// notify listener of all known endpoints for given scope
// (as EndpointEventListener contract requires of all added/modified listeners)
for (EndpointDescription endpoint : interest.monitor.getEndpoints()) {
EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint);
- notifyListeners(event, scope, Arrays.asList(EndpointEventListener));
+ notifyListeners(event, scope, Arrays.asList(epListener));
}
}
// add scope to listener's scopes list
- List<String> scopes = EndpointEventListenerScopes.get(EndpointEventListener);
+ List<String> scopes = epListenerScopes.get(epListener);
if (scopes == null) {
scopes = new ArrayList<String>(1);
- EndpointEventListenerScopes.put(EndpointEventListener, scopes);
+ epListenerScopes.put(epListener, scopes);
}
if (!scopes.contains(scope)) {
scopes.add(scope);
@@ -128,7 +129,7 @@ public class InterfaceMonitorManager {
public synchronized void removeInterest(ServiceReference<EndpointEventListener> EndpointEventListener) {
LOG.info("removing EndpointEventListener interests: {}", EndpointEventListener);
- List<String> scopes = EndpointEventListenerScopes.get(EndpointEventListener);
+ List<String> scopes = epListenerScopes.get(EndpointEventListener);
if (scopes == null) {
return;
}
@@ -136,14 +137,14 @@ public class InterfaceMonitorManager {
for (String scope : scopes) {
Interest interest = interests.get(scope);
if (interest != null) {
- interest.EndpointEventListeners.remove(EndpointEventListener);
- if (interest.EndpointEventListeners.isEmpty()) {
+ interest.epListeners.remove(EndpointEventListener);
+ if (interest.epListeners.isEmpty()) {
interest.monitor.close();
interests.remove(scope);
}
}
}
- EndpointEventListenerScopes.remove(EndpointEventListener);
+ epListenerScopes.remove(EndpointEventListener);
}
protected InterfaceMonitor createInterfaceMonitor(final String scope, String objClass, final Interest interest) {
@@ -152,30 +153,37 @@ public class InterfaceMonitorManager {
@Override
public void endpointChanged(EndpointEvent event, String filter) {
- notifyListeners(event, scope, interest.EndpointEventListeners);
+ notifyListeners(event, scope, interest.epListeners);
}
};
return new InterfaceMonitor(zk, objClass, listener, scope);
}
private void notifyListeners(EndpointEvent event, String currentScope,
- List<ServiceReference<EndpointEventListener>> EndpointEventListeners) {
+ List<ServiceReference> epListeners) {
EndpointDescription endpoint = event.getEndpoint();
- for (ServiceReference<EndpointEventListener> EndpointEventListenerRef : EndpointEventListeners) {
- EndpointEventListener service = bctx.getService(EndpointEventListenerRef);
+ for (ServiceReference<?> epListenerRef : epListeners) {
+ if (epListenerRef.getBundle() == null) {
+ LOG.info("listening service was unregistered, ignoring");
+ }
+ LOG.trace("matching {} against {}", endpoint, currentScope);
+ if (matchFilter(bctx, currentScope, endpoint)) {
+ LOG.debug("Matched {} against {}", endpoint, currentScope);
+ Object service = bctx.getService(epListenerRef); // acquired only once the filter matched, so every get is paired with the unget in the finally below
try {
- EndpointEventListener EndpointEventListener = (EndpointEventListener)service;
- LOG.trace("matching {} against {}", endpoint, currentScope);
- if (matchFilter(bctx, currentScope, endpoint)) {
- LOG.debug("Matched {} against {}", endpoint, currentScope);
- notifyListener(event, currentScope, EndpointEventListenerRef.getBundle(),
- EndpointEventListener);
+ if (service instanceof EndpointEventListener) { // a null service fails both instanceof checks and is skipped
+ EndpointEventListener epeListener = (EndpointEventListener)service;
+ notifyListener(event, currentScope, epeListener);
+ } else if (service instanceof EndpointListener) {
+ EndpointListener epListener = (EndpointListener)service;
+ notifyListener(event, currentScope, epListener);
}
} finally {
if (service != null) {
- bctx.ungetService(EndpointEventListenerRef);
+ bctx.ungetService(epListenerRef);
}
}
+ }
}
}
@@ -194,14 +202,28 @@ public class InterfaceMonitorManager {
}
- private void notifyListener(EndpointEvent event, String currentScope,
- Bundle listenerBundle, EndpointEventListener listener) {
+ private void notifyListener(EndpointEvent event, String currentScope, EndpointEventListener listener) {
EndpointDescription endpoint = event.getEndpoint();
- if (listenerBundle == null) {
- LOG.info("listening service was unregistered, ignoring");
- } else {
- LOG.info("Calling endpointchanged from bundle {} for endpoint {} ", listenerBundle.getSymbolicName(), endpoint);
- listener.endpointChanged(event, currentScope);
+ LOG.info("Calling endpointchanged on class {} for filter {}, type {}, endpoint {} ", listener, currentScope, event.getType(), endpoint);
+ listener.endpointChanged(event, currentScope);
+ }
+ // Translates an EndpointEvent into the callbacks of the deprecated EndpointListener API, which has no "modified" callback.
+ private void notifyListener(EndpointEvent event, String currentScope, EndpointListener listener) {
+ EndpointDescription endpoint = event.getEndpoint();
+ LOG.info("Calling old listener on class {} for filter {}, type {}, endpoint {} ", listener, currentScope, event.getType(), endpoint);
+ switch (event.getType()) {
+ case EndpointEvent.ADDED:
+ listener.endpointAdded(endpoint, currentScope);
+ break;
+ // MODIFIED is emulated as remove-then-add so the listener ends up holding the updated endpoint.
+ case EndpointEvent.MODIFIED:
+ listener.endpointRemoved(endpoint, currentScope);
+ listener.endpointAdded(endpoint, currentScope);
+ break;
+
+ case EndpointEvent.REMOVED:
+ listener.endpointRemoved(endpoint, currentScope);
+ break;
}
}
@@ -210,7 +232,7 @@ public class InterfaceMonitorManager {
interest.monitor.close();
}
interests.clear();
- EndpointEventListenerScopes.clear();
+ epListenerScopes.clear();
}
/**
@@ -223,8 +245,8 @@ public class InterfaceMonitorManager {
/**
* Only for test case!
*/
- protected synchronized Map<ServiceReference<EndpointEventListener>, List<String>> getEndpointListenerScopes() {
- return EndpointEventListenerScopes;
+ protected synchronized Map<ServiceReference, List<String>> getEndpointListenerScopes() {
+ return epListenerScopes;
}
protected List<String> getScopes(ServiceReference<?> sref) {
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java
index 777c11c..381ec9d 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java
@@ -18,58 +18,50 @@
*/
package org.apache.aries.rsa.discovery.zookeeper.publish;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import java.util.Dictionary;
import java.util.List;
import org.apache.zookeeper.ZooKeeper;
import org.easymock.EasyMock;
import org.easymock.IMocksControl;
+import org.junit.Before;
+import org.junit.Test;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
-import junit.framework.TestCase;
+@SuppressWarnings("deprecation")
+public class PublishingEndpointListenerFactoryTest {
-public class PublishingEndpointListenerFactoryTest extends TestCase {
+ private IMocksControl c;
+ private BundleContext ctx;
+ private ZooKeeper zk;
- @SuppressWarnings("unchecked")
+ @Before
+ public void before() {
+ c = EasyMock.createNiceControl();
+ zk = c.createMock(ZooKeeper.class);
+ ctx = createBundleContext();
+ }
+
+ @Test
public void testScope() {
- IMocksControl c = EasyMock.createNiceControl();
-
- BundleContext ctx = c.createMock(BundleContext.class);
- ZooKeeper zk = c.createMock(ZooKeeper.class);
- @SuppressWarnings("rawtypes")
- ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
-
PublishingEndpointListenerFactory eplf = new PublishingEndpointListenerFactory(zk, ctx);
- EasyMock.expect(ctx.registerService(EasyMock.eq(EndpointEventListener.class.getName()), EasyMock.eq(eplf),
- (Dictionary<String, String>)EasyMock.anyObject())).andReturn(sreg).once();
-
- EasyMock.expect(ctx.getProperty(EasyMock.eq("org.osgi.framework.uuid"))).andReturn("myUUID").anyTimes();
-
c.replay();
eplf.start();
c.verify();
}
- @SuppressWarnings("unchecked")
+ @Test
public void testServiceFactory() {
- IMocksControl c = EasyMock.createNiceControl();
-
- BundleContext ctx = c.createMock(BundleContext.class);
- ZooKeeper zk = c.createMock(ZooKeeper.class);
- @SuppressWarnings("rawtypes")
- ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
-
PublishingEndpointListenerFactory eplf = new PublishingEndpointListenerFactory(zk, ctx);
- EasyMock.expect(ctx.registerService(EasyMock.eq(EndpointEventListener.class.getName()), EasyMock.eq(eplf),
- (Dictionary<String, String>)EasyMock.anyObject())).andReturn(sreg).once();
-
- EasyMock.expect(ctx.getProperty(EasyMock.eq("org.osgi.framework.uuid"))).andReturn("myUUID").anyTimes();
-
PublishingEndpointListener eli = c.createMock(PublishingEndpointListener.class);
eli.close();
EasyMock.expectLastCall().once();
@@ -78,7 +70,6 @@ public class PublishingEndpointListenerFactoryTest extends TestCase {
eplf.start();
PublishingEndpointListener service = eplf.getService(null, null);
- assertNotNull(service);
assertTrue(service instanceof EndpointEventListener);
List<PublishingEndpointListener> listeners = eplf.getListeners();
@@ -97,4 +88,16 @@ public class PublishingEndpointListenerFactoryTest extends TestCase {
c.verify();
}
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private BundleContext createBundleContext() {
+ BundleContext ctx = c.createMock(BundleContext.class);
+ ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
+ String[] ifAr = {EndpointEventListener.class.getName(), EndpointListener.class.getName()};
+ EasyMock.expect(ctx.registerService(EasyMock.aryEq(ifAr), EasyMock.anyObject(),
+ (Dictionary<String, String>)EasyMock.anyObject())).andReturn(sreg).once();
+
+ EasyMock.expect(ctx.getProperty(EasyMock.eq("org.osgi.framework.uuid"))).andReturn("myUUID").anyTimes();
+ return ctx;
+ }
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/025516f2/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
index e2ecece..53ddbc4 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
@@ -20,6 +20,7 @@ package org.apache.aries.rsa.discovery.zookeeper.subscribe;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
import java.util.Collections;
@@ -29,6 +30,7 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.data.Stat;
import org.easymock.EasyMock;
import org.easymock.IMocksControl;
@@ -52,6 +54,8 @@ public class InterfaceMonitorTest extends TestCase {
InterfaceMonitor im = new InterfaceMonitor(zk, interf, endpointListener, scope);
zk.exists(eq(node), eq(im), eq(im), EasyMock.anyObject());
EasyMock.expectLastCall().once();
+ zk.getData(eq(node), eq(im), EasyMock.anyObject(DataCallback.class), EasyMock.anyObject());
+ expectLastCall();
expect(zk.exists(eq(node), eq(false))).andReturn(new Stat()).anyTimes();
expect(zk.getChildren(eq(node), eq(false))).andReturn(Collections.<String> emptyList()).once();
[2/2] aries-rsa git commit: Avoid connection refused exceptions
during tests when discovery client is faster than zookeeper server
Posted by cs...@apache.org.
Avoid connection refused exceptions during tests when discovery client is faster than zookeeper server
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/389bae15
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/389bae15
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/389bae15
Branch: refs/heads/master
Commit: 389bae1517d421621cc96bb2c75f8219c82f3a73
Parents: 025516f
Author: Christian Schneider <cs...@adobe.com>
Authored: Tue Feb 6 16:59:12 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Tue Feb 6 16:59:12 2018 +0100
----------------------------------------------------------------------
.../discovery/zookeeper/ZooKeeperDiscovery.java | 39 ++++++++++++++++----
1 file changed, 32 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/389bae15/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
index 584da35..13dadad 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
@@ -19,6 +19,7 @@
package org.apache.aries.rsa.discovery.zookeeper;
import java.io.IOException;
+import java.net.Socket;
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.HashMap;
@@ -58,7 +59,7 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
this.bctx = bctx;
}
- public synchronized void updated(Dictionary<String, ?> configuration) throws ConfigurationException {
+ public synchronized void updated(final Dictionary<String, ?> configuration) throws ConfigurationException {
LOG.debug("Received configuration update for Zookeeper Discovery: {}", configuration);
// make changes only if config actually changed, to prevent unnecessary ZooKeeper reconnections
if (!toMap(configuration).equals(toMap(curConfiguration))) {
@@ -66,13 +67,18 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
curConfiguration = configuration;
// config is null if it doesn't exist, is being deleted or has not yet been loaded
// in which case we just stop running
- if (!closed && configuration != null) {
- try {
- createZookeeper(configuration);
- } catch (IOException e) {
- throw new ConfigurationException(null, "Error starting zookeeper client", e);
- }
+ if (closed || configuration == null) {
+ return;
}
+ new Thread(new Runnable() {
+ public void run() {
+ try {
+ createZookeeper(configuration);
+ } catch (IOException e) {
+ LOG.error("Error starting zookeeper client", e);
+ }
+ }
+ }).start();
}
}
@@ -155,8 +161,27 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
String host = (String)getWithDefault(config, "zookeeper.host", "localhost");
String port = (String)getWithDefault(config, "zookeeper.port", "2181");
int timeout = Integer.parseInt((String)getWithDefault(config, "zookeeper.timeout", "3000"));
+ waitPort(host, Integer.parseInt(port));
zkClient = createZooKeeper(host, port, timeout);
}
+
+ /**
+  * Polls the given address until a TCP connection can be opened, for at most
+  * roughly two seconds. Returns as soon as a connection succeeds (the probe
+  * socket is closed again right away), when the time is up, or when the calling
+  * thread is interrupted. A port that stays unreachable is not reported as an error.
+  *
+  * @param host host name or address to probe
+  * @param port TCP port to probe
+  */
+ private void waitPort(String host, int port) {
+     long start = System.currentTimeMillis();
+     // Stop polling once interrupted: every further sleep would return immediately
+     // and the loop would degrade into a busy spin of connection attempts.
+     while (!Thread.currentThread().isInterrupted() && System.currentTimeMillis() - start < 2000) {
+         try (Socket socket = new Socket(host, port)) {
+             return;
+         } catch (IOException e) {
+             // Not reachable yet; back off briefly before the next attempt.
+             safeSleep();
+         }
+     }
+ }
+
+ /**
+  * Sleeps for 100 ms. If the thread is interrupted while sleeping, the interrupt
+  * status is restored so that callers can observe it instead of it being lost.
+  */
+ private void safeSleep() {
+     try {
+         Thread.sleep(100);
+     } catch (InterruptedException e) {
+         Thread.currentThread().interrupt();
+     }
+ }
public Object getWithDefault(Dictionary<String, ?> config, String key, Object defaultValue) {
Object value = config.get(key);