You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2018/01/30 11:15:23 UTC
[2/3] aries-rsa git commit: Move local service tracking to
EndpointListenerManager
Move local service tracking to EndpointListenerManager
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/f9e4d403
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/f9e4d403
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/f9e4d403
Branch: refs/heads/master
Commit: f9e4d4039d396f8aad1809d67f173e1c8a5dd128
Parents: b16d2b9
Author: Christian Schneider <cs...@adobe.com>
Authored: Tue Jan 30 12:06:22 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Tue Jan 30 12:06:22 2018 +0100
----------------------------------------------------------------------
.../aries/rsa/topologymanager/Activator.java | 5 ++
.../importer/EndpointListenerManager.java | 39 +++++++++-
.../rsa/topologymanager/importer/MultiMap.java | 24 +++---
.../importer/TopologyManagerImport.java | 78 ++++++--------------
.../importer/TopologyManagerImportTest.java | 3 -
5 files changed, 78 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f9e4d403/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
index 02d9674..4fe0581 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
@@ -29,6 +29,7 @@ import org.apache.aries.rsa.topologymanager.exporter.DefaultExportPolicy;
import org.apache.aries.rsa.topologymanager.exporter.EndpointListenerNotifier;
import org.apache.aries.rsa.topologymanager.exporter.EndpointRepository;
import org.apache.aries.rsa.topologymanager.exporter.TopologyManagerExport;
+import org.apache.aries.rsa.topologymanager.importer.EndpointListenerManager;
import org.apache.aries.rsa.topologymanager.importer.TopologyManagerImport;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
@@ -57,6 +58,7 @@ public class Activator implements BundleActivator {
private ThreadPoolExecutor exportExecutor;
private ServiceTracker<EndpointListener, EndpointListener> epListenerTracker;
private ServiceTracker<ExportPolicy, ExportPolicy> policyTracker;
+ private EndpointListenerManager endpointListenerManager;
public void start(final BundleContext bc) throws Exception {
Dictionary<String, String> props = new Hashtable<String, String>();
@@ -103,6 +105,8 @@ public class Activator implements BundleActivator {
exportExecutor = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
exportManager = new TopologyManagerExport(endpointRepo, exportExecutor, policy);
importManager = new TopologyManagerImport(bc);
+ endpointListenerManager = new EndpointListenerManager(bc, importManager);
+ endpointListenerManager.start();
rsaTracker = new RSATracker(bc, RemoteServiceAdmin.class, null);
bc.addServiceListener(exportManager);
rsaTracker.open();
@@ -121,6 +125,7 @@ public class Activator implements BundleActivator {
bc.removeServiceListener(exportManager);
exportExecutor.shutdown();
importManager.stop();
+ endpointListenerManager.stop();
rsaTracker.close();
exportManager = null;
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f9e4d403/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java
index 1207f9f..87f0fbe 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java
@@ -25,14 +25,21 @@ import java.util.List;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
+import org.osgi.framework.hooks.service.FindHook;
+import org.osgi.framework.hooks.service.ListenerHook;
import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Manages an EndpointListener and adjusts its scope according to requested service filters.
+ * Manages the endpoint listener for the import of external services.
+ * The endpoint listener scope reflects the combined filters of all services
+ * that are asked for (by listeners and service lookups) in the current system.
+ *
+ * Discovery will then send callbacks when external endpoints are added / removed that match
+ * the interest in the local system.
*/
-public class EndpointListenerManager {
+public class EndpointListenerManager implements ServiceInterestListener{
private static final Logger LOG = LoggerFactory.getLogger(EndpointListenerManager.class);
@@ -40,15 +47,26 @@ public class EndpointListenerManager {
private volatile ServiceRegistration<EndpointListener> serviceRegistration;
private final List<String> filters = new ArrayList<String>();
private final EndpointListener endpointListener;
+ private final ListenerHookImpl listenerHook;
+ private RSFindHook findHook;
+
+ /**
+ * Count service interest by filter. This allows to modify the scope of the EndpointListener as seldom as possible
+ */
+ private final ReferenceCounter<String> importInterestsCounter = new ReferenceCounter<String>();
public EndpointListenerManager(BundleContext bc, EndpointListener endpointListener) {
this.bctx = bc;
this.endpointListener = endpointListener;
+ this.listenerHook = new ListenerHookImpl(bc, this);
+ findHook = new RSFindHook(bc, this);
}
- protected void start() {
+ public void start() {
serviceRegistration = bctx.registerService(EndpointListener.class, endpointListener,
getRegistrationProperties());
+ bctx.registerService(ListenerHook.class, listenerHook, null);
+ bctx.registerService(FindHook.class, findHook, null);
}
public void stop() {
@@ -95,4 +113,19 @@ public class EndpointListenerManager {
serviceRegistration.setProperties(getRegistrationProperties());
}
}
+
+ @Override
+ public void addServiceInterest(String filter) {
+ if (importInterestsCounter.add(filter) == 1) {
+ extendScope(filter);
+ }
+ }
+
+ @Override
+ public void removeServiceInterest(String filter) {
+ if (importInterestsCounter.remove(filter) == 0) {
+ LOG.debug("last reference to import interest is gone -> removing interest filter: {}", filter);
+ reduceScope(filter);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f9e4d403/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
index 5eda3c4..13597b5 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
@@ -20,8 +20,7 @@ package org.apache.aries.rsa.topologymanager.importer;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -30,27 +29,27 @@ import java.util.Set;
*/
public class MultiMap<T> {
- private Map<String, List<T>> map;
+ private Map<String, Set<T>> map;
public MultiMap() {
map = new HashMap<>();
}
public synchronized void put(String key, T value) {
- List<T> values = map.get(key);
+ Set<T> values = map.get(key);
if (values == null) {
- values = new LinkedList<>();
+ values = new HashSet<>();
map.put(key, values);
}
values.add(value);
}
- public synchronized List<T> get(String key) {
- return map.getOrDefault(key, Collections.<T>emptyList());
+ public synchronized Set<T> get(String key) {
+ return map.getOrDefault(key, Collections.<T>emptySet());
}
public synchronized void remove(String key, T value) {
- List<T> values = map.get(key);
+ Set<T> values = map.get(key);
values.remove(value);
if (values.isEmpty()) {
map.remove(key);
@@ -60,4 +59,11 @@ public class MultiMap<T> {
public synchronized Set<String> keySet() {
return map.keySet();
}
-}
\ No newline at end of file
+
+ public void remove(T toRemove) {
+ Set<String> keys = new HashSet<>(map.keySet());
+ for (String key : keys) {
+ remove(key, toRemove);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f9e4d403/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
index bacdea0..52b0bdc 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
@@ -51,24 +51,14 @@ import org.slf4j.LoggerFactory;
* ServiceInterestListener interface.
* Manages local creation and destruction of service imports using the available RemoteServiceAdmin services.
*/
-public class TopologyManagerImport implements EndpointListener, RemoteServiceAdminListener, ServiceInterestListener {
+public class TopologyManagerImport implements EndpointListener, RemoteServiceAdminListener {
private static final Logger LOG = LoggerFactory.getLogger(TopologyManagerImport.class);
private ExecutorService execService;
- private final EndpointListenerManager endpointListenerManager;
private final BundleContext bctx;
private Set<RemoteServiceAdmin> rsaSet;
- private final ListenerHookImpl listenerHook;
- private RSFindHook findHook;
- /**
- * Contains an instance of the Class Import Interest for each distinct import request. If the same filter
- * is requested multiple times the existing instance of the Object increments an internal reference
- * counter. If an interest is removed, the related ServiceInterest object is used to reduce the reference
- * counter until it reaches zero. in this case the interest is removed.
- */
- private final ReferenceCounter<String> importInterestsCounter = new ReferenceCounter<String>();
/**
* List of Endpoints by matched filter that were reported by the EndpointListener and can be imported
@@ -85,58 +75,35 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
public TopologyManagerImport(BundleContext bc) {
this.rsaSet = new HashSet<RemoteServiceAdmin>();
bctx = bc;
- endpointListenerManager = new EndpointListenerManager(bctx, this);
execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- listenerHook = new ListenerHookImpl(bc, this);
- findHook = new RSFindHook(bc, this);
}
public void start() {
bctx.registerService(RemoteServiceAdminListener.class, this, null);
- bctx.registerService(ListenerHook.class, listenerHook, null);
- bctx.registerService(FindHook.class, findHook, null);
- endpointListenerManager.start();
}
public void stop() {
- endpointListenerManager.stop();
execService.shutdown();
- // this is called from Activator.stop(), which implicitly unregisters our registered services
- }
-
- @Override
- public void addServiceInterest(String filter) {
- if (importInterestsCounter.add(filter) == 1) {
- endpointListenerManager.extendScope(filter);
- }
- }
-
- @Override
- public void removeServiceInterest(String filter) {
- if (importInterestsCounter.remove(filter) == 0) {
- LOG.debug("last reference to import interest is gone -> removing interest filter: {}", filter);
- endpointListenerManager.reduceScope(filter);
- }
}
@Override
public void endpointAdded(EndpointDescription endpoint, String filter) {
LOG.debug("Endpoint added for filter {}, endpoint {}", filter, endpoint);
importPossibilities.put(filter, endpoint);
- triggerImport(filter);
+ triggerSyncImports(filter);
}
@Override
public void endpointRemoved(EndpointDescription endpoint, String filter) {
LOG.debug("Endpoint removed for filter {}, endpoint {}", filter, endpoint);
importPossibilities.remove(filter, endpoint);
- triggerImport(filter);
+ triggerSyncImports(filter);
}
public void add(RemoteServiceAdmin rsa) {
rsaSet.add(rsa);
for (String filter : importPossibilities.keySet()) {
- triggerImport(filter);
+ triggerSyncImports(filter);
}
}
@@ -147,24 +114,24 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
@Override
public void remoteAdminEvent(RemoteServiceAdminEvent event) {
if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION) {
- removeAndClose(event.getImportReference());
+ unImport(event.getImportReference());
}
}
- private void triggerImport(final String filter) {
+ private void triggerSyncImports(final String filter) {
LOG.debug("Import of a service for filter {} was queued", filter);
if (!rsaSet.isEmpty()) {
execService.execute(new Runnable() {
public void run() {
- doImport(filter);
+ syncImports(filter);
}
});
}
}
- private void doImport(final String filter) {
+ private void syncImports(final String filter) {
try {
- unexportNotAvailableServices(filter);
+ unImportForGoneEndpoints(filter);
importServices(filter);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
@@ -173,7 +140,7 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
}
private void importServices(String filter) {
- List<ImportRegistration> importRegistrations = importedServices.get(filter);
+ Set<ImportRegistration> importRegistrations = importedServices.get(filter);
for (EndpointDescription endpoint : importPossibilities.get(filter)) {
// TODO but optional: if the service is already imported and the endpoint is still
// in the list of possible imports check if a "better" endpoint is now in the list
@@ -187,12 +154,10 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
}
}
- private boolean alreadyImported(EndpointDescription endpoint, List<ImportRegistration> importRegistrations) {
- if (importRegistrations != null) {
- for (ImportRegistration ir : importRegistrations) {
- if (endpoint.equals(ir.getImportReference().getImportedEndpoint())) {
- return true;
- }
+ private boolean alreadyImported(EndpointDescription endpoint, Set<ImportRegistration> importRegistrations) {
+ for (ImportRegistration ir : importRegistrations) {
+ if (endpoint.equals(ir.getImportReference().getImportedEndpoint())) {
+ return true;
}
}
return false;
@@ -219,24 +184,24 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
return null;
}
- private void unexportNotAvailableServices(String filter) {
- List<ImportRegistration> importRegistrations = importedServices.get(filter);
- List<EndpointDescription> endpoints = importPossibilities.get(filter);
+ private void unImportForGoneEndpoints(String filter) {
+ Set<ImportRegistration> importRegistrations = importedServices.get(filter);
+ Set<EndpointDescription> endpoints = importPossibilities.get(filter);
for (ImportRegistration ir : importRegistrations) {
EndpointDescription endpoint = ir.getImportReference().getImportedEndpoint();
if (!endpoints.contains(endpoint)) {
- removeAndClose(ir.getImportReference());
+ unImport(ir.getImportReference());
}
}
}
- private void removeAndClose(ImportReference ref) {
+ private void unImport(ImportReference ref) {
List<ImportRegistration> removed = new ArrayList<ImportRegistration>();
- for (String key : importedServices.keySet()) {
+ HashSet<String> imported = new HashSet<>(importedServices.keySet());
+ for (String key : imported) {
for (ImportRegistration ir : importedServices.get(key)) {
if (ir.getImportReference().equals(ref)) {
removed.add(ir);
- importedServices.remove(key, ir);
}
}
}
@@ -245,6 +210,7 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
private void closeAll(List<ImportRegistration> removed) {
for (ImportRegistration ir : removed) {
+ importedServices.remove(ir);
ir.close();
}
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f9e4d403/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
index 919f733..8b8bbc2 100644
--- a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
+++ b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
@@ -79,13 +79,10 @@ public class TopologyManagerImportTest {
@SuppressWarnings({ "rawtypes", "unchecked" })
private BundleContext getBundleContext(IMocksControl c) {
ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
- sreg.unregister();
- EasyMock.expectLastCall().once();
BundleContext bc = c.createMock(BundleContext.class);
EasyMock.expect(bc.registerService(EasyMock.anyObject(Class.class),
EasyMock.anyObject(),
(Dictionary)EasyMock.anyObject())).andReturn(sreg).anyTimes();
- EasyMock.expect(bc.getProperty(Constants.FRAMEWORK_UUID)).andReturn("myid").atLeastOnce();
return bc;
}
}