You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2018/01/30 11:15:23 UTC

[2/3] aries-rsa git commit: Move local service tracking to EndpointListenerManager

Move local service tracking to EndpointListenerManager


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/f9e4d403
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/f9e4d403
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/f9e4d403

Branch: refs/heads/master
Commit: f9e4d4039d396f8aad1809d67f173e1c8a5dd128
Parents: b16d2b9
Author: Christian Schneider <cs...@adobe.com>
Authored: Tue Jan 30 12:06:22 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Tue Jan 30 12:06:22 2018 +0100

----------------------------------------------------------------------
 .../aries/rsa/topologymanager/Activator.java    |  5 ++
 .../importer/EndpointListenerManager.java       | 39 +++++++++-
 .../rsa/topologymanager/importer/MultiMap.java  | 24 +++---
 .../importer/TopologyManagerImport.java         | 78 ++++++--------------
 .../importer/TopologyManagerImportTest.java     |  3 -
 5 files changed, 78 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f9e4d403/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
index 02d9674..4fe0581 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
@@ -29,6 +29,7 @@ import org.apache.aries.rsa.topologymanager.exporter.DefaultExportPolicy;
 import org.apache.aries.rsa.topologymanager.exporter.EndpointListenerNotifier;
 import org.apache.aries.rsa.topologymanager.exporter.EndpointRepository;
 import org.apache.aries.rsa.topologymanager.exporter.TopologyManagerExport;
+import org.apache.aries.rsa.topologymanager.importer.EndpointListenerManager;
 import org.apache.aries.rsa.topologymanager.importer.TopologyManagerImport;
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
@@ -57,6 +58,7 @@ public class Activator implements BundleActivator {
     private ThreadPoolExecutor exportExecutor;
     private ServiceTracker<EndpointListener, EndpointListener> epListenerTracker;
     private ServiceTracker<ExportPolicy, ExportPolicy> policyTracker;
+    private EndpointListenerManager endpointListenerManager;
 
     public void start(final BundleContext bc) throws Exception {
         Dictionary<String, String> props = new Hashtable<String, String>();
@@ -103,6 +105,8 @@ public class Activator implements BundleActivator {
         exportExecutor = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
         exportManager = new TopologyManagerExport(endpointRepo, exportExecutor, policy);
         importManager = new TopologyManagerImport(bc);
+        endpointListenerManager = new EndpointListenerManager(bc, importManager);
+        endpointListenerManager.start();
         rsaTracker = new RSATracker(bc, RemoteServiceAdmin.class, null);
         bc.addServiceListener(exportManager);
         rsaTracker.open();
@@ -121,6 +125,7 @@ public class Activator implements BundleActivator {
         bc.removeServiceListener(exportManager);
         exportExecutor.shutdown();
         importManager.stop();
+        endpointListenerManager.stop();
         rsaTracker.close();
         exportManager = null;
     }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f9e4d403/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java
index 1207f9f..87f0fbe 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java
@@ -25,14 +25,21 @@ import java.util.List;
 
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
+import org.osgi.framework.hooks.service.FindHook;
+import org.osgi.framework.hooks.service.ListenerHook;
 import org.osgi.service.remoteserviceadmin.EndpointListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Manages an EndpointListener and adjusts its scope according to requested service filters.
+ * Manages the endpoint listener for the import of external services.
+ * The endpoint listener scope reflects the combined filters of all services 
+ * that are asked for (by listeners and service lookups) in the current system. 
+ * 
+ * Discovery will then send callbacks when external endpoints are added / removed that match
+ * the interest in the local system.
  */
-public class EndpointListenerManager {
+public class EndpointListenerManager implements ServiceInterestListener{
 
     private static final Logger LOG = LoggerFactory.getLogger(EndpointListenerManager.class);
 
@@ -40,15 +47,26 @@ public class EndpointListenerManager {
     private volatile ServiceRegistration<EndpointListener> serviceRegistration;
     private final List<String> filters = new ArrayList<String>();
     private final EndpointListener endpointListener;
+    private final ListenerHookImpl listenerHook;
+    private RSFindHook findHook;
+    
+    /**
+     * Counts service interest by filter. This allows the scope of the EndpointListener to be modified as seldom as possible.
+     */
+    private final ReferenceCounter<String> importInterestsCounter = new ReferenceCounter<String>();
 
     public EndpointListenerManager(BundleContext bc, EndpointListener endpointListener) {
         this.bctx = bc;
         this.endpointListener = endpointListener;
+        this.listenerHook = new ListenerHookImpl(bc, this);
+        findHook = new RSFindHook(bc, this);
     }
 
-    protected void start() {
+    public void start() {
         serviceRegistration = bctx.registerService(EndpointListener.class, endpointListener,
                                                    getRegistrationProperties());
+        bctx.registerService(ListenerHook.class, listenerHook, null);
+        bctx.registerService(FindHook.class, findHook, null);
     }
 
     public void stop() {
@@ -95,4 +113,19 @@ public class EndpointListenerManager {
             serviceRegistration.setProperties(getRegistrationProperties());
         }
     }
+
+    @Override
+    public void addServiceInterest(String filter) { // registers one more interest in the given filter
+        if (importInterestsCounter.add(filter) == 1) { // counter reports 1: presumably the first reference to this filter (confirm in ReferenceCounter)
+            extendScope(filter); // only in that case is the filter handed to extendScope
+        }
+    }
+
+    @Override
+    public void removeServiceInterest(String filter) { // releases one interest in the given filter
+        if (importInterestsCounter.remove(filter) == 0) { // counter reports 0: the last reference to this filter is gone
+            LOG.debug("last reference to import interest is gone -> removing interest filter: {}", filter);
+            reduceScope(filter); // only in that case is the filter handed to reduceScope
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f9e4d403/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
index 5eda3c4..13597b5 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
@@ -20,8 +20,7 @@ package org.apache.aries.rsa.topologymanager.importer;
 
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -30,27 +29,27 @@ import java.util.Set;
  */
 public class MultiMap<T> {
 
-    private Map<String, List<T>> map;
+    private Map<String, Set<T>> map;
     
     public MultiMap() {
         map = new HashMap<>();
     }
     
     public synchronized void put(String key, T value) {
-        List<T> values = map.get(key);
+        Set<T> values = map.get(key);
         if (values == null) {
-            values = new LinkedList<>();
+            values = new HashSet<>();
             map.put(key, values);
         }
         values.add(value);
     }
     
-    public synchronized List<T> get(String key) {
-        return map.getOrDefault(key, Collections.<T>emptyList());
+    public synchronized Set<T> get(String key) {
+        return map.getOrDefault(key, Collections.<T>emptySet());
     }
 
     public synchronized void remove(String key, T value) {
-        List<T> values = map.get(key);
+        Set<T> values = map.get(key);
         values.remove(value);
         if (values.isEmpty()) {
             map.remove(key);
@@ -60,4 +59,11 @@ public class MultiMap<T> {
     public synchronized Set<String> keySet() {
         return map.keySet();
     }
-}
\ No newline at end of file
+
+    public synchronized void remove(T toRemove) { // removes the value from every key; locked like the other accessors of this map
+        Set<String> keys = new HashSet<>(map.keySet()); // snapshot: remove(key, value) drops keys whose value set becomes empty
+        for (String key : keys) {
+            remove(key, toRemove); // nested call re-enters the lock this method already holds
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f9e4d403/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
index bacdea0..52b0bdc 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
@@ -51,24 +51,14 @@ import org.slf4j.LoggerFactory;
  * ServiceInterestListener interface.
  * Manages local creation and destruction of service imports using the available RemoteServiceAdmin services.
  */
-public class TopologyManagerImport implements EndpointListener, RemoteServiceAdminListener, ServiceInterestListener {
+public class TopologyManagerImport implements EndpointListener, RemoteServiceAdminListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(TopologyManagerImport.class);
     private ExecutorService execService;
 
-    private final EndpointListenerManager endpointListenerManager;
     private final BundleContext bctx;
     private Set<RemoteServiceAdmin> rsaSet;
-    private final ListenerHookImpl listenerHook;
-    private RSFindHook findHook;
 
-    /**
-     * Contains an instance of the Class Import Interest for each distinct import request. If the same filter
-     * is requested multiple times the existing instance of the Object increments an internal reference
-     * counter. If an interest is removed, the related ServiceInterest object is used to reduce the reference
-     * counter until it reaches zero. in this case the interest is removed.
-     */
-    private final ReferenceCounter<String> importInterestsCounter = new ReferenceCounter<String>();
 
     /**
      * List of Endpoints by matched filter that were reported by the EndpointListener and can be imported
@@ -85,58 +75,35 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
     public TopologyManagerImport(BundleContext bc) {
         this.rsaSet = new HashSet<RemoteServiceAdmin>();
         bctx = bc;
-        endpointListenerManager = new EndpointListenerManager(bctx, this);
         execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
-        listenerHook = new ListenerHookImpl(bc, this);
-        findHook = new RSFindHook(bc, this);
     }
     
     public void start() {
         bctx.registerService(RemoteServiceAdminListener.class, this, null);
-        bctx.registerService(ListenerHook.class, listenerHook, null);
-        bctx.registerService(FindHook.class, findHook, null);
-        endpointListenerManager.start();
     }
 
     public void stop() {
-        endpointListenerManager.stop();
         execService.shutdown();
-        // this is called from Activator.stop(), which implicitly unregisters our registered services
-    }
-
-    @Override
-    public void addServiceInterest(String filter) {
-        if (importInterestsCounter.add(filter) == 1) {
-            endpointListenerManager.extendScope(filter);
-        }
-    }
-
-    @Override
-    public void removeServiceInterest(String filter) {
-        if (importInterestsCounter.remove(filter) == 0) {
-            LOG.debug("last reference to import interest is gone -> removing interest filter: {}", filter);
-            endpointListenerManager.reduceScope(filter);
-        }
     }
 
     @Override
     public void endpointAdded(EndpointDescription endpoint, String filter) {
         LOG.debug("Endpoint added for filter {}, endpoint {}", filter, endpoint);
         importPossibilities.put(filter, endpoint);
-        triggerImport(filter);
+        triggerSyncImports(filter);
     }
 
     @Override
     public void endpointRemoved(EndpointDescription endpoint, String filter) {
         LOG.debug("Endpoint removed for filter {}, endpoint {}", filter, endpoint);
         importPossibilities.remove(filter, endpoint);
-        triggerImport(filter);
+        triggerSyncImports(filter);
     }
 
     public void add(RemoteServiceAdmin rsa) {
         rsaSet.add(rsa);
         for (String filter : importPossibilities.keySet()) {
-            triggerImport(filter);
+            triggerSyncImports(filter);
         }
     }
     
@@ -147,24 +114,24 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
     @Override
     public void remoteAdminEvent(RemoteServiceAdminEvent event) {
         if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION) {
-            removeAndClose(event.getImportReference());
+            unImport(event.getImportReference());
         }
     }
 
-    private void triggerImport(final String filter) {
+    private void triggerSyncImports(final String filter) {
         LOG.debug("Import of a service for filter {} was queued", filter);
         if (!rsaSet.isEmpty()) {
             execService.execute(new Runnable() {
                 public void run() {
-                    doImport(filter);
+                    syncImports(filter);
                 }
             });
         }
     }
     
-    private void doImport(final String filter) {
+    private void syncImports(final String filter) {
         try {
-            unexportNotAvailableServices(filter);
+            unImportForGoneEndpoints(filter);
             importServices(filter);
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
@@ -173,7 +140,7 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
     }
 
     private void importServices(String filter) {
-        List<ImportRegistration> importRegistrations = importedServices.get(filter);
+        Set<ImportRegistration> importRegistrations = importedServices.get(filter);
         for (EndpointDescription endpoint : importPossibilities.get(filter)) {
             // TODO but optional: if the service is already imported and the endpoint is still
             // in the list of possible imports check if a "better" endpoint is now in the list
@@ -187,12 +154,10 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
         }
     }
 
-    private boolean alreadyImported(EndpointDescription endpoint, List<ImportRegistration> importRegistrations) {
-        if (importRegistrations != null) {
-            for (ImportRegistration ir : importRegistrations) {
-                if (endpoint.equals(ir.getImportReference().getImportedEndpoint())) {
-                    return true;
-                }
+    private boolean alreadyImported(EndpointDescription endpoint, Set<ImportRegistration> importRegistrations) {
+        for (ImportRegistration ir : importRegistrations) {
+            if (endpoint.equals(ir.getImportReference().getImportedEndpoint())) {
+                return true;
             }
         }
         return false;
@@ -219,24 +184,24 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
         return null;
     }
 
-    private void unexportNotAvailableServices(String filter) {
-        List<ImportRegistration> importRegistrations = importedServices.get(filter);
-        List<EndpointDescription> endpoints = importPossibilities.get(filter);
+    private void unImportForGoneEndpoints(String filter) {
+        Set<ImportRegistration> importRegistrations = importedServices.get(filter);
+        Set<EndpointDescription> endpoints = importPossibilities.get(filter);
         for (ImportRegistration ir : importRegistrations) {
             EndpointDescription endpoint = ir.getImportReference().getImportedEndpoint();
             if (!endpoints.contains(endpoint)) {
-                removeAndClose(ir.getImportReference());
+                unImport(ir.getImportReference());
             }
         }
     }
 
-    private void removeAndClose(ImportReference ref) {
+    private void unImport(ImportReference ref) {
         List<ImportRegistration> removed = new ArrayList<ImportRegistration>();
-        for (String key : importedServices.keySet()) {
+        HashSet<String> imported = new HashSet<>(importedServices.keySet());
+        for (String key : imported) {
             for (ImportRegistration ir : importedServices.get(key)) {
                 if (ir.getImportReference().equals(ref)) {
                     removed.add(ir);
-                    importedServices.remove(key, ir);
                 }
             }
         }
@@ -245,6 +210,7 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
 
     private void closeAll(List<ImportRegistration> removed) {
         for (ImportRegistration ir : removed) {
+            importedServices.remove(ir);
             ir.close();
         }
     }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f9e4d403/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
index 919f733..8b8bbc2 100644
--- a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
+++ b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
@@ -79,13 +79,10 @@ public class TopologyManagerImportTest {
     @SuppressWarnings({ "rawtypes", "unchecked" })
     private BundleContext getBundleContext(IMocksControl c) {
         ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
-        sreg.unregister();
-        EasyMock.expectLastCall().once();
         BundleContext bc = c.createMock(BundleContext.class);
         EasyMock.expect(bc.registerService(EasyMock.anyObject(Class.class),
                                            EasyMock.anyObject(),
                                            (Dictionary)EasyMock.anyObject())).andReturn(sreg).anyTimes();
-        EasyMock.expect(bc.getProperty(Constants.FRAMEWORK_UUID)).andReturn("myid").atLeastOnce();
         return bc;
     }
 }