You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2018/01/30 11:15:22 UTC
[1/3] aries-rsa git commit: Refactoring of TopologyManagerImport
Repository: aries-rsa
Updated Branches:
refs/heads/master 3ec2e269c -> 0fea532a9
Refactoring of TopologyManagerImport
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/b16d2b9a
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/b16d2b9a
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/b16d2b9a
Branch: refs/heads/master
Commit: b16d2b9a1290cbe26a1299d491681a5cc769b650
Parents: 3ec2e26
Author: Christian Schneider <cs...@adobe.com>
Authored: Tue Jan 30 11:12:17 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Tue Jan 30 11:12:17 2018 +0100
----------------------------------------------------------------------
.../rsa/topologymanager/importer/MultiMap.java | 63 ++++++
.../importer/TopologyManagerImport.java | 215 +++++--------------
.../importer/TopologyManagerImportTest.java | 47 ++--
3 files changed, 143 insertions(+), 182 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/b16d2b9a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
new file mode 100644
index 0000000..5eda3c4
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Minimal implementation of a synchronized multimap (each String key maps to a list of values)
+ */
+public class MultiMap<T> {
+
+ private Map<String, List<T>> map;
+
+ public MultiMap() {
+ map = new HashMap<>();
+ }
+
+ public synchronized void put(String key, T value) {
+ List<T> values = map.get(key);
+ if (values == null) {
+ values = new LinkedList<>();
+ map.put(key, values);
+ }
+ values.add(value);
+ }
+
+ public synchronized List<T> get(String key) {
+ return map.getOrDefault(key, Collections.<T>emptyList());
+ }
+
+ public synchronized void remove(String key, T value) {
+ List<T> values = map.get(key);
+ values.remove(value);
+ if (values.isEmpty()) {
+ map.remove(key);
+ }
+ }
+
+ public synchronized Set<String> keySet() {
+ return map.keySet();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/b16d2b9a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
index 54a1c56..bacdea0 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
@@ -63,14 +63,6 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
private RSFindHook findHook;
/**
- * If set to false only one service is imported for each import interest even it multiple services are
- * available. If set to true, all available services are imported.
- *
- * TODO: Make this available as a configuration option
- */
- private boolean importAllAvailable = true;
-
- /**
* Contains an instance of the Class Import Interest for each distinct import request. If the same filter
* is requested multiple times the existing instance of the Object increments an internal reference
* counter. If an interest is removed, the related ServiceInterest object is used to reduce the reference
@@ -81,14 +73,14 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
/**
* List of Endpoints by matched filter that were reported by the EndpointListener and can be imported
*/
- private final Map<String /* filter */, List<EndpointDescription>> importPossibilities
- = new HashMap<String, List<EndpointDescription>>();
+ private final MultiMap<EndpointDescription> importPossibilities
+ = new MultiMap<EndpointDescription>();
/**
* List of already imported Endpoints by their matched filter
*/
- private final Map<String /* filter */, List<ImportRegistration>> importedServices
- = new HashMap<String, List<ImportRegistration>>();
+ private final MultiMap<ImportRegistration> importedServices
+ = new MultiMap<ImportRegistration>();
public TopologyManagerImport(BundleContext bc) {
this.rsaSet = new HashSet<RemoteServiceAdmin>();
@@ -112,119 +104,84 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
// this is called from Activator.stop(), which implicitly unregisters our registered services
}
- /* (non-Javadoc)
- * @see org.apache.cxf.dosgi.topologymanager.ServiceInterestListener#addServiceInterest(java.lang.String)
- */
+ @Override
public void addServiceInterest(String filter) {
if (importInterestsCounter.add(filter) == 1) {
endpointListenerManager.extendScope(filter);
}
}
- /* (non-Javadoc)
- * @see org.apache.cxf.dosgi.topologymanager.ServiceInterestListener#removeServiceInterest(java.lang.String)
- */
+ @Override
public void removeServiceInterest(String filter) {
if (importInterestsCounter.remove(filter) == 0) {
LOG.debug("last reference to import interest is gone -> removing interest filter: {}", filter);
endpointListenerManager.reduceScope(filter);
- List<ImportRegistration> irs = remove(filter, importedServices);
- if (irs != null) {
- for (ImportRegistration ir : irs) {
- ir.close();
- }
- }
}
}
+ @Override
public void endpointAdded(EndpointDescription endpoint, String filter) {
- if (filter == null) {
- LOG.error("Endpoint is not handled because no matching filter was provided!");
- return;
- }
- LOG.debug("importable service added for filter {}, endpoint {}", filter, endpoint);
- addImportPossibility(endpoint, filter);
+ LOG.debug("Endpoint added for filter {}, endpoint {}", filter, endpoint);
+ importPossibilities.put(filter, endpoint);
triggerImport(filter);
}
+ @Override
public void endpointRemoved(EndpointDescription endpoint, String filter) {
- LOG.debug("EndpointRemoved {}", endpoint);
- removeImportPossibility(endpoint, filter);
+ LOG.debug("Endpoint removed for filter {}, endpoint {}", filter, endpoint);
+ importPossibilities.remove(filter, endpoint);
triggerImport(filter);
}
- private void addImportPossibility(EndpointDescription endpoint, String filter) {
- put(filter, importPossibilities, endpoint);
- }
-
- private void removeImportPossibility(EndpointDescription endpoint, String filter) {
- List<EndpointDescription> endpoints = get(filter, importPossibilities);
- remove(filter, importPossibilities, endpoint);
- if (endpoints.isEmpty()) {
- remove(filter,importPossibilities,null);
- }
- }
-
public void add(RemoteServiceAdmin rsa) {
rsaSet.add(rsa);
-
- for (String filter : keySet(importPossibilities)) {
+ for (String filter : importPossibilities.keySet()) {
triggerImport(filter);
}
-
}
public void remove(RemoteServiceAdmin rsa) {
rsaSet.remove(rsa);
}
+ @Override
+ public void remoteAdminEvent(RemoteServiceAdminEvent event) {
+ if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION) {
+ removeAndClose(event.getImportReference());
+ }
+ }
private void triggerImport(final String filter) {
LOG.debug("Import of a service for filter {} was queued", filter);
-
- execService.execute(new Runnable() {
- public void run() {
- try {
- unexportNotAvailableServices(filter);
- importServices(filter);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ if (!rsaSet.isEmpty()) {
+ execService.execute(new Runnable() {
+ public void run() {
+ doImport(filter);
}
- // Notify EndpointListeners? NO!
- }
- });
- }
-
- private void unexportNotAvailableServices(String filter) {
- List<ImportRegistration> importRegistrations = get(filter, importedServices);
- for (ImportRegistration ir : importRegistrations) {
- EndpointDescription endpoint = ir.getImportReference().getImportedEndpoint();
- if (!isImportPossibilityAvailable(endpoint, filter)) {
- removeImport(ir, null); // also unexports the service
- }
+ });
}
}
-
- private boolean isImportPossibilityAvailable(EndpointDescription endpoint, String filter) {
- List<EndpointDescription> endpoints = get(filter, importPossibilities);
- return endpoints != null && endpoints.contains(endpoint);
-
+
+ private void doImport(final String filter) {
+ try {
+ unexportNotAvailableServices(filter);
+ importServices(filter);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ // Notify EndpointListeners? NO!
}
private void importServices(String filter) {
- List<ImportRegistration> importRegistrations = get(filter, importedServices);
- for (EndpointDescription endpoint : get(filter, importPossibilities)) {
+ List<ImportRegistration> importRegistrations = importedServices.get(filter);
+ for (EndpointDescription endpoint : importPossibilities.get(filter)) {
// TODO but optional: if the service is already imported and the endpoint is still
// in the list of possible imports check if a "better" endpoint is now in the list
if (!alreadyImported(endpoint, importRegistrations)) {
- // service not imported yet -> import it now
ImportRegistration ir = importService(endpoint);
if (ir != null) {
// import was successful
- put(filter, importedServices, ir);
- if (!importAllAvailable) {
- return;
- }
+ importedServices.put(filter, ir);
}
}
}
@@ -262,96 +219,34 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
return null;
}
- /**
- * Remove and close (unexport) the given import. The import is specified either
- * by its ImportRegistration or by its ImportReference (only one of them must
- * be specified).
- * <p>
- * If this method is called from within iterations on the underlying data structure,
- * the iterations must be made on copies of the structures rather than the original
- * references in order to prevent ConcurrentModificationExceptions.
- *
- * @param reg the import registration to remove
- * @param ref the import reference to remove
- */
- private void removeImport(ImportRegistration reg, ImportReference ref) {
- // this method may be called recursively by calling ImportRegistration.close()
- // and receiving a RemoteServiceAdminEvent for its unregistration, which results
- // in a ConcurrentModificationException. We avoid this by closing the registrations
- // only after data structure manipulation is done, and being re-entrant.
- List<ImportRegistration> removed = new ArrayList<ImportRegistration>();
- Set<Entry<String, List<ImportRegistration>>> entries = entrySet(importedServices);
- for (Entry<String, List<ImportRegistration>> entry : entries) {
- for (ImportRegistration ir : entry.getValue()) {
- if (ir.equals(reg) || ir.getImportReference().equals(ref)) {
- removed.add(ir);
- remove(entry.getKey(), importedServices, ir);
- }
- }
- }
- for (ImportRegistration ir : removed) {
- ir.close();
- }
- }
-
- public void remoteAdminEvent(RemoteServiceAdminEvent event) {
- if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION) {
- removeImport(null, event.getImportReference());
- }
- }
-
- private <T> void put(String key, Map<String, List<T>> map, T value) {
- synchronized (map) {
- List<T> list = map.get(key);
- if(list == null) {
- list = new CopyOnWriteArrayList<T>();
- map.put(key, list);
- }
- //make sure there is no duplicates
- if(!list.contains(value)) {
- list.add(value);
+ private void unexportNotAvailableServices(String filter) {
+ List<ImportRegistration> importRegistrations = importedServices.get(filter);
+ List<EndpointDescription> endpoints = importPossibilities.get(filter);
+ for (ImportRegistration ir : importRegistrations) {
+ EndpointDescription endpoint = ir.getImportReference().getImportedEndpoint();
+ if (!endpoints.contains(endpoint)) {
+ removeAndClose(ir.getImportReference());
}
}
}
- private <T> List<T> get(String key, Map<String, List<T>> map) {
- synchronized (map) {
- List<T> list = map.get(key);
- if(list == null)
- return Collections.emptyList();
- return list;
- }
- }
-
- private <T> List<T> remove(String key, Map<String, List<T>> map) {
- synchronized (map) {
- return map.remove(key);
- }
- }
-
- private <T> void remove(String key, Map<String, List<T>> map, T value) {
- synchronized (map) {
- List<T> list = map.get(key);
- if (list != null) {
- list.remove(value);
- if(list.isEmpty()) {
- map.remove(key);
+ private void removeAndClose(ImportReference ref) {
+ List<ImportRegistration> removed = new ArrayList<ImportRegistration>();
+ for (String key : importedServices.keySet()) {
+ for (ImportRegistration ir : importedServices.get(key)) {
+ if (ir.getImportReference().equals(ref)) {
+ removed.add(ir);
+ importedServices.remove(key, ir);
}
}
}
+ closeAll(removed);
}
- private <T> Set<Entry<String, List<T>>> entrySet(Map<String, List<T>> map) {
- synchronized (map) {
- Set<Entry<String, List<T>>> entries = map.entrySet();
- return new HashSet<Entry<String, List<T>>>(entries);
- }
- }
-
- private <T> Set<String> keySet(Map<String, List<T>> map) {
- synchronized (map) {
- Set<String> keySet = map.keySet();
- return new HashSet<String>(keySet);
+ private void closeAll(List<ImportRegistration> removed) {
+ for (ImportRegistration ir : removed) {
+ ir.close();
}
}
+
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/b16d2b9a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
index 07fb0ae..919f733 100644
--- a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
+++ b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
@@ -39,39 +39,20 @@ import static org.junit.Assert.assertTrue;
public class TopologyManagerImportTest {
- @SuppressWarnings({
- "rawtypes", "unchecked"
- })
@Test
public void testImportForNewlyAddedRSA() throws InterruptedException {
IMocksControl c = EasyMock.createControl();
-
c.makeThreadSafe(true);
-
final Semaphore sema = new Semaphore(0);
-
- ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
- sreg.unregister();
- EasyMock.expectLastCall().once();
-
- BundleContext bc = c.createMock(BundleContext.class);
- EasyMock.expect(bc.registerService(EasyMock.anyObject(Class.class),
- EasyMock.anyObject(),
- (Dictionary)EasyMock.anyObject())).andReturn(sreg).anyTimes();
- EasyMock.expect(bc.getProperty(Constants.FRAMEWORK_UUID)).andReturn("myid").atLeastOnce();
+ BundleContext bc = getBundleContext(c);
EndpointDescription endpoint = c.createMock(EndpointDescription.class);
RemoteServiceAdmin rsa = c.createMock(RemoteServiceAdmin.class);
- final ImportRegistration ireg = c.createMock(ImportRegistration.class);
- EasyMock.expect(ireg.getException()).andReturn(null).anyTimes();
- ImportReference iref = c.createMock(ImportReference.class);
- EasyMock.expect(ireg.getImportReference()).andReturn(iref).anyTimes();
- EasyMock.expect(iref.getImportedEndpoint()).andReturn(endpoint).anyTimes();
-
+ final ImportRegistration ir = getRegistration(c, endpoint);
EasyMock.expect(rsa.importService(EasyMock.eq(endpoint))).andAnswer(new IAnswer<ImportRegistration>() {
public ImportRegistration answer() throws Throwable {
sema.release();
- return ireg;
+ return ir;
}
}).once();
c.replay();
@@ -85,4 +66,26 @@ public class TopologyManagerImportTest {
tm.stop();
c.verify();
}
+
+ private ImportRegistration getRegistration(IMocksControl c, EndpointDescription endpoint) {
+ final ImportRegistration ireg = c.createMock(ImportRegistration.class);
+ EasyMock.expect(ireg.getException()).andReturn(null).anyTimes();
+ ImportReference iref = c.createMock(ImportReference.class);
+ EasyMock.expect(ireg.getImportReference()).andReturn(iref).anyTimes();
+ EasyMock.expect(iref.getImportedEndpoint()).andReturn(endpoint).anyTimes();
+ return ireg;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private BundleContext getBundleContext(IMocksControl c) {
+ ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
+ sreg.unregister();
+ EasyMock.expectLastCall().once();
+ BundleContext bc = c.createMock(BundleContext.class);
+ EasyMock.expect(bc.registerService(EasyMock.anyObject(Class.class),
+ EasyMock.anyObject(),
+ (Dictionary)EasyMock.anyObject())).andReturn(sreg).anyTimes();
+ EasyMock.expect(bc.getProperty(Constants.FRAMEWORK_UUID)).andReturn("myid").atLeastOnce();
+ return bc;
+ }
}
[3/3] aries-rsa git commit: Move EndpointListenerManager and
dependent classes to their own package
Posted by cs...@apache.org.
Move EndpointListenerManager and dependent classes to their own package
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/0fea532a
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/0fea532a
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/0fea532a
Branch: refs/heads/master
Commit: 0fea532a995d2ce1e5d41b900a641a9e2b8fc05d
Parents: f9e4d40
Author: Christian Schneider <cs...@adobe.com>
Authored: Tue Jan 30 12:15:12 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Tue Jan 30 12:15:12 2018 +0100
----------------------------------------------------------------------
.../aries/rsa/topologymanager/Activator.java | 2 +-
.../importer/EndpointListenerManager.java | 131 -------------------
.../topologymanager/importer/FilterHelper.java | 77 -----------
.../importer/ListenerHookImpl.java | 91 -------------
.../topologymanager/importer/RSATracker.java | 26 ----
.../topologymanager/importer/RSFindHook.java | 70 ----------
.../importer/ReferenceCounter.java | 76 -----------
.../importer/ServiceInterestListener.java | 26 ----
.../importer/TopologyManagerImport.java | 5 +-
.../importer/local/EndpointListenerManager.java | 131 +++++++++++++++++++
.../importer/local/FilterHelper.java | 77 +++++++++++
.../importer/local/ListenerHookImpl.java | 91 +++++++++++++
.../importer/local/RSFindHook.java | 70 ++++++++++
.../topologymanager/importer/local/Readme.md | 3 +
.../importer/local/ReferenceCounter.java | 76 +++++++++++
.../importer/local/ServiceInterestListener.java | 26 ++++
.../importer/EndpointListenerImplTest.java | 113 ----------------
.../importer/FilterHelperTest.java | 54 --------
.../importer/ListenerHookImplTest.java | 91 -------------
.../importer/ReferenceCounterTest.java | 44 -------
.../local/EndpointListenerImplTest.java | 113 ++++++++++++++++
.../importer/local/FilterHelperTest.java | 55 ++++++++
.../importer/local/ListenerHookImplTest.java | 91 +++++++++++++
.../importer/local/ReferenceCounterTest.java | 44 +++++++
24 files changed, 780 insertions(+), 803 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
index 4fe0581..97378a8 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
@@ -29,8 +29,8 @@ import org.apache.aries.rsa.topologymanager.exporter.DefaultExportPolicy;
import org.apache.aries.rsa.topologymanager.exporter.EndpointListenerNotifier;
import org.apache.aries.rsa.topologymanager.exporter.EndpointRepository;
import org.apache.aries.rsa.topologymanager.exporter.TopologyManagerExport;
-import org.apache.aries.rsa.topologymanager.importer.EndpointListenerManager;
import org.apache.aries.rsa.topologymanager.importer.TopologyManagerImport;
+import org.apache.aries.rsa.topologymanager.importer.local.EndpointListenerManager;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Filter;
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java
deleted file mode 100644
index 87f0fbe..0000000
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.topologymanager.importer;
-
-import java.util.ArrayList;
-import java.util.Dictionary;
-import java.util.Hashtable;
-import java.util.List;
-
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.framework.hooks.service.FindHook;
-import org.osgi.framework.hooks.service.ListenerHook;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Manages the endpoint listener for the import of external services.
- * The endpoint listener scope reflects the combined filters of all services
- * that are asked for (by listeners and service lookups) in the current system.
- *
- * Discovery will then send callbacks when external endpoints are added / removed that match
- * the interest in the local system.
- */
-public class EndpointListenerManager implements ServiceInterestListener{
-
- private static final Logger LOG = LoggerFactory.getLogger(EndpointListenerManager.class);
-
- private final BundleContext bctx;
- private volatile ServiceRegistration<EndpointListener> serviceRegistration;
- private final List<String> filters = new ArrayList<String>();
- private final EndpointListener endpointListener;
- private final ListenerHookImpl listenerHook;
- private RSFindHook findHook;
-
- /**
- * Count service interest by filter. This allows to modify the scope of the EndpointListener as seldom as possible
- */
- private final ReferenceCounter<String> importInterestsCounter = new ReferenceCounter<String>();
-
- public EndpointListenerManager(BundleContext bc, EndpointListener endpointListener) {
- this.bctx = bc;
- this.endpointListener = endpointListener;
- this.listenerHook = new ListenerHookImpl(bc, this);
- findHook = new RSFindHook(bc, this);
- }
-
- public void start() {
- serviceRegistration = bctx.registerService(EndpointListener.class, endpointListener,
- getRegistrationProperties());
- bctx.registerService(ListenerHook.class, listenerHook, null);
- bctx.registerService(FindHook.class, findHook, null);
- }
-
- public void stop() {
- if (serviceRegistration != null) {
- serviceRegistration.unregister();
- }
- }
-
- protected void extendScope(String filter) {
- if (filter == null) {
- return;
- }
- LOG.debug("EndpointListener: extending scope by {}", filter);
- synchronized (filters) {
- filters.add(filter);
- }
- updateRegistration();
- }
-
- protected void reduceScope(String filter) {
- if (filter == null) {
- return;
- }
- LOG.debug("EndpointListener: reducing scope by {}", filter);
- synchronized (filters) {
- filters.remove(filter);
- }
- updateRegistration();
- }
-
- private Dictionary<String, Object> getRegistrationProperties() {
- Dictionary<String, Object> p = new Hashtable<String, Object>();
-
- synchronized (filters) {
- LOG.debug("Current filter: {}", filters);
- p.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, new ArrayList<String>(filters));
- }
-
- return p;
- }
-
- private void updateRegistration() {
- if (serviceRegistration != null) {
- serviceRegistration.setProperties(getRegistrationProperties());
- }
- }
-
- @Override
- public void addServiceInterest(String filter) {
- if (importInterestsCounter.add(filter) == 1) {
- extendScope(filter);
- }
- }
-
- @Override
- public void removeServiceInterest(String filter) {
- if (importInterestsCounter.remove(filter) == 0) {
- LOG.debug("last reference to import interest is gone -> removing interest filter: {}", filter);
- reduceScope(filter);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/FilterHelper.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/FilterHelper.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/FilterHelper.java
deleted file mode 100644
index 98cb94d..0000000
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/FilterHelper.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.topologymanager.importer;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.osgi.framework.Constants;
-
-public final class FilterHelper {
- private static final String OBJECTCLASS_EXPRESSION = ".*\\(" + Constants.OBJECTCLASS + "=([^)]+)\\).*";
- private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(OBJECTCLASS_EXPRESSION);
-
- private FilterHelper() {
- // prevent instantiation
- }
-
- public static String getObjectClass(String filter) {
- if (filter != null) {
- Matcher matcher = OBJECTCLASS_PATTERN.matcher(filter);
- if (matcher.matches() && matcher.groupCount() >= 1) {
- return matcher.group(1);
- }
- }
- return null;
- }
-
- private static final Set<String> SYSTEM_PACKAGES;
- static {
- SYSTEM_PACKAGES = new HashSet<String>();
- SYSTEM_PACKAGES.add("org.osgi.service");
- SYSTEM_PACKAGES.add("org.apache.felix");
- SYSTEM_PACKAGES.add("org.ops4j.pax.logging");
- SYSTEM_PACKAGES.add("ch.ethz.iks.slp");
- SYSTEM_PACKAGES.add("org.ungoverned.osgi.service");
- SYSTEM_PACKAGES.add("org.springframework.osgi.context.event.OsgiBundleApplicationContextListener");
- SYSTEM_PACKAGES.add("java.net.ContentHandler");
- }
-
- public static boolean isClassExcluded(String className) {
- if (className == null) {
- return true;
- }
-
- for (String p : SYSTEM_PACKAGES) {
- if (className.startsWith(p)) {
- return true;
- }
- }
- return false;
- }
-
- public static String getFullFilter(String objectClass, String filter) {
- if (objectClass == null) {
- return filter;
- }
- String nameFilter = String.format("(objectClass=%s)", objectClass);
- return (filter == null) ? nameFilter : String.format("(&%s%s)", nameFilter, filter);
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ListenerHookImpl.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ListenerHookImpl.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ListenerHookImpl.java
deleted file mode 100644
index 1ca1d19..0000000
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ListenerHookImpl.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.topologymanager.importer;
-
-import java.util.Collection;
-
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.framework.hooks.service.ListenerHook;
-import org.osgi.service.remoteserviceadmin.RemoteConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Listens for service listeners and informs ServiceInterestListener about added and removed interest
- * in services
- */
-public class ListenerHookImpl implements ListenerHook {
-
- private static final Logger LOG = LoggerFactory.getLogger(ListenerHookImpl.class);
-
- private final BundleContext bctx;
- private final ServiceInterestListener serviceInterestListener;
- private final String frameworkUUID;
-
- public ListenerHookImpl(BundleContext bc, ServiceInterestListener serviceInterestListener) {
- this.bctx = bc;
- this.frameworkUUID = bctx.getProperty(Constants.FRAMEWORK_UUID);
- this.serviceInterestListener = serviceInterestListener;
- }
-
- @Override
- public void added(Collection<ListenerInfo> listeners) {
- LOG.debug("added listeners {}", listeners);
- for (ListenerInfo listenerInfo : listeners) {
- LOG.debug("Filter {}", listenerInfo.getFilter());
-
- String className = FilterHelper.getObjectClass(listenerInfo.getFilter());
-
- if (listenerInfo.getBundleContext().equals(bctx)) {
- LOG.debug("ListenerHookImpl: skipping request from myself");
- continue;
- }
-
- if (listenerInfo.getFilter() == null) {
- LOG.debug("skipping empty filter");
- continue;
- }
-
- if (FilterHelper.isClassExcluded(className)) {
- LOG.debug("Skipping import request for excluded class [{}]", className);
- continue;
- }
- String exFilter = extendFilter(listenerInfo.getFilter());
- serviceInterestListener.addServiceInterest(exFilter);
- }
- }
-
- @Override
- public void removed(Collection<ListenerInfo> listeners) {
- LOG.debug("removed listeners {}", listeners);
-
- for (ListenerInfo listenerInfo : listeners) {
- LOG.debug("Filter {}", listenerInfo.getFilter());
-
- // TODO: determine if service was handled?
- String exFilter = extendFilter(listenerInfo.getFilter());
- serviceInterestListener.removeServiceInterest(exFilter);
- }
- }
-
- String extendFilter(String filter) {
- return "(&" + filter + "(!(" + RemoteConstants.ENDPOINT_FRAMEWORK_UUID + "=" + frameworkUUID + ")))";
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/RSATracker.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/RSATracker.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/RSATracker.java
deleted file mode 100644
index 4aa648f..0000000
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/RSATracker.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.topologymanager.importer;
-
-import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
-
-public interface RSATracker {
- void added(RemoteServiceAdmin rsa);
- void removed(RemoteServiceAdmin rsa);
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/RSFindHook.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/RSFindHook.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/RSFindHook.java
deleted file mode 100644
index 722cee2..0000000
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/RSFindHook.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.topologymanager.importer;
-
-import java.util.Collection;
-
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceReference;
-import org.osgi.framework.hooks.service.FindHook;
-import org.osgi.service.remoteserviceadmin.RemoteConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RSFindHook implements FindHook {
- private static final Logger LOG = LoggerFactory.getLogger(RSFindHook.class);
-
- private BundleContext bctx;
- private String frameworkUUID;
- private ServiceInterestListener serviceInterestListener;
-
- public RSFindHook(BundleContext bc, ServiceInterestListener serviceInterestListener) {
- this.bctx = bc;
- this.frameworkUUID = bctx.getProperty(Constants.FRAMEWORK_UUID);
- this.serviceInterestListener = serviceInterestListener;
- }
-
- @Override
- public void find(BundleContext context, String name, String filter, boolean allServices,
- Collection<ServiceReference<?>> references) {
- if (context.equals(bctx)) {
- LOG.debug("ListenerHookImpl: skipping request from myself");
- return;
- }
-
- String fullFilter = FilterHelper.getFullFilter(name, filter);
-
- if (fullFilter == null) {
- LOG.debug("skipping empty filter");
- return;
- }
- String className = name != null ? name : FilterHelper.getObjectClass(fullFilter);
- if (FilterHelper.isClassExcluded(className)) {
- LOG.debug("Skipping import request for excluded class [{}]", className);
- return;
- }
- String exFilter = extendFilter(fullFilter);
- serviceInterestListener.addServiceInterest(exFilter);
- }
-
- String extendFilter(String filter) {
- return "(&" + filter + "(!(" + RemoteConstants.ENDPOINT_FRAMEWORK_UUID + "=" + frameworkUUID + ")))";
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ReferenceCounter.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ReferenceCounter.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ReferenceCounter.java
deleted file mode 100644
index 71e796c..0000000
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ReferenceCounter.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.topologymanager.importer;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Manages a reference count per key.
- *
- * @param <K> the key type
- */
-public class ReferenceCounter<K> {
-
- private final ConcurrentMap<K, Integer> counts = new ConcurrentHashMap<K, Integer>();
-
- /**
- * Increases the reference count for the given key,
- * or sets it to 1 if the key has no existing count.
- *
- * @param key a key
- * @return the updated reference count
- */
- public int add(K key) {
- while (true) {
- Integer count = counts.get(key);
- if (count == null) {
- if (counts.putIfAbsent(key, 1) == null) {
- return 1;
- }
- } else if (counts.replace(key, count, count + 1)) {
- return count + 1;
- }
- }
- }
-
- /**
- * Decreases the reference count for the given key,
- * and removes it if it reaches 0.
- * If the key has no existing count, -1 is returned.
- *
- * @param key a key
- * @return the updated reference count, or -1 if the key has no existing count
- */
- public int remove(K key) {
- while (true) {
- Integer count = counts.get(key);
- if (count == null) {
- return -1;
- }
- if (count == 1) {
- if (counts.remove(key, 1)) {
- return 0;
- }
- } else if (counts.replace(key, count, count - 1)) {
- return count - 1;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ServiceInterestListener.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ServiceInterestListener.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ServiceInterestListener.java
deleted file mode 100644
index 9e7b70c..0000000
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ServiceInterestListener.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.topologymanager.importer;
-
-public interface ServiceInterestListener {
-
- void addServiceInterest(String filter);
-
- void removeServiceInterest(String filter);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
index 52b0bdc..e6ab859 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
@@ -46,9 +46,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Listens for remote endpoints using the EndpointListener interface and the EndpointListenerManager.
- * Listens for local service interests using the ListenerHookImpl that calls back through the
- * ServiceInterestListener interface.
+ * Listens for remote endpoints using the EndpointListener. The scope of this listener is managed by
+ * the EndpointListenerManager.
* Manages local creation and destruction of service imports using the available RemoteServiceAdmin services.
*/
public class TopologyManagerImport implements EndpointListener, RemoteServiceAdminListener {
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/EndpointListenerManager.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/EndpointListenerManager.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/EndpointListenerManager.java
new file mode 100644
index 0000000..93208f4
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/EndpointListenerManager.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer.local;
+
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.framework.hooks.service.FindHook;
+import org.osgi.framework.hooks.service.ListenerHook;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the endpoint listener for the import of external services.
+ * The endpoint listener scope reflects the combined filters of all services
+ * that are asked for (by listeners and service lookups) in the current system.
+ *
+ * Discovery will then send callbacks when external endpoints are added / removed that match
+ * the interest in the local system.
+ */
+public class EndpointListenerManager implements ServiceInterestListener{
+
+ private static final Logger LOG = LoggerFactory.getLogger(EndpointListenerManager.class);
+
+ private final BundleContext bctx;
+ private volatile ServiceRegistration<EndpointListener> serviceRegistration;
+ private final List<String> filters = new ArrayList<String>();
+ private final EndpointListener endpointListener;
+ private final ListenerHookImpl listenerHook;
+ private RSFindHook findHook;
+
+ /**
+ * Counts service interest by filter. This allows the scope of the EndpointListener to be modified as seldom as possible
+ */
+ private final ReferenceCounter<String> importInterestsCounter = new ReferenceCounter<String>();
+
+ public EndpointListenerManager(BundleContext bc, EndpointListener endpointListener) {
+ this.bctx = bc;
+ this.endpointListener = endpointListener;
+ this.listenerHook = new ListenerHookImpl(bc, this);
+ findHook = new RSFindHook(bc, this);
+ }
+
+ public void start() {
+ serviceRegistration = bctx.registerService(EndpointListener.class, endpointListener,
+ getRegistrationProperties());
+ bctx.registerService(ListenerHook.class, listenerHook, null);
+ bctx.registerService(FindHook.class, findHook, null);
+ }
+
+ public void stop() {
+ if (serviceRegistration != null) {
+ serviceRegistration.unregister();
+ }
+ }
+
+ protected void extendScope(String filter) {
+ if (filter == null) {
+ return;
+ }
+ LOG.debug("EndpointListener: extending scope by {}", filter);
+ synchronized (filters) {
+ filters.add(filter);
+ }
+ updateRegistration();
+ }
+
+ protected void reduceScope(String filter) {
+ if (filter == null) {
+ return;
+ }
+ LOG.debug("EndpointListener: reducing scope by {}", filter);
+ synchronized (filters) {
+ filters.remove(filter);
+ }
+ updateRegistration();
+ }
+
+ private Dictionary<String, Object> getRegistrationProperties() {
+ Dictionary<String, Object> p = new Hashtable<String, Object>();
+
+ synchronized (filters) {
+ LOG.debug("Current filter: {}", filters);
+ p.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, new ArrayList<String>(filters));
+ }
+
+ return p;
+ }
+
+ private void updateRegistration() {
+ if (serviceRegistration != null) {
+ serviceRegistration.setProperties(getRegistrationProperties());
+ }
+ }
+
+ @Override
+ public void addServiceInterest(String filter) {
+ if (importInterestsCounter.add(filter) == 1) {
+ extendScope(filter);
+ }
+ }
+
+ @Override
+ public void removeServiceInterest(String filter) {
+ if (importInterestsCounter.remove(filter) == 0) {
+ LOG.debug("last reference to import interest is gone -> removing interest filter: {}", filter);
+ reduceScope(filter);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/FilterHelper.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/FilterHelper.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/FilterHelper.java
new file mode 100644
index 0000000..60fad01
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/FilterHelper.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer.local;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.osgi.framework.Constants;
+
+public final class FilterHelper {
+ private static final String OBJECTCLASS_EXPRESSION = ".*\\(" + Constants.OBJECTCLASS + "=([^)]+)\\).*";
+ private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(OBJECTCLASS_EXPRESSION);
+
+ private FilterHelper() {
+ // prevent instantiation
+ }
+
+ public static String getObjectClass(String filter) {
+ if (filter != null) {
+ Matcher matcher = OBJECTCLASS_PATTERN.matcher(filter);
+ if (matcher.matches() && matcher.groupCount() >= 1) {
+ return matcher.group(1);
+ }
+ }
+ return null;
+ }
+
+ private static final Set<String> SYSTEM_PACKAGES;
+ static {
+ SYSTEM_PACKAGES = new HashSet<String>();
+ SYSTEM_PACKAGES.add("org.osgi.service");
+ SYSTEM_PACKAGES.add("org.apache.felix");
+ SYSTEM_PACKAGES.add("org.ops4j.pax.logging");
+ SYSTEM_PACKAGES.add("ch.ethz.iks.slp");
+ SYSTEM_PACKAGES.add("org.ungoverned.osgi.service");
+ SYSTEM_PACKAGES.add("org.springframework.osgi.context.event.OsgiBundleApplicationContextListener");
+ SYSTEM_PACKAGES.add("java.net.ContentHandler");
+ }
+
+ public static boolean isClassExcluded(String className) {
+ if (className == null) {
+ return true;
+ }
+
+ for (String p : SYSTEM_PACKAGES) {
+ if (className.startsWith(p)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static String getFullFilter(String objectClass, String filter) {
+ if (objectClass == null) {
+ return filter;
+ }
+ String nameFilter = String.format("(objectClass=%s)", objectClass);
+ return (filter == null) ? nameFilter : String.format("(&%s%s)", nameFilter, filter);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/ListenerHookImpl.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/ListenerHookImpl.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/ListenerHookImpl.java
new file mode 100644
index 0000000..0543e3a
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/ListenerHookImpl.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer.local;
+
+import java.util.Collection;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.hooks.service.ListenerHook;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listens for service listeners and informs ServiceInterestListener about added and removed interest
+ * in services
+ */
+public class ListenerHookImpl implements ListenerHook {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ListenerHookImpl.class);
+
+ private final BundleContext bctx;
+ private final ServiceInterestListener serviceInterestListener;
+ private final String frameworkUUID;
+
+ public ListenerHookImpl(BundleContext bc, ServiceInterestListener serviceInterestListener) {
+ this.bctx = bc;
+ this.frameworkUUID = bctx.getProperty(Constants.FRAMEWORK_UUID);
+ this.serviceInterestListener = serviceInterestListener;
+ }
+
+ @Override
+ public void added(Collection<ListenerInfo> listeners) {
+ LOG.debug("added listeners {}", listeners);
+ for (ListenerInfo listenerInfo : listeners) {
+ LOG.debug("Filter {}", listenerInfo.getFilter());
+
+ String className = FilterHelper.getObjectClass(listenerInfo.getFilter());
+
+ if (listenerInfo.getBundleContext().equals(bctx)) {
+ LOG.debug("ListenerHookImpl: skipping request from myself");
+ continue;
+ }
+
+ if (listenerInfo.getFilter() == null) {
+ LOG.debug("skipping empty filter");
+ continue;
+ }
+
+ if (FilterHelper.isClassExcluded(className)) {
+ LOG.debug("Skipping import request for excluded class [{}]", className);
+ continue;
+ }
+ String exFilter = extendFilter(listenerInfo.getFilter());
+ serviceInterestListener.addServiceInterest(exFilter);
+ }
+ }
+
+ @Override
+ public void removed(Collection<ListenerInfo> listeners) {
+ LOG.debug("removed listeners {}", listeners);
+
+ for (ListenerInfo listenerInfo : listeners) {
+ LOG.debug("Filter {}", listenerInfo.getFilter());
+
+ // TODO: determine if service was handled?
+ String exFilter = extendFilter(listenerInfo.getFilter());
+ serviceInterestListener.removeServiceInterest(exFilter);
+ }
+ }
+
+ String extendFilter(String filter) {
+ return "(&" + filter + "(!(" + RemoteConstants.ENDPOINT_FRAMEWORK_UUID + "=" + frameworkUUID + ")))";
+ }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/RSFindHook.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/RSFindHook.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/RSFindHook.java
new file mode 100644
index 0000000..1d73ad7
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/RSFindHook.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer.local;
+
+import java.util.Collection;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.hooks.service.FindHook;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RSFindHook implements FindHook {
+ private static final Logger LOG = LoggerFactory.getLogger(RSFindHook.class);
+
+ private BundleContext bctx;
+ private String frameworkUUID;
+ private ServiceInterestListener serviceInterestListener;
+
+ public RSFindHook(BundleContext bc, ServiceInterestListener serviceInterestListener) {
+ this.bctx = bc;
+ this.frameworkUUID = bctx.getProperty(Constants.FRAMEWORK_UUID);
+ this.serviceInterestListener = serviceInterestListener;
+ }
+
+ @Override
+ public void find(BundleContext context, String name, String filter, boolean allServices,
+ Collection<ServiceReference<?>> references) {
+ if (context.equals(bctx)) {
+ LOG.debug("ListenerHookImpl: skipping request from myself");
+ return;
+ }
+
+ String fullFilter = FilterHelper.getFullFilter(name, filter);
+
+ if (fullFilter == null) {
+ LOG.debug("skipping empty filter");
+ return;
+ }
+ String className = name != null ? name : FilterHelper.getObjectClass(fullFilter);
+ if (FilterHelper.isClassExcluded(className)) {
+ LOG.debug("Skipping import request for excluded class [{}]", className);
+ return;
+ }
+ String exFilter = extendFilter(fullFilter);
+ serviceInterestListener.addServiceInterest(exFilter);
+ }
+
+ String extendFilter(String filter) {
+ return "(&" + filter + "(!(" + RemoteConstants.ENDPOINT_FRAMEWORK_UUID + "=" + frameworkUUID + ")))";
+ }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/Readme.md
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/Readme.md b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/Readme.md
new file mode 100644
index 0000000..0d02d57
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/Readme.md
@@ -0,0 +1,3 @@
+Takes care of tracking local demand for services by using a ListenerHook and FindHook.
+Manages the scope of the EndpointListener that is used for importing external services so it matches the local service
+demand.
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/ReferenceCounter.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/ReferenceCounter.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/ReferenceCounter.java
new file mode 100644
index 0000000..a224b8b
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/ReferenceCounter.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer.local;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Keeps one integer reference count per key in a concurrent map.
+ * Updates are applied with compare-and-set style map operations and are
+ * retried when another update to the same key got in between.
+ *
+ * @param <K> the key type
+ */
+public class ReferenceCounter<K> {
+
+    private final ConcurrentMap<K, Integer> counts = new ConcurrentHashMap<K, Integer>();
+
+    /**
+     * Adds one reference for the given key. A key without a count starts at 1.
+     *
+     * @param key a key
+     * @return the reference count after the increment
+     */
+    public int add(K key) {
+        for (;;) {
+            Integer current = counts.get(key);
+            if (current != null) {
+                int next = current + 1;
+                // only succeeds if the stored value is still the one we read
+                if (counts.replace(key, current, next)) {
+                    return next;
+                }
+                continue;
+            }
+            // no entry yet: try to create it, retry if somebody else was faster
+            if (counts.putIfAbsent(key, 1) == null) {
+                return 1;
+            }
+        }
+    }
+
+    /**
+     * Drops one reference for the given key. The entry is deleted from the
+     * map when its count goes from 1 to 0.
+     *
+     * @param key a key
+     * @return the reference count after the decrement, or -1 if the key has no existing count
+     */
+    public int remove(K key) {
+        for (;;) {
+            Integer current = counts.get(key);
+            if (current == null) {
+                return -1;
+            }
+            int next = current - 1;
+            boolean applied;
+            if (current == 1) {
+                // last reference: delete the entry instead of storing 0
+                applied = counts.remove(key, current);
+            } else {
+                applied = counts.replace(key, current, next);
+            }
+            if (applied) {
+                return next;
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/ServiceInterestListener.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/ServiceInterestListener.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/ServiceInterestListener.java
new file mode 100644
index 0000000..f821215
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/local/ServiceInterestListener.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer.local;
+
+/**
+ * Callback for changes in service interest, expressed as filter strings.
+ */
+public interface ServiceInterestListener {
+
+ /**
+  * Signals that interest in services matching the filter was added.
+  * NOTE(review): the listener hook in this change passes the filter
+  * returned by its extendFilter method - confirm for other callers.
+  *
+  * @param filter the filter describing the services of interest
+  */
+ void addServiceInterest(String filter);
+
+ /**
+  * Signals that interest in services matching the filter was removed.
+  *
+  * @param filter the filter describing the services no longer of interest
+  */
+ void removeServiceInterest(String filter);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerImplTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerImplTest.java b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerImplTest.java
deleted file mode 100644
index 4e8e6b4..0000000
--- a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerImplTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.topologymanager.importer;
-
-import java.util.Dictionary;
-import java.util.List;
-
-import org.apache.aries.rsa.topologymanager.importer.EndpointListenerManager;
-import org.apache.aries.rsa.topologymanager.importer.TopologyManagerImport;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.easymock.IMocksControl;
-import org.junit.Assert;
-import org.junit.Test;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-
-public class EndpointListenerImplTest extends Assert {
-
- int testCase;
-
- @SuppressWarnings({
- "rawtypes", "unchecked"
- })
- @Test
- public void testScopeChange() {
- IMocksControl c = EasyMock.createNiceControl();
- BundleContext bc = c.createMock(BundleContext.class);
- TopologyManagerImport tm = c.createMock(TopologyManagerImport.class);
- ServiceRegistration sr = c.createMock(ServiceRegistration.class);
-
- // expect Listener registration
- EasyMock.expect(bc.registerService(EasyMock.anyObject(Class.class),
- EasyMock.anyObject(),
- (Dictionary)EasyMock.anyObject())).andReturn(sr).atLeastOnce();
-
- sr.setProperties((Dictionary)EasyMock.anyObject());
-
- // expect property changes based on later calls
- EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
-
- public Object answer() throws Throwable {
- Object[] args = EasyMock.getCurrentArguments();
- Dictionary props = (Dictionary)args[0];
- List<String> scope = (List<String>)props.get(EndpointListener.ENDPOINT_LISTENER_SCOPE);
- switch (testCase) {
- case 1:
- assertEquals(1, scope.size());
- assertEquals("(a=b)", scope.get(0));
- break;
- case 2:
- assertEquals(0, scope.size());
- break;
- case 3:
- assertEquals("adding entry to empty list failed", 1, scope.size());
- assertEquals("(a=b)", scope.get(0));
- break;
- case 4:
- assertEquals("adding second entry failed", 2, scope.size());
- assertNotNull(scope.contains("(a=b)"));
- assertNotNull(scope.contains("(c=d)"));
- break;
- case 5:
- assertEquals("remove failed", 1, scope.size());
- assertEquals("(c=d)", scope.get(0));
- break;
- default:
- assertTrue("This should not happen!", false);
- }
- return null;
- }
- }).atLeastOnce();
-
- c.replay();
-
- EndpointListenerManager endpointListener = new EndpointListenerManager(bc, tm);
-
- endpointListener.start();
-
- testCase = 1;
- endpointListener.extendScope("(a=b)");
- testCase = 2;
- endpointListener.reduceScope("(a=b)");
-
- testCase = 3;
- endpointListener.extendScope("(a=b)");
- testCase = 4;
- endpointListener.extendScope("(c=d)");
- testCase = 5;
- endpointListener.reduceScope("(a=b)");
-
- endpointListener.stop();
-
- c.verify();
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/FilterHelperTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/FilterHelperTest.java b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/FilterHelperTest.java
deleted file mode 100644
index cf4ab00..0000000
--- a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/FilterHelperTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.topologymanager.importer;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class FilterHelperTest {
-
- @Test
- public void testClass() {
- testWithClassName(FilterHelperTest.class.getName());
- }
-
- @Test
- public void testInnerClass() {
- testWithClassName(InnerClass.class.getName());
- }
-
- private void testWithClassName(String className) {
- String filter = String.format("(objectClass=%s)", className);
- String objClass = FilterHelper.getObjectClass(filter);
- Assert.assertEquals(className, objClass);
- }
-
- @Test
- public void testGetFullFilter() {
- String filter = "(a=b)";
- String objectClass = "my.Test";
- Assert.assertEquals(filter, FilterHelper.getFullFilter(null, filter));
- Assert.assertEquals("(objectClass=my.Test)", FilterHelper.getFullFilter(objectClass, null));
- Assert.assertEquals("(&(objectClass=my.Test)(a=b))", FilterHelper.getFullFilter(objectClass, filter));
- }
-
- class InnerClass {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/ListenerHookImplTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/ListenerHookImplTest.java b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/ListenerHookImplTest.java
deleted file mode 100644
index efd9202..0000000
--- a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/ListenerHookImplTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.topologymanager.importer;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Dictionary;
-import java.util.Hashtable;
-
-import org.apache.aries.rsa.topologymanager.importer.ListenerHookImpl;
-import org.apache.aries.rsa.topologymanager.importer.ServiceInterestListener;
-import org.easymock.EasyMock;
-import org.easymock.IMocksControl;
-import org.junit.Test;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Filter;
-import org.osgi.framework.FrameworkUtil;
-import org.osgi.framework.InvalidSyntaxException;
-import org.osgi.framework.hooks.service.ListenerHook.ListenerInfo;
-import org.osgi.service.remoteserviceadmin.RemoteConstants;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class ListenerHookImplTest {
-
- @Test
- public void testExtendFilter() throws InvalidSyntaxException {
- String filter = "(a=b)";
- BundleContext bc = createBundleContext();
- filter = new ListenerHookImpl(bc, null).extendFilter(filter);
-
- Filter f = FrameworkUtil.createFilter(filter);
-
- Dictionary<String, String> m = new Hashtable<String, String>();
- m.put("a", "b");
- assertTrue(filter + " filter must match as uuid is missing", f.match(m));
- m.put(RemoteConstants.ENDPOINT_FRAMEWORK_UUID, "MyUUID");
- assertFalse(filter + " filter must NOT match as uuid is the local one", f.match(m));
- }
-
- @Test
- public void testAddedRemoved() throws InvalidSyntaxException {
- IMocksControl c = EasyMock.createControl();
- String filter = "(objectClass=My)";
- BundleContext bc = createBundleContext();
- BundleContext listenerBc = createBundleContext();
- ServiceInterestListener serviceInterestListener = c.createMock(ServiceInterestListener.class);
- ListenerHookImpl listenerHook = new ListenerHookImpl(bc, serviceInterestListener);
-
- ListenerInfo listener = c.createMock(ListenerInfo.class);
- EasyMock.expect(listener.getBundleContext()).andReturn(listenerBc);
- EasyMock.expect(listener.getFilter()).andReturn(filter).atLeastOnce();
-
- // Main assertions
- serviceInterestListener.addServiceInterest(listenerHook.extendFilter(filter));
- EasyMock.expectLastCall();
- serviceInterestListener.removeServiceInterest(listenerHook.extendFilter(filter));
- EasyMock.expectLastCall();
-
- Collection<ListenerInfo> listeners = Collections.singletonList(listener);
-
- c.replay();
- listenerHook.added(listeners);
- listenerHook.removed(listeners);
- c.verify();
- }
-
- private BundleContext createBundleContext() {
- BundleContext bc = EasyMock.createNiceMock(BundleContext.class);
- EasyMock.expect(bc.getProperty(EasyMock.eq("org.osgi.framework.uuid"))).andReturn("MyUUID").atLeastOnce();
- EasyMock.replay(bc);
- return bc;
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/ReferenceCounterTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/ReferenceCounterTest.java b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/ReferenceCounterTest.java
deleted file mode 100644
index 9b58288..0000000
--- a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/ReferenceCounterTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.topologymanager.importer;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.aries.rsa.topologymanager.importer.ReferenceCounter;
-
-public class ReferenceCounterTest {
-
- @Test
- public void testCounter() {
- ReferenceCounter<String> counter = new ReferenceCounter<String>();
- assertEquals(-1, counter.remove("a"));
- assertEquals(-1, counter.remove("a"));
- assertEquals(1, counter.add("a"));
- assertEquals(2, counter.add("a"));
- assertEquals(3, counter.add("a"));
- assertEquals(2, counter.remove("a"));
- assertEquals(1, counter.remove("a"));
- assertEquals(2, counter.add("a"));
- assertEquals(1, counter.remove("a"));
- assertEquals(0, counter.remove("a"));
- assertEquals(-1, counter.remove("a"));
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/local/EndpointListenerImplTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/local/EndpointListenerImplTest.java b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/local/EndpointListenerImplTest.java
new file mode 100644
index 0000000..0df85fb
--- /dev/null
+++ b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/local/EndpointListenerImplTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer.local;
+
+import java.util.Dictionary;
+import java.util.List;
+
+import org.apache.aries.rsa.topologymanager.importer.TopologyManagerImport;
+import org.apache.aries.rsa.topologymanager.importer.local.EndpointListenerManager;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IMocksControl;
+import org.junit.Assert;
+import org.junit.Test;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+
+/**
+ * Tests how {@link EndpointListenerManager} publishes its scope when the
+ * scope is extended and reduced.
+ */
+public class EndpointListenerImplTest extends Assert {
+
+    // Scenario step currently running; the mocked setProperties answer switches on it.
+    int testCase;
+
+    /**
+     * Extends and reduces the scope step by step and checks the scope list
+     * handed to the mocked {@link ServiceRegistration#setProperties(Dictionary)}
+     * against the expectation for the current step.
+     */
+    @SuppressWarnings({
+        "rawtypes", "unchecked"
+    })
+    @Test
+    public void testScopeChange() {
+        IMocksControl c = EasyMock.createNiceControl();
+        BundleContext bc = c.createMock(BundleContext.class);
+        TopologyManagerImport tm = c.createMock(TopologyManagerImport.class);
+        ServiceRegistration sr = c.createMock(ServiceRegistration.class);
+
+        // expect Listener registration
+        EasyMock.expect(bc.registerService(EasyMock.anyObject(Class.class),
+                                           EasyMock.anyObject(),
+                                           (Dictionary)EasyMock.anyObject())).andReturn(sr).atLeastOnce();
+
+        sr.setProperties((Dictionary)EasyMock.anyObject());
+
+        // expect property changes based on later calls
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+
+            @Override
+            public Object answer() throws Throwable {
+                Object[] args = EasyMock.getCurrentArguments();
+                Dictionary props = (Dictionary)args[0];
+                List<String> scope = (List<String>)props.get(EndpointListener.ENDPOINT_LISTENER_SCOPE);
+                switch (testCase) {
+                case 1:
+                    assertEquals(1, scope.size());
+                    assertEquals("(a=b)", scope.get(0));
+                    break;
+                case 2:
+                    assertEquals(0, scope.size());
+                    break;
+                case 3:
+                    assertEquals("adding entry to empty list failed", 1, scope.size());
+                    assertEquals("(a=b)", scope.get(0));
+                    break;
+                case 4:
+                    assertEquals("adding second entry failed", 2, scope.size());
+                    // assertNotNull on the boxed boolean could never fail;
+                    // assertTrue actually checks that both filters are present
+                    assertTrue(scope.contains("(a=b)"));
+                    assertTrue(scope.contains("(c=d)"));
+                    break;
+                case 5:
+                    assertEquals("remove failed", 1, scope.size());
+                    assertEquals("(c=d)", scope.get(0));
+                    break;
+                default:
+                    fail("This should not happen!");
+                }
+                return null;
+            }
+        }).atLeastOnce();
+
+        c.replay();
+
+        EndpointListenerManager endpointListener = new EndpointListenerManager(bc, tm);
+
+        endpointListener.start();
+
+        testCase = 1;
+        endpointListener.extendScope("(a=b)");
+        testCase = 2;
+        endpointListener.reduceScope("(a=b)");
+
+        testCase = 3;
+        endpointListener.extendScope("(a=b)");
+        testCase = 4;
+        endpointListener.extendScope("(c=d)");
+        testCase = 5;
+        endpointListener.reduceScope("(a=b)");
+
+        endpointListener.stop();
+
+        c.verify();
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/local/FilterHelperTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/local/FilterHelperTest.java b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/local/FilterHelperTest.java
new file mode 100644
index 0000000..ec642da
--- /dev/null
+++ b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/local/FilterHelperTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer.local;
+
+import org.apache.aries.rsa.topologymanager.importer.local.FilterHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link FilterHelper}: reading the object class out of a filter
+ * and building a full filter from an object class and a filter.
+ */
+public class FilterHelperTest {
+
+    @Test
+    public void testClass() {
+        testWithClassName(FilterHelperTest.class.getName());
+    }
+
+    @Test
+    public void testInnerClass() {
+        testWithClassName(InnerClass.class.getName());
+    }
+
+    /** Expects the class name put into an objectClass filter to be returned unchanged. */
+    private void testWithClassName(String className) {
+        String objectClassFilter = String.format("(objectClass=%s)", className);
+        Assert.assertEquals(className, FilterHelper.getObjectClass(objectClassFilter));
+    }
+
+    @Test
+    public void testGetFullFilter() {
+        final String objectClass = "my.Test";
+        final String filter = "(a=b)";
+
+        // filter only
+        String onlyFilter = FilterHelper.getFullFilter(null, filter);
+        Assert.assertEquals(filter, onlyFilter);
+
+        // object class only
+        String onlyClass = FilterHelper.getFullFilter(objectClass, null);
+        Assert.assertEquals("(objectClass=my.Test)", onlyClass);
+
+        // both parts are combined
+        String combined = FilterHelper.getFullFilter(objectClass, filter);
+        Assert.assertEquals("(&(objectClass=my.Test)(a=b))", combined);
+    }
+
+    /** Nested type used by {@link #testInnerClass()} as a second class name. */
+    class InnerClass {
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/local/ListenerHookImplTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/local/ListenerHookImplTest.java b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/local/ListenerHookImplTest.java
new file mode 100644
index 0000000..07df9d6
--- /dev/null
+++ b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/local/ListenerHookImplTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer.local;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.apache.aries.rsa.topologymanager.importer.local.ListenerHookImpl;
+import org.apache.aries.rsa.topologymanager.importer.local.ServiceInterestListener;
+import org.easymock.EasyMock;
+import org.easymock.IMocksControl;
+import org.junit.Test;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.hooks.service.ListenerHook.ListenerInfo;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link ListenerHookImpl}: filter extension and the forwarding of
+ * added/removed service listeners to a {@link ServiceInterestListener}.
+ */
+public class ListenerHookImplTest {
+
+    @Test
+    public void testExtendFilter() throws InvalidSyntaxException {
+        ListenerHookImpl hook = new ListenerHookImpl(createBundleContext(), null);
+        String extended = hook.extendFilter("(a=b)");
+        Filter parsed = FrameworkUtil.createFilter(extended);
+
+        Dictionary<String, String> props = new Hashtable<String, String>();
+        props.put("a", "b");
+        assertTrue(extended + " filter must match as uuid is missing", parsed.match(props));
+
+        props.put(RemoteConstants.ENDPOINT_FRAMEWORK_UUID, "MyUUID");
+        assertFalse(extended + " filter must NOT match as uuid is the local one", parsed.match(props));
+    }
+
+    @Test
+    public void testAddedRemoved() throws InvalidSyntaxException {
+        IMocksControl control = EasyMock.createControl();
+        String filter = "(objectClass=My)";
+        BundleContext hookContext = createBundleContext();
+        BundleContext listenerContext = createBundleContext();
+        ServiceInterestListener interestListener = control.createMock(ServiceInterestListener.class);
+        ListenerHookImpl listenerHook = new ListenerHookImpl(hookContext, interestListener);
+
+        ListenerInfo listenerInfo = control.createMock(ListenerInfo.class);
+        EasyMock.expect(listenerInfo.getBundleContext()).andReturn(listenerContext);
+        EasyMock.expect(listenerInfo.getFilter()).andReturn(filter).atLeastOnce();
+
+        // Main assertions: the extended filter is reported once as added and once as removed
+        String expectedFilter = listenerHook.extendFilter(filter);
+        interestListener.addServiceInterest(expectedFilter);
+        EasyMock.expectLastCall();
+        interestListener.removeServiceInterest(expectedFilter);
+        EasyMock.expectLastCall();
+
+        control.replay();
+        Collection<ListenerInfo> listeners = Collections.singletonList(listenerInfo);
+        listenerHook.added(listeners);
+        listenerHook.removed(listeners);
+        control.verify();
+    }
+
+    /** Creates a replayed nice mock that answers the framework uuid property with "MyUUID". */
+    private BundleContext createBundleContext() {
+        BundleContext context = EasyMock.createNiceMock(BundleContext.class);
+        EasyMock.expect(context.getProperty(EasyMock.eq("org.osgi.framework.uuid")))
+            .andReturn("MyUUID")
+            .atLeastOnce();
+        EasyMock.replay(context);
+        return context;
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/0fea532a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/local/ReferenceCounterTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/local/ReferenceCounterTest.java b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/local/ReferenceCounterTest.java
new file mode 100644
index 0000000..e1cba7d
--- /dev/null
+++ b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/local/ReferenceCounterTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer.local;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.aries.rsa.topologymanager.importer.local.ReferenceCounter;
+
+/**
+ * Walks a single key of a {@link ReferenceCounter} through a fixed sequence
+ * of add and remove calls and checks every returned count.
+ */
+public class ReferenceCounterTest {
+
+    @Test
+    public void testCounter() {
+        final String key = "a";
+        ReferenceCounter<String> refs = new ReferenceCounter<String>();
+
+        // unknown key: remove reports -1 every time
+        assertEquals(-1, refs.remove(key));
+        assertEquals(-1, refs.remove(key));
+
+        // count up to three
+        assertEquals(1, refs.add(key));
+        assertEquals(2, refs.add(key));
+        assertEquals(3, refs.add(key));
+
+        // down, up again, then down to zero
+        assertEquals(2, refs.remove(key));
+        assertEquals(1, refs.remove(key));
+        assertEquals(2, refs.add(key));
+        assertEquals(1, refs.remove(key));
+        assertEquals(0, refs.remove(key));
+
+        // the key is unknown again once its count reached zero
+        assertEquals(-1, refs.remove(key));
+    }
+}
[2/3] aries-rsa git commit: Move local service tracking to
EndpointListenerManager
Posted by cs...@apache.org.
Move local service tracking to EndpointListenerManager
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/f9e4d403
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/f9e4d403
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/f9e4d403
Branch: refs/heads/master
Commit: f9e4d4039d396f8aad1809d67f173e1c8a5dd128
Parents: b16d2b9
Author: Christian Schneider <cs...@adobe.com>
Authored: Tue Jan 30 12:06:22 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Tue Jan 30 12:06:22 2018 +0100
----------------------------------------------------------------------
.../aries/rsa/topologymanager/Activator.java | 5 ++
.../importer/EndpointListenerManager.java | 39 +++++++++-
.../rsa/topologymanager/importer/MultiMap.java | 24 +++---
.../importer/TopologyManagerImport.java | 78 ++++++--------------
.../importer/TopologyManagerImportTest.java | 3 -
5 files changed, 78 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f9e4d403/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
index 02d9674..4fe0581 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
@@ -29,6 +29,7 @@ import org.apache.aries.rsa.topologymanager.exporter.DefaultExportPolicy;
import org.apache.aries.rsa.topologymanager.exporter.EndpointListenerNotifier;
import org.apache.aries.rsa.topologymanager.exporter.EndpointRepository;
import org.apache.aries.rsa.topologymanager.exporter.TopologyManagerExport;
+import org.apache.aries.rsa.topologymanager.importer.EndpointListenerManager;
import org.apache.aries.rsa.topologymanager.importer.TopologyManagerImport;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
@@ -57,6 +58,7 @@ public class Activator implements BundleActivator {
private ThreadPoolExecutor exportExecutor;
private ServiceTracker<EndpointListener, EndpointListener> epListenerTracker;
private ServiceTracker<ExportPolicy, ExportPolicy> policyTracker;
+ private EndpointListenerManager endpointListenerManager;
public void start(final BundleContext bc) throws Exception {
Dictionary<String, String> props = new Hashtable<String, String>();
@@ -103,6 +105,8 @@ public class Activator implements BundleActivator {
exportExecutor = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
exportManager = new TopologyManagerExport(endpointRepo, exportExecutor, policy);
importManager = new TopologyManagerImport(bc);
+ endpointListenerManager = new EndpointListenerManager(bc, importManager);
+ endpointListenerManager.start();
rsaTracker = new RSATracker(bc, RemoteServiceAdmin.class, null);
bc.addServiceListener(exportManager);
rsaTracker.open();
@@ -121,6 +125,7 @@ public class Activator implements BundleActivator {
bc.removeServiceListener(exportManager);
exportExecutor.shutdown();
importManager.stop();
+ endpointListenerManager.stop();
rsaTracker.close();
exportManager = null;
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f9e4d403/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java
index 1207f9f..87f0fbe 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java
@@ -25,14 +25,21 @@ import java.util.List;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
+import org.osgi.framework.hooks.service.FindHook;
+import org.osgi.framework.hooks.service.ListenerHook;
import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Manages an EndpointListener and adjusts its scope according to requested service filters.
+ * Manages the endpoint listener for the import of external services.
+ * The endpoint listener scope reflects the combined filters of all services
+ * that are asked for (by listeners and service lookups) in the current system.
+ *
+ * Discovery will then send callbacks when external endpoints are added / removed that match
+ * the interest in the local system.
*/
-public class EndpointListenerManager {
+public class EndpointListenerManager implements ServiceInterestListener{
private static final Logger LOG = LoggerFactory.getLogger(EndpointListenerManager.class);
@@ -40,15 +47,26 @@ public class EndpointListenerManager {
private volatile ServiceRegistration<EndpointListener> serviceRegistration;
private final List<String> filters = new ArrayList<String>();
private final EndpointListener endpointListener;
+ private final ListenerHookImpl listenerHook;
+ private RSFindHook findHook;
+
+ /**
+ * Count service interest by filter. This allows the scope of the EndpointListener to be modified as seldom as possible.
+ */
+ private final ReferenceCounter<String> importInterestsCounter = new ReferenceCounter<String>();
public EndpointListenerManager(BundleContext bc, EndpointListener endpointListener) {
this.bctx = bc;
this.endpointListener = endpointListener;
+ this.listenerHook = new ListenerHookImpl(bc, this);
+ findHook = new RSFindHook(bc, this);
}
- protected void start() {
+ public void start() {
serviceRegistration = bctx.registerService(EndpointListener.class, endpointListener,
getRegistrationProperties());
+ bctx.registerService(ListenerHook.class, listenerHook, null);
+ bctx.registerService(FindHook.class, findHook, null);
}
public void stop() {
@@ -95,4 +113,19 @@ public class EndpointListenerManager {
serviceRegistration.setProperties(getRegistrationProperties());
}
}
+
+ @Override
+ public void addServiceInterest(String filter) {
+ if (importInterestsCounter.add(filter) == 1) {
+ extendScope(filter);
+ }
+ }
+
+ @Override
+ public void removeServiceInterest(String filter) {
+ if (importInterestsCounter.remove(filter) == 0) {
+ LOG.debug("last reference to import interest is gone -> removing interest filter: {}", filter);
+ reduceScope(filter);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f9e4d403/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
index 5eda3c4..13597b5 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
@@ -20,8 +20,7 @@ package org.apache.aries.rsa.topologymanager.importer;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -30,27 +29,27 @@ import java.util.Set;
*/
public class MultiMap<T> {
- private Map<String, List<T>> map;
+ private Map<String, Set<T>> map;
public MultiMap() {
map = new HashMap<>();
}
public synchronized void put(String key, T value) {
- List<T> values = map.get(key);
+ Set<T> values = map.get(key);
if (values == null) {
- values = new LinkedList<>();
+ values = new HashSet<>();
map.put(key, values);
}
values.add(value);
}
- public synchronized List<T> get(String key) {
- return map.getOrDefault(key, Collections.<T>emptyList());
+ public synchronized Set<T> get(String key) {
+ return map.getOrDefault(key, Collections.<T>emptySet());
}
public synchronized void remove(String key, T value) {
- List<T> values = map.get(key);
+ Set<T> values = map.get(key);
values.remove(value);
if (values.isEmpty()) {
map.remove(key);
@@ -60,4 +59,11 @@ public class MultiMap<T> {
public synchronized Set<String> keySet() {
return map.keySet();
}
-}
\ No newline at end of file
+
+ public void remove(T toRemove) {
+ Set<String> keys = new HashSet<>(map.keySet());
+ for (String key : keys) {
+ remove(key, toRemove);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f9e4d403/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
index bacdea0..52b0bdc 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
@@ -51,24 +51,14 @@ import org.slf4j.LoggerFactory;
* ServiceInterestListener interface.
* Manages local creation and destruction of service imports using the available RemoteServiceAdmin services.
*/
-public class TopologyManagerImport implements EndpointListener, RemoteServiceAdminListener, ServiceInterestListener {
+public class TopologyManagerImport implements EndpointListener, RemoteServiceAdminListener {
private static final Logger LOG = LoggerFactory.getLogger(TopologyManagerImport.class);
private ExecutorService execService;
- private final EndpointListenerManager endpointListenerManager;
private final BundleContext bctx;
private Set<RemoteServiceAdmin> rsaSet;
- private final ListenerHookImpl listenerHook;
- private RSFindHook findHook;
- /**
- * Contains an instance of the Class Import Interest for each distinct import request. If the same filter
- * is requested multiple times the existing instance of the Object increments an internal reference
- * counter. If an interest is removed, the related ServiceInterest object is used to reduce the reference
- * counter until it reaches zero. in this case the interest is removed.
- */
- private final ReferenceCounter<String> importInterestsCounter = new ReferenceCounter<String>();
/**
* List of Endpoints by matched filter that were reported by the EndpointListener and can be imported
@@ -85,58 +75,35 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
public TopologyManagerImport(BundleContext bc) {
this.rsaSet = new HashSet<RemoteServiceAdmin>();
bctx = bc;
- endpointListenerManager = new EndpointListenerManager(bctx, this);
execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- listenerHook = new ListenerHookImpl(bc, this);
- findHook = new RSFindHook(bc, this);
}
public void start() {
bctx.registerService(RemoteServiceAdminListener.class, this, null);
- bctx.registerService(ListenerHook.class, listenerHook, null);
- bctx.registerService(FindHook.class, findHook, null);
- endpointListenerManager.start();
}
public void stop() {
- endpointListenerManager.stop();
execService.shutdown();
- // this is called from Activator.stop(), which implicitly unregisters our registered services
- }
-
- @Override
- public void addServiceInterest(String filter) {
- if (importInterestsCounter.add(filter) == 1) {
- endpointListenerManager.extendScope(filter);
- }
- }
-
- @Override
- public void removeServiceInterest(String filter) {
- if (importInterestsCounter.remove(filter) == 0) {
- LOG.debug("last reference to import interest is gone -> removing interest filter: {}", filter);
- endpointListenerManager.reduceScope(filter);
- }
}
@Override
public void endpointAdded(EndpointDescription endpoint, String filter) {
LOG.debug("Endpoint added for filter {}, endpoint {}", filter, endpoint);
importPossibilities.put(filter, endpoint);
- triggerImport(filter);
+ triggerSyncImports(filter);
}
@Override
public void endpointRemoved(EndpointDescription endpoint, String filter) {
LOG.debug("Endpoint removed for filter {}, endpoint {}", filter, endpoint);
importPossibilities.remove(filter, endpoint);
- triggerImport(filter);
+ triggerSyncImports(filter);
}
public void add(RemoteServiceAdmin rsa) {
rsaSet.add(rsa);
for (String filter : importPossibilities.keySet()) {
- triggerImport(filter);
+ triggerSyncImports(filter);
}
}
@@ -147,24 +114,24 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
@Override
public void remoteAdminEvent(RemoteServiceAdminEvent event) {
if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION) {
- removeAndClose(event.getImportReference());
+ unImport(event.getImportReference());
}
}
- private void triggerImport(final String filter) {
+ private void triggerSyncImports(final String filter) {
LOG.debug("Import of a service for filter {} was queued", filter);
if (!rsaSet.isEmpty()) {
execService.execute(new Runnable() {
public void run() {
- doImport(filter);
+ syncImports(filter);
}
});
}
}
- private void doImport(final String filter) {
+ private void syncImports(final String filter) {
try {
- unexportNotAvailableServices(filter);
+ unImportForGoneEndpoints(filter);
importServices(filter);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
@@ -173,7 +140,7 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
}
private void importServices(String filter) {
- List<ImportRegistration> importRegistrations = importedServices.get(filter);
+ Set<ImportRegistration> importRegistrations = importedServices.get(filter);
for (EndpointDescription endpoint : importPossibilities.get(filter)) {
// TODO but optional: if the service is already imported and the endpoint is still
// in the list of possible imports check if a "better" endpoint is now in the list
@@ -187,12 +154,10 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
}
}
- private boolean alreadyImported(EndpointDescription endpoint, List<ImportRegistration> importRegistrations) {
- if (importRegistrations != null) {
- for (ImportRegistration ir : importRegistrations) {
- if (endpoint.equals(ir.getImportReference().getImportedEndpoint())) {
- return true;
- }
+ private boolean alreadyImported(EndpointDescription endpoint, Set<ImportRegistration> importRegistrations) {
+ for (ImportRegistration ir : importRegistrations) {
+ if (endpoint.equals(ir.getImportReference().getImportedEndpoint())) {
+ return true;
}
}
return false;
@@ -219,24 +184,24 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
return null;
}
- private void unexportNotAvailableServices(String filter) {
- List<ImportRegistration> importRegistrations = importedServices.get(filter);
- List<EndpointDescription> endpoints = importPossibilities.get(filter);
+ private void unImportForGoneEndpoints(String filter) {
+ Set<ImportRegistration> importRegistrations = importedServices.get(filter);
+ Set<EndpointDescription> endpoints = importPossibilities.get(filter);
for (ImportRegistration ir : importRegistrations) {
EndpointDescription endpoint = ir.getImportReference().getImportedEndpoint();
if (!endpoints.contains(endpoint)) {
- removeAndClose(ir.getImportReference());
+ unImport(ir.getImportReference());
}
}
}
- private void removeAndClose(ImportReference ref) {
+ private void unImport(ImportReference ref) {
List<ImportRegistration> removed = new ArrayList<ImportRegistration>();
- for (String key : importedServices.keySet()) {
+ HashSet<String> imported = new HashSet<>(importedServices.keySet());
+ for (String key : imported) {
for (ImportRegistration ir : importedServices.get(key)) {
if (ir.getImportReference().equals(ref)) {
removed.add(ir);
- importedServices.remove(key, ir);
}
}
}
@@ -245,6 +210,7 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
private void closeAll(List<ImportRegistration> removed) {
for (ImportRegistration ir : removed) {
+ importedServices.remove(ir);
ir.close();
}
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f9e4d403/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
index 919f733..8b8bbc2 100644
--- a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
+++ b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java
@@ -79,13 +79,10 @@ public class TopologyManagerImportTest {
@SuppressWarnings({ "rawtypes", "unchecked" })
private BundleContext getBundleContext(IMocksControl c) {
ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
- sreg.unregister();
- EasyMock.expectLastCall().once();
BundleContext bc = c.createMock(BundleContext.class);
EasyMock.expect(bc.registerService(EasyMock.anyObject(Class.class),
EasyMock.anyObject(),
(Dictionary)EasyMock.anyObject())).andReturn(sreg).anyTimes();
- EasyMock.expect(bc.getProperty(Constants.FRAMEWORK_UUID)).andReturn("myid").atLeastOnce();
return bc;
}
}