You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2016/03/11 20:43:20 UTC

[21/50] [abbrv] aries-rsa git commit: Switch project setup to Aries

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ServiceInterestListener.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ServiceInterestListener.java b/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ServiceInterestListener.java
new file mode 100644
index 0000000..f4db92e
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ServiceInterestListener.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.topologymanager.importer;
+
+public interface ServiceInterestListener {
+    /** Reports a new interest in services matching the given filter string. */
+    void addServiceInterest(String filter);
+    /** Reports that one interest in the given filter string has gone away. */
+    void removeServiceInterest(String filter);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java
new file mode 100644
index 0000000..30fe4c2
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.topologymanager.importer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.hooks.service.ListenerHook;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.service.remoteserviceadmin.ImportReference;
+import org.osgi.service.remoteserviceadmin.ImportRegistration;
+import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
+import org.osgi.service.remoteserviceadmin.RemoteServiceAdminEvent;
+import org.osgi.service.remoteserviceadmin.RemoteServiceAdminListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listens for remote endpoints using the EndpointListener interface and the EndpointListenerManager.
+ * Listens for local service interests using the ListenerHookImpl that calls back through the
+ * ServiceInterestListener interface.
+ * Manages local creation and destruction of service imports using the available RemoteServiceAdmin services.
+ */
+public class TopologyManagerImport implements EndpointListener, RemoteServiceAdminListener, ServiceInterestListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TopologyManagerImport.class);
+    private ExecutorService execService;
+
+    private final EndpointListenerManager endpointListenerManager;
+    private final BundleContext bctx;
+    private Set<RemoteServiceAdmin> rsaSet;
+    private final ListenerHookImpl listenerHook;
+
+    /**
+     * If set to false only one service is imported for each import interest even if multiple services are
+     * available. If set to true, all available services are imported.
+     *
+     * TODO: Make this available as a configuration option
+     */
+    private boolean importAllAvailable = true;
+
+    /**
+     * Contains an instance of the Class Import Interest for each distinct import request. If the same filter
+     * is requested multiple times the existing instance of the Object increments an internal reference
+     * counter. If an interest is removed, the related ServiceInterest object is used to reduce the reference
+     * counter until it reaches zero. In this case the interest is removed.
+     */
+    private final ReferenceCounter<String> importInterestsCounter = new ReferenceCounter<String>();
+
+    /**
+     * List of Endpoints by matched filter that were reported by the EndpointListener and can be imported
+     */
+    private final Map<String /* filter */, List<EndpointDescription>> importPossibilities
+        = new HashMap<String, List<EndpointDescription>>();
+
+    /**
+     * List of already imported Endpoints by their matched filter
+     */
+    private final Map<String /* filter */, List<ImportRegistration>> importedServices
+        = new HashMap<String, List<ImportRegistration>>();
+    
+    /** Creates the helpers. rsaSet is copy-on-write because pool threads iterate it in importService(). */
+    public TopologyManagerImport(BundleContext bc) {
+        this.rsaSet = new java.util.concurrent.CopyOnWriteArraySet<RemoteServiceAdmin>();
+        bctx = bc;
+        endpointListenerManager = new EndpointListenerManager(bctx, this);
+        execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+        listenerHook = new ListenerHookImpl(bc, this);
+    }
+    
+    public void start() { // registers this object's services and starts the endpoint listener manager
+        bctx.registerService(RemoteServiceAdminListener.class, this, null); // feeds remoteAdminEvent()
+        bctx.registerService(ListenerHook.class, listenerHook, null);
+        endpointListenerManager.start();
+    }
+
+    public void stop() { // counterpart of start(): stops endpoint listening and the import executor
+        endpointListenerManager.stop();
+        execService.shutdown(); // no new import tasks are accepted from here on
+        // this is called from Activator.stop(), which implicitly unregisters our registered services
+    }
+
+    /* Implements ServiceInterestListener#addServiceInterest(String): counts one more reference to the
+     * filter and, when the counter then reports 1, extends the endpoint listener scope by that filter.
+     */
+    public void addServiceInterest(String filter) {
+        if (importInterestsCounter.add(filter) == 1) {
+            endpointListenerManager.extendScope(filter);
+        }
+    }
+
+    /* Implements ServiceInterestListener#removeServiceInterest(String): drops one reference and, when the
+     * counter reports 0, reduces the listener scope and closes every import recorded for the filter.
+     */
+    public void removeServiceInterest(String filter) {
+        if (importInterestsCounter.remove(filter) == 0) {
+            LOG.debug("last reference to import interest is gone -> removing interest filter: {}", filter);
+            endpointListenerManager.reduceScope(filter);
+            synchronized (importedServices) {
+                List<ImportRegistration> irs = importedServices.remove(filter); // detach the list, then close
+                if (irs != null) {
+                    for (ImportRegistration ir : irs) {
+                        ir.close();
+                    }
+                }
+            }
+        }
+    }
+
+    public void endpointAdded(EndpointDescription endpoint, String filter) { // EndpointListener callback
+        if (filter != null) { // the filter is the key under which the possibility is recorded
+            LOG.debug("importable service added for filter {}, endpoint {}", filter, endpoint);
+            addImportPossibility(endpoint, filter);
+            triggerImport(filter);
+        } else {
+            LOG.error("Endpoint is not handled because no matching filter was provided!");
+        }
+    }
+
+    public void endpointRemoved(EndpointDescription endpoint, String filter) { // EndpointListener callback
+        LOG.debug("EndpointRemoved {}", endpoint);
+        removeImportPossibility(endpoint, filter);
+        triggerImport(filter); // the queued task closes imports whose endpoint is no longer listed
+    }
+
+    private void addImportPossibility(EndpointDescription endpoint, String filter) {
+        synchronized (importPossibilities) {
+            List<EndpointDescription> known = importPossibilities.get(filter);
+            if (known == null) {
+                known = new ArrayList<EndpointDescription>();
+                importPossibilities.put(filter, known);
+            }
+            // endpoints may be reported repeatedly; a duplicate would keep a dead service imported
+            boolean listed = known.contains(endpoint);
+            if (!listed) {
+                known.add(endpoint);
+            }
+        }
+    }
+
+    private void removeImportPossibility(EndpointDescription endpoint, String filter) {
+        synchronized (importPossibilities) {
+            List<EndpointDescription> known = importPossibilities.get(filter);
+            if (known != null) {
+                known.remove(endpoint);
+            }
+            if (known != null && known.isEmpty()) {
+                importPossibilities.remove(filter); // forget filters that have no endpoint left
+            }
+        }
+    }
+
+    public void add(RemoteServiceAdmin rsa) { // records the admin and re-triggers all known filters
+        rsaSet.add(rsa);
+        synchronized (importPossibilities) {
+            for (String filter : importPossibilities.keySet()) {
+                triggerImport(filter); // only queues a task; the import itself runs on the executor
+            }
+        }
+    }
+    
+    public void remove(RemoteServiceAdmin rsa) {
+        rsaSet.remove(rsa); // only forgets the admin; importedServices is not touched here
+    }
+
+
+    private void triggerImport(final String filter) { // reconciles the imports for filter asynchronously
+        LOG.debug("Import of a service for filter {} was queued", filter);
+
+        execService.execute(new Runnable() {
+            public void run() {
+                try {
+                    unexportNotAvailableServices(filter); // first close imports that lost their endpoint
+                    importServices(filter); // then import what is possible and not imported yet
+                } catch (Exception e) {
+                    LOG.error(e.getMessage(), e); // logged and swallowed
+                }
+                // Notify EndpointListeners? NO!
+            }
+        });
+    }
+
+    private void unexportNotAvailableServices(String filter) { // closes imports whose endpoint vanished
+        synchronized (importedServices) {
+            List<ImportRegistration> importRegistrations = importedServices.get(filter);
+            if (importRegistrations != null) {
+                // iterate over a copy, since removeImport() modifies the original list
+                for (ImportRegistration ir : new ArrayList<ImportRegistration>(importRegistrations)) {
+                    EndpointDescription endpoint = ir.getImportReference().getImportedEndpoint();
+                    if (!isImportPossibilityAvailable(endpoint, filter)) {
+                        removeImport(ir, null); // also unexports the service
+                    }
+                }
+            }
+        }
+    }
+
+    private boolean isImportPossibilityAvailable(EndpointDescription endpoint, String filter) {
+        synchronized (importPossibilities) {
+            List<EndpointDescription> endpoints = importPossibilities.get(filter);
+            return endpoints != null && endpoints.contains(endpoint); // false for unknown filters
+        }
+    }
+
+    // hands out a snapshot so callers can iterate without holding the importPossibilities lock
+    private List<EndpointDescription> getImportPossibilitiesCopy(String filter) {
+        synchronized (importPossibilities) {
+            List<EndpointDescription> known = importPossibilities.get(filter);
+            boolean unknownFilter = known == null;
+            return unknownFilter ? Collections.<EndpointDescription>emptyList()
+                : new ArrayList<EndpointDescription>(known);
+        }
+    }
+
+    private void importServices(String filter) { // imports the endpoints known for filter
+        synchronized (importedServices) {
+            List<ImportRegistration> importRegistrations = importedServices.get(filter);
+            for (EndpointDescription endpoint : getImportPossibilitiesCopy(filter)) {
+                if (!importAllAvailable && importRegistrations != null && !importRegistrations.isEmpty()) {
+                    return; // single-import mode: stop as soon as one import exists, old or new
+                }
+                // TODO but optional: if the service is already imported and the endpoint is still
+                // in the list of possible imports check if a "better" endpoint is now in the list
+                if (!alreadyImported(endpoint, importRegistrations)) {
+                    // service not imported yet -> import it now
+                    ImportRegistration ir = importService(endpoint);
+                    if (ir != null) {
+                        // import was successful: record it under the filter, creating the list on demand
+                        if (importRegistrations == null) {
+                            importRegistrations = new ArrayList<ImportRegistration>();
+                            importedServices.put(filter, importRegistrations);
+                        }
+                        importRegistrations.add(ir);
+                    }
+                }
+            }
+        }
+    }
+
+    private boolean alreadyImported(EndpointDescription endpoint, List<ImportRegistration> importRegistrations) {
+        if (importRegistrations == null) {
+            return false; // nothing has been imported for the filter yet
+        }
+        boolean found = false;
+        for (Iterator<ImportRegistration> it = importRegistrations.iterator(); !found && it.hasNext();) {
+            found = endpoint.equals(it.next().getImportReference().getImportedEndpoint());
+        }
+        return found;
+    }
+
+    /**
+     * Offers the endpoint to the known RemoteServiceAdmins in turn and stops at the first success.
+     *
+     * @param endpoint endpoint to import
+     * @return the first registration that carries no exception, or null if no admin produced one
+     */
+    private ImportRegistration importService(EndpointDescription endpoint) {
+        for (RemoteServiceAdmin admin : rsaSet) {
+            ImportRegistration candidate = admin.importService(endpoint);
+            if (candidate == null) {
+                continue; // this admin returned no registration; try the next one
+            }
+            if (candidate.getException() == null) {
+                LOG.debug("Service import was successful {}", candidate);
+                return candidate;
+            }
+            LOG.info("Error importing service " + endpoint, candidate.getException());
+        }
+        return null;
+    }
+
+    /**
+     * Remove and close (unexport) the given import. The import is specified either
+     * by its ImportRegistration or by its ImportReference (only one of them must
+     * be specified).
+     * <p>
+     * If this method is called from within iterations on the underlying data structure,
+     * the iterations must be made on copies of the structures rather than the original
+     * references in order to prevent ConcurrentModificationExceptions.
+     *
+     * @param reg the import registration to remove, or null when ref is given
+     * @param ref the import reference to remove, or null when reg is given
+     */
+    private void removeImport(ImportRegistration reg, ImportReference ref) {
+        // this method may be called recursively by calling ImportRegistration.close()
+        // and receiving a RemoteServiceAdminEvent for its unregistration, which results
+        // in a ConcurrentModificationException. We avoid this by closing the registrations
+        // only after data structure manipulation is done, and being re-entrant.
+        synchronized (importedServices) {
+            List<ImportRegistration> removed = new ArrayList<ImportRegistration>();
+            for (Iterator<List<ImportRegistration>> it1 = importedServices.values().iterator(); it1.hasNext();) {
+                Collection<ImportRegistration> irs = it1.next();
+                for (Iterator<ImportRegistration> it2 = irs.iterator(); it2.hasNext();) {
+                    ImportRegistration ir = it2.next();
+                    if (ir.equals(reg) || ir.getImportReference().equals(ref)) {
+                        removed.add(ir);
+                        it2.remove();
+                    }
+                }
+                if (irs.isEmpty()) {
+                    it1.remove(); // drop filters that have no imports left
+                }
+            }
+            for (ImportRegistration ir : removed) {
+                ir.close(); // may re-enter removeImport(); the map was already updated above
+            }
+        }
+    }
+
+    public void remoteAdminEvent(RemoteServiceAdminEvent event) { // RemoteServiceAdminListener callback
+        if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION) {
+            removeImport(null, event.getImportReference()); // forget (and close) the matching registration
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ActivatorTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ActivatorTest.java b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ActivatorTest.java
new file mode 100644
index 0000000..100e3a3
--- /dev/null
+++ b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ActivatorTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.topologymanager;
+
+import org.apache.cxf.dosgi.topologymanager.exporter.DefaultExportPolicy;
+import org.apache.cxf.dosgi.topologymanager.exporter.TopologyManagerExport;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IMocksControl;
+import org.junit.Test;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.ServiceReference;
+
+public class ActivatorTest {
+
+    @Test
+    public void testStart() throws Exception {
+        IMocksControl c = EasyMock.createNiceControl(); // nice control: unexpected calls return defaults
+        BundleContext context = c.createMock(BundleContext.class);
+        EasyMock.expect(context.getProperty(Constants.FRAMEWORK_UUID)).andReturn("myid");
+        context.addServiceListener(EasyMock.isA(TopologyManagerExport.class));
+        EasyMock.expectLastCall();
+        final Capture<String> filter = EasyMock.newCapture();
+        EasyMock.expect(context.createFilter(EasyMock.capture(filter)))
+            .andAnswer(new IAnswer<Filter>() {
+                public Filter answer() throws Throwable {
+                    return FrameworkUtil.createFilter(filter.getValue()); // real Filter from the captured string
+                }
+            }).times(2);
+        ServiceReference<?> sref = c.createMock(ServiceReference.class);
+        Bundle bundle = c.createMock(Bundle.class);
+        EasyMock.expect(sref.getBundle()).andReturn(bundle).anyTimes();
+        EasyMock.expect(context.getServiceReferences((String)null, Activator.DOSGI_SERVICES))
+            .andReturn(new ServiceReference[]{sref});
+        // start the activator against the recorded expectations
+        c.replay();
+        Activator activator = new Activator();
+        activator.doStart(context, new DefaultExportPolicy());
+        c.verify();
+        // after the reset nothing is expected any more; stopping must still run through
+        c.reset();
+        c.replay();
+        activator.doStop(context);
+        c.verify();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java
new file mode 100644
index 0000000..04bd017
--- /dev/null
+++ b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.topologymanager.exporter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Dictionary;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+@SuppressWarnings({
+    "rawtypes", "unchecked"
+   })
+public class EndpointListenerNotifierTest {
+
+    @Test
+    public void testNotifyListener() throws InvalidSyntaxException {
+        EndpointDescription endpoint1 = createEndpoint("myClass");
+        EndpointDescription endpoint2 = createEndpoint("notMyClass");
+
+        // Expect the listener to be called for endpoint1 only; endpoint2 does not match its scope filter
+        EndpointListener epl = listenerExpects(endpoint1, "(objectClass=myClass)");
+
+        EndpointRepository exportRepository = new EndpointRepository();
+        EndpointListenerNotifier tm = new EndpointListenerNotifier(exportRepository);
+
+        EasyMock.replay(epl);
+        Set<Filter> filters = new HashSet<Filter>();
+        filters.add(FrameworkUtil.createFilter("(objectClass=myClass)"));
+        tm.add(epl, filters);
+        tm.endpointAdded(endpoint1, null);
+        tm.endpointAdded(endpoint2, null); // must not reach the strict mock
+        tm.endpointRemoved(endpoint1, null);
+        tm.endpointRemoved(endpoint2, null); // must not reach the strict mock either
+        EasyMock.verify(epl);
+    }
+
+    private EndpointListener listenerExpects(EndpointDescription endpoint, String filter) {
+        EndpointListener epl = EasyMock.createStrictMock(EndpointListener.class); // strict: call order is checked
+        epl.endpointAdded(EasyMock.eq(endpoint), EasyMock.eq(filter));
+        EasyMock.expectLastCall().once();
+        epl.endpointRemoved(EasyMock.eq(endpoint), EasyMock.eq(filter));
+        EasyMock.expectLastCall().once();
+        return epl; // not replayed here; the caller has to call EasyMock.replay()
+    }
+    
+    @Test
+    public void testNotifyListeners() throws InvalidSyntaxException {
+        final String scope = "(objectClass=myClass)";
+        EndpointDescription endpoint1 = createEndpoint("myClass");
+
+        // strict mock: exactly one endpointAdded followed by one endpointRemoved for the scope
+        EndpointListener listener = listenerExpects(endpoint1, scope);
+        EasyMock.replay(listener);
+
+        // register the listener for the scope on a notifier backed by a fresh repository
+        EndpointListenerNotifier notifier = new EndpointListenerNotifier(new EndpointRepository());
+        Set<Filter> scopes = new HashSet<Filter>();
+        scopes.add(FrameworkUtil.createFilter(scope));
+        notifier.add(listener, scopes);
+
+        // events are fired with a null filter; the mock expects them with the scope string
+        notifier.endpointAdded(endpoint1, null);
+        notifier.endpointRemoved(endpoint1, null);
+        notifier.remove(listener);
+
+        EasyMock.verify(listener);
+    }
+    
+    public EndpointDescription createEndpoint(String iface) { // iface doubles as the endpoint id
+        Map<String, Object> properties = new Hashtable<String, Object>();
+        properties.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "any");
+        properties.put(RemoteConstants.ENDPOINT_ID, iface);
+        properties.put("objectClass", new String[]{iface});
+        return new EndpointDescription(properties);
+    }
+
+    @Test
+    public void testNormalizeScopeForSingleString() {
+        ServiceReference sr = createListenerServiceWithFilter("(myProp=A)"); // scope given as a plain String
+        Set<Filter> res = EndpointListenerNotifier.getFiltersFromEndpointListenerScope(sr);
+        assertEquals(1, res.size());
+        Filter filter = res.iterator().next();
+        filterMatches(filter);
+    }
+
+    @Test
+    public void testNormalizeScopeForStringArray() {
+        String[] filters = {"(myProp=A)", "(otherProp=B)"}; // scope given as a String[]
+        ServiceReference sr = createListenerServiceWithFilter(filters); 
+        Set<Filter> res = EndpointListenerNotifier.getFiltersFromEndpointListenerScope(sr);
+        assertEquals(filters.length, res.size());
+        Iterator<Filter> it = res.iterator(); // a Set has no defined order, hence the either/or check below
+        Filter filter1 = it.next();
+        Filter filter2 = it.next();
+        Dictionary<String, String> props = new Hashtable();
+        props.put("myProp", "A");
+        assertThat(filter1.match(props) || filter2.match(props), is(true));
+    }
+
+    @Test
+    public void testNormalizeScopeForCollection() {
+        Collection<String> collection = Arrays.asList("(myProp=A)", "(otherProp=B)"); // scope as a Collection
+        ServiceReference sr = createListenerServiceWithFilter(collection);
+        Set<Filter> res = EndpointListenerNotifier.getFiltersFromEndpointListenerScope(sr);
+        Iterator<Filter> it = res.iterator(); // two next() calls below fail if fewer than two filters came back
+        Filter filter1 = it.next();
+        Filter filter2 = it.next();
+        Dictionary<String, String> props = new Hashtable();
+        props.put("myProp", "A");
+        Assert.assertThat(filter1.match(props) || filter2.match(props), is(true));
+    }
+    
+    private void filterMatches(Filter filter) { // asserts that filter accepts the properties {myProp=A}
+        Dictionary<String, String> props = new Hashtable();
+        props.put("myProp", "A");
+        Assert.assertTrue("Filter should match", filter.match(props));
+    }
+
+    private ServiceReference createListenerServiceWithFilter(Object filters) {
+        ServiceReference sr = EasyMock.createMock(ServiceReference.class);
+        EasyMock.expect(sr.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE)).andReturn(filters);
+        EasyMock.replay(sr); // returned in replay state, answering the scope property lookup with filters
+        return sr;
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepositoryTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepositoryTest.java b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepositoryTest.java
new file mode 100644
index 0000000..cb07f43
--- /dev/null
+++ b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepositoryTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.topologymanager.exporter;
+
+import java.util.Arrays;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import org.easymock.EasyMock;
+import org.easymock.IMocksControl;
+import org.junit.Test;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
+
+public class EndpointRepositoryTest {
+
+    @Test
+    public void testAddRemove() throws InvalidSyntaxException {
+        EndpointDescription ep1 = createEndpoint("my");
+        
+        IMocksControl c = EasyMock.createControl();
+        ServiceReference<?> sref = createService(c);
+        RemoteServiceAdmin rsa = c.createMock(RemoteServiceAdmin.class);
+        EndpointListener notifier = c.createMock(EndpointListener.class);
+        
+        notifier.endpointAdded(ep1, null); // expected as a result of addEndpoints() below
+        EasyMock.expectLastCall();
+        
+        c.replay();
+        EndpointRepository repo = new EndpointRepository();
+        repo.setNotifier(notifier);
+        List<EndpointDescription> endpoints = Arrays.asList(ep1);
+        repo.addEndpoints(sref, rsa, endpoints);
+        c.verify();
+
+        c.reset(); // second phase: start over with fresh expectations on the same mocks
+        notifier.endpointRemoved(ep1, null); // expected as a result of removing the admin below
+        EasyMock.expectLastCall();
+
+        c.replay();
+        repo.removeRemoteServiceAdmin(rsa);
+        c.verify();
+    }
+
+    private ServiceReference<?> createService(IMocksControl c) { // mocks are left in record state
+        ServiceReference<?> sref = c.createMock(ServiceReference.class);
+        Bundle bundle = c.createMock(Bundle.class);
+        EasyMock.expect(bundle.getSymbolicName()).andReturn("myBundle");
+        EasyMock.expect(sref.getBundle()).andReturn(bundle);
+        return sref;
+    }
+
+    public EndpointDescription createEndpoint(String iface) { // iface is used as class name and endpoint id
+        Map<String, Object> endpointProps = new Hashtable<String, Object>();
+        endpointProps.put(RemoteConstants.ENDPOINT_ID, iface);
+        endpointProps.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "any");
+        endpointProps.put("objectClass", new String[]{iface});
+        return new EndpointDescription(endpointProps);
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java
new file mode 100644
index 0000000..0eda150
--- /dev/null
+++ b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.topologymanager.exporter;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import org.apache.cxf.dosgi.dsw.api.ExportPolicy;
+import org.easymock.EasyMock;
+import org.easymock.IMocksControl;
+import org.junit.Test;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceEvent;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.service.remoteserviceadmin.ExportReference;
+import org.osgi.service.remoteserviceadmin.ExportRegistration;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
+
+import static org.easymock.EasyMock.expectLastCall;
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class TopologyManagerExportTest {
+
+    /**
+     * Exports a newly registered service through the RemoteServiceAdmin and expects endpointAdded, then
+     * expects endpointRemoved on unregistration; nothing is recorded for the two closing steps.
+     */
+    @Test
+    public void testServiceExportUnexport() throws Exception {
+        IMocksControl control = EasyMock.createControl();
+        EndpointListener listener = control.createMock(EndpointListener.class);
+        RemoteServiceAdmin admin = control.createMock(RemoteServiceAdmin.class);
+        ServiceReference service = createUserService(control);
+        EndpointDescription endpoint = createEndpoint();
+        expectServiceExported(control, admin, listener, service, endpoint);
+        control.replay();
+
+        EndpointRepository repository = new EndpointRepository();
+        repository.setNotifier(listener);
+        ExportPolicy policy = new DefaultExportPolicy();
+        TopologyManagerExport exporter = new TopologyManagerExport(repository, syncExecutor(), policy);
+        exporter.add(admin);
+        exporter.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, service));
+        control.verify();
+        // Unregistering the service must be announced as the removal of its endpoint.
+        control.reset();
+        listener.endpointRemoved(endpoint, null);
+        expectLastCall().once();
+        control.replay();
+        exporter.serviceChanged(new ServiceEvent(ServiceEvent.UNREGISTERING, service));
+        control.verify();
+        // No expectations are recorded for a MODIFIED event on the unregistered service.
+        control.reset();
+        control.replay();
+        exporter.serviceChanged(new ServiceEvent(ServiceEvent.MODIFIED, service));
+        control.verify();
+        // Nor are any recorded for the removal of the RemoteServiceAdmin.
+        control.reset();
+        control.replay();
+        exporter.remove(admin);
+        control.verify();
+    }
+
+    /** Same export, but the service is registered before the RemoteServiceAdmin is added. */
+    @Test
+    public void testExportExisting() throws Exception {
+        IMocksControl control = EasyMock.createControl();
+        EndpointListenerNotifier notifier = control.createMock(EndpointListenerNotifier.class);
+        RemoteServiceAdmin admin = control.createMock(RemoteServiceAdmin.class);
+        ServiceReference service = createUserService(control);
+        expectServiceExported(control, admin, notifier, service, createEndpoint());
+        control.replay();
+
+        EndpointRepository repository = new EndpointRepository();
+        repository.setNotifier(notifier);
+        ExportPolicy policy = new DefaultExportPolicy();
+        TopologyManagerExport exporter = new TopologyManagerExport(repository, syncExecutor(), policy);
+        exporter.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, service));
+        exporter.add(admin);
+        control.verify();
+    }
+
+    /** Records one exportService call for the service and one endpointAdded(endpoint, null) call. */
+    private void expectServiceExported(IMocksControl c, RemoteServiceAdmin rsa,
+            EndpointListener listener, ServiceReference service, EndpointDescription endpoint) {
+        ExportRegistration registration = createExportRegistration(c, endpoint);
+        EasyMock.expect(rsa.exportService(EasyMock.same(service), (Map<String, Object>)EasyMock.anyObject()))
+            .andReturn(Collections.singletonList(registration)).once();
+        listener.endpointAdded(endpoint, null);
+        EasyMock.expectLastCall().once();
+    }
+
+    private Executor syncExecutor() {
+        return new Executor() {
+            @Override
+            public void execute(Runnable task) {
+                task.run();
+            }
+        };
+    }
+
+    /** Mocks an export registration without exception whose export reference exposes the endpoint. */
+    private ExportRegistration createExportRegistration(IMocksControl c, EndpointDescription endpoint) {
+        ExportReference reference = c.createMock(ExportReference.class);
+        ExportRegistration registration = c.createMock(ExportRegistration.class);
+        EasyMock.expect(registration.getExportReference()).andReturn(reference).anyTimes();
+        EasyMock.expect(registration.getException()).andReturn(null).anyTimes();
+        EasyMock.expect(reference.getExportedEndpoint()).andReturn(endpoint).anyTimes();
+        return registration;
+    }
+
+    private EndpointDescription createEndpoint() {
+        Map<String, Object> endpointProps = new HashMap<String, Object>();
+        endpointProps.put(Constants.OBJECTCLASS, new String[] {"abc"});
+        endpointProps.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "cxf");
+        endpointProps.put(RemoteConstants.ENDPOINT_ID, "1");
+        return new EndpointDescription(endpointProps);
+    }
+
+    /** Mocks a service reference whose SERVICE_EXPORTED_INTERFACES property is "*". */
+    private ServiceReference createUserService(IMocksControl c) {
+        Bundle owner = c.createMock(Bundle.class);
+        ServiceReference service = c.createMock(ServiceReference.class);
+        EasyMock.expect(service.getProperty(EasyMock.same(RemoteConstants.SERVICE_EXPORTED_INTERFACES)))
+            .andReturn("*").anyTimes();
+        EasyMock.expect(service.getBundle()).andReturn(owner).atLeastOnce();
+        EasyMock.expect(service.getProperty("objectClass")).andReturn("org.My").anyTimes();
+        EasyMock.expect(owner.getSymbolicName()).andReturn("serviceBundleName").atLeastOnce();
+        return service;
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerImplTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerImplTest.java b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerImplTest.java
new file mode 100644
index 0000000..c736197
--- /dev/null
+++ b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerImplTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.topologymanager.importer;
+
+import java.util.Dictionary;
+import java.util.List;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IMocksControl;
+import org.junit.Assert;
+import org.junit.Test;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+
+public class EndpointListenerImplTest extends Assert {
+
+    /** Step marker set by the test before each scope change; selects the assertions run by the answer. */
+    int testCase;
+
+    /**
+     * Extends and reduces the listener scope several times and checks, on every resulting
+     * ServiceRegistration.setProperties call, that the published scope list matches the current step.
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    @Test
+    public void testScopeChange() {
+        IMocksControl c = EasyMock.createNiceControl();
+        BundleContext bc = c.createMock(BundleContext.class);
+        TopologyManagerImport tm = c.createMock(TopologyManagerImport.class);
+        ServiceRegistration sr = c.createMock(ServiceRegistration.class);
+
+        // expect Listener registration
+        EasyMock.expect(bc.registerService(EasyMock.anyObject(Class.class),
+                                           EasyMock.anyObject(),
+                                           (Dictionary)EasyMock.anyObject())).andReturn(sr).atLeastOnce();
+
+        // expect property changes based on later calls; the answer validates the scope of each one
+        sr.setProperties((Dictionary)EasyMock.anyObject());
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                Object[] args = EasyMock.getCurrentArguments();
+                Dictionary props = (Dictionary)args[0];
+                List<String> scope = (List<String>)props.get(EndpointListener.ENDPOINT_LISTENER_SCOPE);
+                switch (testCase) {
+                case 1:
+                    assertEquals(1, scope.size());
+                    assertEquals("(a=b)", scope.get(0));
+                    break;
+                case 2:
+                    assertEquals(0, scope.size());
+                    break;
+                case 3:
+                    assertEquals("adding entry to empty list failed", 1, scope.size());
+                    assertEquals("(a=b)", scope.get(0));
+                    break;
+                case 4:
+                    assertEquals("adding second entry failed", 2, scope.size());
+                    // contains() yields a boolean, which is never null: it has to be checked with assertTrue
+                    assertTrue(scope.contains("(a=b)"));
+                    assertTrue(scope.contains("(c=d)"));
+                    break;
+                case 5:
+                    assertEquals("remove failed", 1, scope.size());
+                    assertEquals("(c=d)", scope.get(0));
+                    break;
+                default:
+                    fail("This should not happen!");
+                }
+                return null;
+            }
+        }).atLeastOnce();
+
+        c.replay();
+        EndpointListenerManager endpointListener = new EndpointListenerManager(bc, tm);
+        endpointListener.start();
+
+        testCase = 1;
+        endpointListener.extendScope("(a=b)");
+        testCase = 2;
+        endpointListener.reduceScope("(a=b)");
+
+        testCase = 3;
+        endpointListener.extendScope("(a=b)");
+        testCase = 4;
+        endpointListener.extendScope("(c=d)");
+        testCase = 5;
+        endpointListener.reduceScope("(a=b)");
+
+        endpointListener.stop();
+        c.verify();
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImplTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImplTest.java b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImplTest.java
new file mode 100644
index 0000000..1e2f90c
--- /dev/null
+++ b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImplTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.topologymanager.importer;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.easymock.EasyMock;
+import org.easymock.IMocksControl;
+import org.junit.Test;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.hooks.service.ListenerHook.ListenerInfo;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ListenerHookImplTest {
+
+    /** The extended filter must still match uuid-less properties but reject the local framework uuid. */
+    @Test
+    public void testExtendFilter() throws InvalidSyntaxException {
+        ListenerHookImpl hook = new ListenerHookImpl(createBundleContext(), null);
+        String extended = hook.extendFilter("(a=b)");
+        Filter parsed = FrameworkUtil.createFilter(extended);
+
+        Dictionary<String, String> candidate = new Hashtable<String, String>();
+        candidate.put("a", "b");
+        assertTrue(extended + " filter must match as uuid is missing", parsed.match(candidate));
+        candidate.put(RemoteConstants.ENDPOINT_FRAMEWORK_UUID, "MyUUID");
+        assertFalse(extended + " filter must NOT match as uuid is the local one", parsed.match(candidate));
+    }
+
+    /** Adding and then removing a listener must add and remove interest in its extended filter. */
+    @Test
+    public void testAddedRemoved() throws InvalidSyntaxException {
+        String filter = "(objectClass=My)";
+        IMocksControl control = EasyMock.createControl();
+        ServiceInterestListener interestListener = control.createMock(ServiceInterestListener.class);
+        ListenerHookImpl hook = new ListenerHookImpl(createBundleContext(), interestListener);
+        BundleContext listenerContext = createBundleContext();
+
+        ListenerInfo info = control.createMock(ListenerInfo.class);
+        EasyMock.expect(info.getBundleContext()).andReturn(listenerContext);
+        EasyMock.expect(info.getFilter()).andReturn(filter).atLeastOnce();
+
+        // Main expectations: interest in the extended filter is added and later removed again
+        interestListener.addServiceInterest(hook.extendFilter(filter));
+        EasyMock.expectLastCall();
+        interestListener.removeServiceInterest(hook.extendFilter(filter));
+        EasyMock.expectLastCall();
+        control.replay();
+
+        Collection<ListenerInfo> infos = Collections.singletonList(info);
+        hook.added(infos);
+        hook.removed(infos);
+        control.verify();
+    }
+
+    /** Creates a replayed nice mock that answers "MyUUID" when asked for "org.osgi.framework.uuid". */
+    private BundleContext createBundleContext() {
+        BundleContext context = EasyMock.createNiceMock(BundleContext.class);
+        EasyMock.expect(context.getProperty(EasyMock.eq("org.osgi.framework.uuid")))
+            .andReturn("MyUUID").atLeastOnce();
+        EasyMock.replay(context);
+        return context;
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/ReferenceCounterTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/ReferenceCounterTest.java b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/ReferenceCounterTest.java
new file mode 100644
index 0000000..3ab78db
--- /dev/null
+++ b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/ReferenceCounterTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.topologymanager.importer;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ReferenceCounterTest {
+
+    @Test
+    public void testCounter() {
+        ReferenceCounter<String> counter = new ReferenceCounter<String>();
+        assertEquals(-1, counter.remove("a"));
+        assertEquals(-1, counter.remove("a"));
+        assertEquals(1, counter.add("a"));
+        assertEquals(2, counter.add("a"));
+        assertEquals(3, counter.add("a"));
+        assertEquals(2, counter.remove("a"));
+        assertEquals(1, counter.remove("a"));
+        assertEquals(2, counter.add("a"));
+        assertEquals(1, counter.remove("a"));
+        assertEquals(0, counter.remove("a"));
+        assertEquals(-1, counter.remove("a"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImportTest.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImportTest.java b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImportTest.java
new file mode 100644
index 0000000..00be969
--- /dev/null
+++ b/topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImportTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.topologymanager.importer;
+
+import java.util.Dictionary;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IMocksControl;
+import org.junit.Test;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.ImportReference;
+import org.osgi.service.remoteserviceadmin.ImportRegistration;
+import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
+
+import static org.junit.Assert.assertTrue;
+
+public class TopologyManagerImportTest {
+
+    @SuppressWarnings({
+     "rawtypes", "unchecked"
+    })
+    @Test
+    public void testImportForNewlyAddedRSA() throws InterruptedException {
+        IMocksControl c = EasyMock.createControl();
+
+        c.makeThreadSafe(true);
+
+        final Semaphore sema = new Semaphore(0);
+
+        ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
+        sreg.unregister();
+        EasyMock.expectLastCall().once();
+        
+        BundleContext bc = c.createMock(BundleContext.class);
+        EasyMock.expect(bc.registerService(EasyMock.anyObject(Class.class),
+                                           EasyMock.anyObject(),
+                                           (Dictionary)EasyMock.anyObject())).andReturn(sreg).anyTimes();
+        EasyMock.expect(bc.getProperty(Constants.FRAMEWORK_UUID)).andReturn("myid");
+
+        EndpointDescription endpoint = c.createMock(EndpointDescription.class);
+        RemoteServiceAdmin rsa = c.createMock(RemoteServiceAdmin.class);
+        final ImportRegistration ireg = c.createMock(ImportRegistration.class);
+        EasyMock.expect(ireg.getException()).andReturn(null).anyTimes();
+        ImportReference iref = c.createMock(ImportReference.class);
+        EasyMock.expect(ireg.getImportReference()).andReturn(iref).anyTimes();
+        EasyMock.expect(iref.getImportedEndpoint()).andReturn(endpoint).anyTimes();
+
+        EasyMock.expect(rsa.importService(EasyMock.eq(endpoint))).andAnswer(new IAnswer<ImportRegistration>() {
+            public ImportRegistration answer() throws Throwable {
+                sema.release();
+                return ireg;
+            }
+        }).once();
+        c.replay();
+
+        TopologyManagerImport tm = new TopologyManagerImport(bc);
+        tm.start();
+        tm.endpointAdded(endpoint, "myFilter");
+        tm.add(rsa);
+        assertTrue("rsa.ImportService should have been called",
+                   sema.tryAcquire(100, TimeUnit.SECONDS));
+        tm.stop();
+        c.verify();
+    }
+}