You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2016/03/11 20:43:18 UTC

[19/50] [abbrv] aries-rsa git commit: [DOSGI-231] Create ExportPolicy SPI

[DOSGI-231] Create ExportPolicy SPI


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/8c7fbc8c
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/8c7fbc8c
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/8c7fbc8c

Branch: refs/heads/master
Commit: 8c7fbc8c5a76069eb5604cb0405fc888adb38139
Parents: c5ba7e4
Author: Christian Schneider <ch...@die-schneider.net>
Authored: Fri Mar 11 10:01:08 2016 +0100
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Fri Mar 11 10:01:08 2016 +0100

----------------------------------------------------------------------
 .../apache/cxf/dosgi/dsw/api/ExportPolicy.java  |  46 +++++++++
 dsw/cxf-topology-manager/pom.xml                |   8 +-
 .../cxf/dosgi/topologymanager/Activator.java    | 100 +++++++++++++++----
 .../exporter/DefaultExportPolicy.java           |  37 +++++++
 .../exporter/EndpointListenerNotifier.java      |  70 +++++++------
 .../exporter/EndpointRepository.java            |  38 ++++---
 .../exporter/TopologyManagerExport.java         |  40 +++++---
 .../dosgi/topologymanager/ActivatorTest.java    |   5 +-
 .../exporter/EndpointListenerNotifierTest.java  |  16 ++-
 .../exporter/EndpointRepositoryTest.java        |  82 +++++++++++++++
 .../exporter/TopologyManagerExportTest.java     |  27 ++---
 11 files changed, 359 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-dosgi-provider-api/src/main/java/org/apache/cxf/dosgi/dsw/api/ExportPolicy.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-dosgi-provider-api/src/main/java/org/apache/cxf/dosgi/dsw/api/ExportPolicy.java b/dsw/cxf-dosgi-provider-api/src/main/java/org/apache/cxf/dosgi/dsw/api/ExportPolicy.java
new file mode 100644
index 0000000..9e82c04
--- /dev/null
+++ b/dsw/cxf-dosgi-provider-api/src/main/java/org/apache/cxf/dosgi/dsw/api/ExportPolicy.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.dsw.api;
+
+import java.util.Map;
+
+import org.osgi.framework.ServiceReference;
+
+
+/**
+ * SPI for TopologyManagerExport. Allows to export services that are
+ * not marked for export as well customize the way services are exported.
+ * 
+ * Use cases:
+ * - Mandate SSL and basic authoriziation by adding the respective intents and configs
+ * - Add logging interceptor as intent
+ * - Remove exported interfaces to filter out services
+ */
+public interface ExportPolicy {
+    /**
+     * Allows to supply additional properties for a service that are then
+     * given to RemoteServiceAdmin. The service will be exported
+     * if the original service or the additional properties contain the
+     * non empty property service.exported.interfaces. 
+     * 
+     * @param service to export
+     * @return additional properties for exported Service (must not be null)
+     */
+    Map<String, ?> additionalParameters(ServiceReference<?> sref);
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/pom.xml
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/pom.xml b/dsw/cxf-topology-manager/pom.xml
index 2c64d28..b6c143e 100644
--- a/dsw/cxf-topology-manager/pom.xml
+++ b/dsw/cxf-topology-manager/pom.xml
@@ -39,12 +39,18 @@
         <dependency>
             <groupId>org.osgi</groupId>
             <artifactId>org.osgi.core</artifactId>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.osgi</groupId>
             <artifactId>org.osgi.compendium</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf.dosgi</groupId>
+            <artifactId>cxf-dosgi-ri-provider-api</artifactId>
+            <version>${project.version}</version>
         </dependency>
-
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java
index 7c0e035..62ec1a9 100644
--- a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java
+++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java
@@ -18,17 +18,24 @@
  */
 package org.apache.cxf.dosgi.topologymanager;
 
+import java.util.Dictionary;
+import java.util.Hashtable;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.cxf.dosgi.dsw.api.ExportPolicy;
+import org.apache.cxf.dosgi.topologymanager.exporter.DefaultExportPolicy;
 import org.apache.cxf.dosgi.topologymanager.exporter.EndpointListenerNotifier;
 import org.apache.cxf.dosgi.topologymanager.exporter.EndpointRepository;
 import org.apache.cxf.dosgi.topologymanager.exporter.TopologyManagerExport;
 import org.apache.cxf.dosgi.topologymanager.importer.TopologyManagerImport;
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
 import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceEvent;
 import org.osgi.framework.ServiceReference;
 import org.osgi.service.remoteserviceadmin.EndpointListener;
 import org.osgi.service.remoteserviceadmin.RemoteConstants;
@@ -39,6 +46,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Activator implements BundleActivator {
+    public static final String RSA_EXPORT_POLICY_FILTER = "rsa.export.policy.filter";
     static final String DOSGI_SERVICES = "(" + RemoteConstants.SERVICE_EXPORTED_INTERFACES + "=*)";
     private static final Logger LOG = LoggerFactory.getLogger(Activator.class);
 
@@ -48,36 +56,52 @@ public class Activator implements BundleActivator {
     private ServiceTracker<RemoteServiceAdmin, RemoteServiceAdmin> rsaTracker;
     private ThreadPoolExecutor exportExecutor;
     private ServiceTracker<EndpointListener, EndpointListener> epListenerTracker;
+    private ServiceTracker<ExportPolicy, ExportPolicy> policyTracker;
 
     public void start(final BundleContext bc) throws Exception {
-        LOG.debug("TopologyManager: start()");
-        EndpointRepository endpointRepo = new EndpointRepository();
-        notifier = new EndpointListenerNotifier(endpointRepo);
-        epListenerTracker = new ServiceTracker<EndpointListener, EndpointListener>(bc, EndpointListener.class, null) {
-            @Override
-            public EndpointListener addingService(ServiceReference<EndpointListener> reference) {
-                EndpointListener listener = super.addingService(reference);
-                notifier.add(listener, EndpointListenerNotifier.getFiltersFromEndpointListenerScope(reference));
-                return listener;
-            }
-            
+        Dictionary<String, String> props = new Hashtable<String, String>();
+        props.put("name", "default");
+        bc.registerService(ExportPolicy.class, new DefaultExportPolicy(), props);
+
+        Filter policyFilter = exportPolicyFilter(bc);
+        policyTracker = new ServiceTracker<ExportPolicy, ExportPolicy>(bc, policyFilter, null) {
+
             @Override
-            public void modifiedService(ServiceReference<EndpointListener> reference,
-                                        EndpointListener listener) {
-                super.modifiedService(reference, listener);
-                notifier.add(listener, EndpointListenerNotifier.getFiltersFromEndpointListenerScope(reference));
+            public ExportPolicy addingService(ServiceReference<ExportPolicy> reference) {
+                ExportPolicy policy = super.addingService(reference);
+                if (exportManager == null) {
+                    doStart(bc, policy);
+                }
+                return policy;
             }
-            
+
             @Override
-            public void removedService(ServiceReference<EndpointListener> reference,
-                                       EndpointListener listener) {
-                notifier.remove(listener);
-                super.removedService(reference, listener);
+            public void removedService(ServiceReference<ExportPolicy> reference, ExportPolicy service) {
+                if (exportManager != null) {
+                    doStop(bc);
+                }
+                super.removedService(reference, service);
             }
         };
+        policyTracker.open();
+    }
+
+    private Filter exportPolicyFilter(BundleContext bc) throws InvalidSyntaxException {
+        String filter = bc.getProperty(RSA_EXPORT_POLICY_FILTER);
+        if (filter == null) {
+            filter = "(name=default)";
+        }
+        return FrameworkUtil.createFilter(String.format("(&(objectClass=%s)%s)", ExportPolicy.class.getName(), filter));
+    }
+
+    public void doStart(final BundleContext bc, ExportPolicy policy) {
+        LOG.debug("TopologyManager: start()");
+        EndpointRepository endpointRepo = new EndpointRepository();
+        notifier = new EndpointListenerNotifier(endpointRepo);
+        epListenerTracker = new EndpointListenerTracker(bc);
         endpointRepo.setNotifier(notifier);
         exportExecutor = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
-        exportManager = new TopologyManagerExport(endpointRepo, exportExecutor);
+        exportManager = new TopologyManagerExport(endpointRepo, exportExecutor, policy);
         importManager = new TopologyManagerImport(bc);
         rsaTracker = new RSATracker(bc, RemoteServiceAdmin.class, null);
         bc.addServiceListener(exportManager);
@@ -88,12 +112,17 @@ public class Activator implements BundleActivator {
     }
 
     public void stop(BundleContext bc) throws Exception {
+        policyTracker.close();
+    }
+
+    public void doStop(BundleContext bc) {
         LOG.debug("TopologyManager: stop()");
         epListenerTracker.close();
         bc.removeServiceListener(exportManager);
         exportExecutor.shutdown();
         importManager.stop();
         rsaTracker.close();
+        exportManager = null;
     }
 
     public void exportExistingServices(BundleContext context) {
@@ -102,7 +131,7 @@ public class Activator implements BundleActivator {
             ServiceReference<?>[] references = context.getServiceReferences((String)null, DOSGI_SERVICES);
             if (references != null) {
                 for (ServiceReference<?> sref : references) {
-                    exportManager.export(sref);
+                    exportManager.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, sref));
                 }
             }
         } catch (InvalidSyntaxException e) {
@@ -110,6 +139,33 @@ public class Activator implements BundleActivator {
         }
     }
     
+    private final class EndpointListenerTracker extends ServiceTracker<EndpointListener, EndpointListener> {
+        private EndpointListenerTracker(BundleContext context) {
+            super(context, EndpointListener.class, null);
+        }
+
+        @Override
+        public EndpointListener addingService(ServiceReference<EndpointListener> reference) {
+            EndpointListener listener = super.addingService(reference);
+            notifier.add(listener, EndpointListenerNotifier.getFiltersFromEndpointListenerScope(reference));
+            return listener;
+        }
+
+        @Override
+        public void modifiedService(ServiceReference<EndpointListener> reference,
+                                    EndpointListener listener) {
+            super.modifiedService(reference, listener);
+            notifier.add(listener, EndpointListenerNotifier.getFiltersFromEndpointListenerScope(reference));
+        }
+
+        @Override
+        public void removedService(ServiceReference<EndpointListener> reference,
+                                   EndpointListener listener) {
+            notifier.remove(listener);
+            super.removedService(reference, listener);
+        }
+    }
+
     private final class RSATracker extends ServiceTracker<RemoteServiceAdmin, RemoteServiceAdmin> {
         private RSATracker(BundleContext context, Class<RemoteServiceAdmin> clazz,
                            ServiceTrackerCustomizer<RemoteServiceAdmin, RemoteServiceAdmin> customizer) {

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/DefaultExportPolicy.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/DefaultExportPolicy.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/DefaultExportPolicy.java
new file mode 100644
index 0000000..689ebab
--- /dev/null
+++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/DefaultExportPolicy.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.topologymanager.exporter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cxf.dosgi.dsw.api.ExportPolicy;
+import org.osgi.framework.ServiceReference;
+
+/**
+ * The default is to not customize the way services are exported
+ */
+public class DefaultExportPolicy implements ExportPolicy {
+
+    @Override
+    public Map<String, ?> additionalParameters(ServiceReference<?> sref) {
+        return new HashMap<String, Object>();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java
index cf0da0d..13d7dab 100644
--- a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java
+++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java
@@ -18,7 +18,6 @@
  */
 package org.apache.cxf.dosgi.topologymanager.exporter;
 
-import java.util.Collection;
 import java.util.Dictionary;
 import java.util.HashSet;
 import java.util.Hashtable;
@@ -38,9 +37,9 @@ import org.slf4j.LoggerFactory;
 /**
  * Tracks EndpointListeners and allows to notify them of endpoints.
  */
-public class EndpointListenerNotifier {
+public class EndpointListenerNotifier implements EndpointListener {
     private static final Logger LOG = LoggerFactory.getLogger(EndpointListenerNotifier.class);
-    public enum NotifyType { ADDED, REMOVED };
+    private enum NotifyType { ADDED, REMOVED };
     private Map<EndpointListener, Set<Filter>> listeners;
     private EndpointRepository endpointRepo;
 
@@ -48,11 +47,26 @@ public class EndpointListenerNotifier {
         this.endpointRepo = endpointRepo;
         this.listeners = new ConcurrentHashMap<EndpointListener, Set<Filter>>();
     }
+    
+    public static Set<Filter> getFiltersFromEndpointListenerScope(ServiceReference<EndpointListener> sref) {
+        Set<Filter> filters = new HashSet<Filter>();
+        String[] scopes = StringPlus.parse(sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE));
+        for (String scope : scopes) {
+            try {
+                filters.add(FrameworkUtil.createFilter(scope));
+            } catch (InvalidSyntaxException e) {
+                LOG.error("invalid endpoint listener scope: {}", scope, e);
+            }
+        }
+        return filters;
+    }
 
     public void add(EndpointListener ep, Set<Filter> filters) {
         LOG.debug("new EndpointListener detected");
         listeners.put(ep, filters);
-        notifyListener(NotifyType.ADDED, ep, filters, endpointRepo.getAllEndpoints());
+        for (EndpointDescription endpoint : endpointRepo.getAllEndpoints()) {
+            notifyListener(NotifyType.ADDED, ep, filters, endpoint);
+        }
     }
     
     public void remove(EndpointListener ep) {
@@ -60,15 +74,25 @@ public class EndpointListenerNotifier {
         listeners.remove(ep);
     }
     
+    @Override
+    public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
+        notifyListeners(NotifyType.ADDED, endpoint);
+    }
+
+    @Override
+    public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
+        notifyListeners(NotifyType.REMOVED, endpoint);
+    }
+
     /**
      * Notifies all endpoint listeners about endpoints being added or removed.
      *
      * @param added specifies whether endpoints were added (true) or removed (false)
      * @param endpoints the endpoints the listeners should be notified about
      */
-    public void notifyListeners(NotifyType type, Collection<EndpointDescription> endpoints) {
+    private void notifyListeners(NotifyType type, EndpointDescription endpoint) {
         for (EndpointListener listener : listeners.keySet()) {
-            notifyListener(type, listener, listeners.get(listener), endpoints);
+            notifyListener(type, listener, listeners.get(listener), endpoint);
         }
     }
 
@@ -79,34 +103,19 @@ public class EndpointListenerNotifier {
      * @param endpointListenerRef the ServiceReference of an EndpointListener to notify
      * @param endpoints the endpoints the listener should be notified about
      */
-    void notifyListener(NotifyType type, EndpointListener listener, Set<Filter> filters, 
-                        Collection<EndpointDescription> endpoints) {
+    private void notifyListener(NotifyType type, EndpointListener listener, Set<Filter> filters, 
+                        EndpointDescription endpoint) {
         LOG.debug("Endpoint {}", type);
-        for (EndpointDescription endpoint : endpoints) {
-            Set<Filter> matchingFilters = getMatchingFilters(filters, endpoint);
-            for (Filter filter : matchingFilters) {
-                if (type == NotifyType.ADDED) {
-                    listener.endpointAdded(endpoint, filter.toString());
-                } else {
-                    listener.endpointRemoved(endpoint, filter.toString());
-                }
-            }
-        }
-    }
-
-    public static Set<Filter> getFiltersFromEndpointListenerScope(ServiceReference<EndpointListener> sref) {
-        Set<Filter> filters = new HashSet<Filter>();
-        String[] scopes = StringPlus.parse(sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE));
-        for (String scope : scopes) {
-            try {
-                filters.add(FrameworkUtil.createFilter(scope));
-            } catch (InvalidSyntaxException e) {
-                LOG.error("invalid endpoint listener scope: {}", scope, e);
+        Set<Filter> matchingFilters = getMatchingFilters(filters, endpoint);
+        for (Filter filter : matchingFilters) {
+            if (type == NotifyType.ADDED) {
+                listener.endpointAdded(endpoint, filter.toString());
+            } else {
+                listener.endpointRemoved(endpoint, filter.toString());
             }
         }
-        return filters;
     }
-
+    
     private static Set<Filter> getMatchingFilters(Set<Filter> filters, EndpointDescription endpoint) {
         Set<Filter> matchingFilters = new HashSet<Filter>();
         Dictionary<String, Object> dict = new Hashtable<String, Object>(endpoint.getProperties());
@@ -120,4 +129,5 @@ public class EndpointListenerNotifier {
         }
         return matchingFilters;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepository.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepository.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepository.java
index 7984822..2a7bab3 100644
--- a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepository.java
+++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepository.java
@@ -26,9 +26,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.cxf.dosgi.topologymanager.exporter.EndpointListenerNotifier.NotifyType;
 import org.osgi.framework.ServiceReference;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
 import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,9 +46,9 @@ public class EndpointRepository {
     private final Map<ServiceReference, Map<RemoteServiceAdmin, Collection<EndpointDescription>>> exportedServices
         = new LinkedHashMap<ServiceReference, Map<RemoteServiceAdmin, Collection<EndpointDescription>>>();
 
-    private EndpointListenerNotifier notifier;
+    private EndpointListener notifier;
     
-    public void setNotifier(EndpointListenerNotifier notifier) {
+    public void setNotifier(EndpointListener notifier) {
         this.notifier = notifier;
     }
     
@@ -69,38 +69,35 @@ public class EndpointRepository {
                 exports.remove(rsa);
             }
         }
-        notifier.notifyListeners(NotifyType.REMOVED, removedEndpoints);
+        endpointsRemoved(removedEndpoints);
         return removedEndpoints;
     }
 
-    synchronized void removeService(ServiceReference sref) {
+    public synchronized void removeService(ServiceReference sref) {
         List<EndpointDescription> removedEndpoints = new ArrayList<EndpointDescription>();
-        Map<RemoteServiceAdmin, Collection<EndpointDescription>> rsas = exportedServices.get(sref);
-        if (rsas != null) {
-            for (Collection<EndpointDescription> endpoints : rsas.values()) {
+        Map<RemoteServiceAdmin, Collection<EndpointDescription>> rsaToEndpoints = exportedServices.get(sref);
+        if (rsaToEndpoints != null) {
+            for (Collection<EndpointDescription> endpoints : rsaToEndpoints.values()) {
                 removedEndpoints.addAll(endpoints);
             }
             exportedServices.remove(sref);
         }
-        notifier.notifyListeners(NotifyType.REMOVED, removedEndpoints);
+        endpointsRemoved(removedEndpoints);
     }
 
-    synchronized void addService(ServiceReference sref) {
+    public synchronized void addService(ServiceReference sref) {
         if (!exportedServices.containsKey(sref)) {
             LOG.info("Marking service from bundle {} for export", sref.getBundle().getSymbolicName());
             exportedServices.put(sref, new LinkedHashMap<RemoteServiceAdmin, Collection<EndpointDescription>>());
         }
     }
 
-    synchronized void addEndpoints(ServiceReference sref, RemoteServiceAdmin rsa,
+    public synchronized void addEndpoints(ServiceReference sref, RemoteServiceAdmin rsa,
                                    List<EndpointDescription> endpoints) {
-        if (endpoints == null) {
-            throw new NullPointerException();
-        }
         addService(sref);
         Map<RemoteServiceAdmin, Collection<EndpointDescription>> exports = exportedServices.get(sref);
         exports.put(rsa, endpoints);
-        notifier.notifyListeners(NotifyType.ADDED, endpoints);
+        endpointsAdded(endpoints);
     }
 
     synchronized boolean isAlreadyExportedForRsa(ServiceReference sref, RemoteServiceAdmin rsa) {
@@ -129,4 +126,15 @@ public class EndpointRepository {
         return servicesToBeExported;
     }
 
+    private void endpointsAdded(List<EndpointDescription> endpoints) {
+        for (EndpointDescription epd : endpoints) {
+            notifier.endpointAdded(epd, null);
+        }
+    }
+    
+    private void endpointsRemoved(List<EndpointDescription> endpoints) {
+        for (EndpointDescription epd : endpoints) {
+            notifier.endpointRemoved(epd, null);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExport.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExport.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExport.java
index 2b1a281..ad3736c 100644
--- a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExport.java
+++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExport.java
@@ -22,9 +22,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
 
+import org.apache.cxf.dosgi.dsw.api.ExportPolicy;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.ServiceEvent;
 import org.osgi.framework.ServiceListener;
@@ -53,10 +55,13 @@ public class TopologyManagerExport implements ServiceListener {
 
     private final Executor execService;
     private final EndpointRepository endpointRepo;
+    private ExportPolicy policy;
     private final Set<RemoteServiceAdmin> rsaSet;
 
-    public TopologyManagerExport(final EndpointRepository endpointRepo, Executor executor) {
+
+    public TopologyManagerExport(final EndpointRepository endpointRepo, Executor executor, ExportPolicy policy) {
         this.endpointRepo = endpointRepo;
+        this.policy = policy;
         this.rsaSet = new HashSet<RemoteServiceAdmin>();
         this.execService = executor;
     }
@@ -67,22 +72,13 @@ public class TopologyManagerExport implements ServiceListener {
         ServiceReference<?> sref = event.getServiceReference();
         if (event.getType() == ServiceEvent.REGISTERED) {
             LOG.debug("Received REGISTERED ServiceEvent: {}", event);
-            if (shouldExportService(sref)) {
-                export(sref);
-            }
+            export(sref);
         } else if (event.getType() == ServiceEvent.UNREGISTERING) {
             LOG.debug("Received UNREGISTERING ServiceEvent: {}", event);
             endpointRepo.removeService(sref);
         }
     }
 
-    /**
-     * checks if a Service is intended to be exported
-     */
-    private boolean shouldExportService(ServiceReference<?> sref) {
-        return sref.getProperty(RemoteConstants.SERVICE_EXPORTED_INTERFACES) != null;
-    }
-
     public void add(RemoteServiceAdmin rsa) {
         rsaSet.add(rsa);
         for (ServiceReference<?> serviceRef : endpointRepo.getServicesToBeExportedFor(rsa)) {
@@ -95,7 +91,7 @@ public class TopologyManagerExport implements ServiceListener {
         endpointRepo.removeRemoteServiceAdmin(rsa);
     };
 
-    public void export(final ServiceReference<?> sref) {
+    private void export(final ServiceReference<?> sref) {
         execService.execute(new Runnable() {
             public void run() {
                 doExport(sref);
@@ -104,6 +100,11 @@ public class TopologyManagerExport implements ServiceListener {
     }
 
     private void doExport(final ServiceReference<?> sref) {
+        Map<String, ?> addProps = policy.additionalParameters(sref);
+        if (!shouldExport(sref, addProps)) {
+            LOG.debug("Skipping service {}", sref);
+            return;
+        }
         LOG.debug("Exporting service {}", sref);
         endpointRepo.addService(sref); // mark for future export even if there are currently no RSAs
         if (rsaSet.size() == 0) {
@@ -119,17 +120,26 @@ public class TopologyManagerExport implements ServiceListener {
                 // already handled by this remoteServiceAdmin
                 LOG.debug("already handled by this remoteServiceAdmin -> skipping");
             } else {
-                exportServiceUsingRemoteServiceAdmin(sref, remoteServiceAdmin);
+                
+                exportServiceUsingRemoteServiceAdmin(sref, remoteServiceAdmin, addProps);
             }
         }
     }
 
+    private boolean shouldExport(ServiceReference<?> sref, Map<String, ?> addProps) {
+        String exported = (String)sref.getProperty(RemoteConstants.SERVICE_EXPORTED_INTERFACES);
+        String addExported = (String)addProps.get(RemoteConstants.SERVICE_EXPORTED_INTERFACES);
+        String effectiveExported = addExported != null ? addExported : exported;
+        return (effectiveExported != null) && !effectiveExported.isEmpty();
+    }
+
     private Object getSymbolicName(Bundle bundle) {
         return bundle == null ? null : bundle.getSymbolicName();
     }
 
     private void exportServiceUsingRemoteServiceAdmin(final ServiceReference<?> sref,
-                                                      final RemoteServiceAdmin remoteServiceAdmin) {
+                                                      final RemoteServiceAdmin remoteServiceAdmin, 
+                                                      Map<String, ?> addProps) {
         // abort if the service was unregistered by the time we got here
         // (we check again at the end, but this optimization saves unnecessary heavy processing)
         if (sref.getBundle() == null) {
@@ -140,7 +150,7 @@ public class TopologyManagerExport implements ServiceListener {
         // do the export
         LOG.debug("exporting {}...", sref);
         // TODO: additional parameter Map?
-        Collection<ExportRegistration> exportRegs = remoteServiceAdmin.exportService(sref, null);
+        Collection<ExportRegistration> exportRegs = remoteServiceAdmin.exportService(sref, addProps);
         // process successful/failed registrations
         List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>();
         for (ExportRegistration reg : exportRegs) {

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ActivatorTest.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ActivatorTest.java b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ActivatorTest.java
index f33199c..100e3a3 100644
--- a/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ActivatorTest.java
+++ b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ActivatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.cxf.dosgi.topologymanager;
 
+import org.apache.cxf.dosgi.topologymanager.exporter.DefaultExportPolicy;
 import org.apache.cxf.dosgi.topologymanager.exporter.TopologyManagerExport;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
@@ -55,12 +56,12 @@ public class ActivatorTest {
 
         c.replay();
         Activator activator = new Activator();
-        activator.start(context);
+        activator.doStart(context, new DefaultExportPolicy());
         c.verify();
         
         c.reset();
         c.replay();
-        activator.stop(context);
+        activator.doStop(context);
         c.verify();
     }
 

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java
index 95d35d9..04bd017 100644
--- a/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java
+++ b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java
@@ -24,13 +24,9 @@ import java.util.Dictionary;
 import java.util.HashSet;
 import java.util.Hashtable;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static java.util.Arrays.asList;
-
-import org.apache.cxf.dosgi.topologymanager.exporter.EndpointListenerNotifier.NotifyType;
 import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Test;
@@ -63,11 +59,13 @@ public class EndpointListenerNotifierTest {
         EndpointListenerNotifier tm = new EndpointListenerNotifier(exportRepository);
 
         EasyMock.replay(epl);
-        List<EndpointDescription> endpoints = Arrays.asList(endpoint1, endpoint2);
         Set<Filter> filters = new HashSet<Filter>();
         filters.add(FrameworkUtil.createFilter("(objectClass=myClass)"));
-        tm.notifyListener(NotifyType.ADDED, epl, filters, endpoints);
-        tm.notifyListener(NotifyType.REMOVED, epl, filters, endpoints);
+        tm.add(epl, filters);
+        tm.endpointAdded(endpoint1, null);
+        tm.endpointAdded(endpoint2, null);
+        tm.endpointRemoved(endpoint1, null);
+        tm.endpointRemoved(endpoint2, null);
         EasyMock.verify(epl);
     }
 
@@ -97,8 +95,8 @@ public class EndpointListenerNotifierTest {
         Set<Filter> filters = new HashSet<Filter>();
         filters.add(FrameworkUtil.createFilter("(objectClass=myClass)"));
         tm.add(epl, filters);
-        tm.notifyListeners(NotifyType.ADDED, asList(endpoint1));
-        tm.notifyListeners(NotifyType.REMOVED, asList(endpoint1));
+        tm.endpointAdded(endpoint1, null);
+        tm.endpointRemoved(endpoint1, null);
         tm.remove(epl);
         EasyMock.verify(epl);
     }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepositoryTest.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepositoryTest.java b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepositoryTest.java
new file mode 100644
index 0000000..cb07f43
--- /dev/null
+++ b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepositoryTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.topologymanager.exporter;
+
+import java.util.Arrays;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import org.easymock.EasyMock;
+import org.easymock.IMocksControl;
+import org.junit.Test;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
+
+public class EndpointRepositoryTest {
+
+    @Test
+    public void testAddRemove() throws InvalidSyntaxException {
+        EndpointDescription ep1 = createEndpoint("my");
+        
+        IMocksControl c = EasyMock.createControl();
+        ServiceReference<?> sref = createService(c);
+        RemoteServiceAdmin rsa = c.createMock(RemoteServiceAdmin.class);
+        EndpointListener notifier = c.createMock(EndpointListener.class);
+        
+        notifier.endpointAdded(ep1, null);
+        EasyMock.expectLastCall();
+        
+        c.replay();
+        EndpointRepository repo = new EndpointRepository();
+        repo.setNotifier(notifier);
+        List<EndpointDescription> endpoints = Arrays.asList(ep1);
+        repo.addEndpoints(sref, rsa, endpoints);
+        c.verify();
+
+        c.reset();
+        notifier.endpointRemoved(ep1, null);
+        EasyMock.expectLastCall();
+
+        c.replay();
+        repo.removeRemoteServiceAdmin(rsa);
+        c.verify();
+    }
+
+    private ServiceReference<?> createService(IMocksControl c) {
+        ServiceReference<?> sref = c.createMock(ServiceReference.class);
+        Bundle bundle = c.createMock(Bundle.class);
+        EasyMock.expect(bundle.getSymbolicName()).andReturn("myBundle");
+        EasyMock.expect(sref.getBundle()).andReturn(bundle);
+        return sref;
+    }
+
+    public EndpointDescription createEndpoint(String iface) {
+        Map<String, Object> props = new Hashtable<String, Object>(); 
+        props.put("objectClass", new String[]{iface});
+        props.put(RemoteConstants.ENDPOINT_ID, iface);
+        props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "any");
+        return new EndpointDescription(props);
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java
index 1355925..0eda150 100644
--- a/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java
+++ b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java
@@ -18,16 +18,12 @@
  */
 package org.apache.cxf.dosgi.topologymanager.exporter;
 
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Executor;
 
-import static java.util.Collections.emptyList;
-import static java.util.Collections.singletonList;
-
-import org.apache.cxf.dosgi.topologymanager.exporter.EndpointListenerNotifier.NotifyType;
+import org.apache.cxf.dosgi.dsw.api.ExportPolicy;
 import org.easymock.EasyMock;
 import org.easymock.IMocksControl;
 import org.junit.Test;
@@ -36,12 +32,12 @@ import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceEvent;
 import org.osgi.framework.ServiceReference;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
 import org.osgi.service.remoteserviceadmin.ExportReference;
 import org.osgi.service.remoteserviceadmin.ExportRegistration;
 import org.osgi.service.remoteserviceadmin.RemoteConstants;
 import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
 
-import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expectLastCall;
 
 @SuppressWarnings({"rawtypes", "unchecked"})
@@ -57,7 +53,7 @@ public class TopologyManagerExportTest {
     public void testServiceExportUnexport() throws Exception {
         IMocksControl c = EasyMock.createControl();
         RemoteServiceAdmin rsa = c.createMock(RemoteServiceAdmin.class);
-        final EndpointListenerNotifier notifier = c.createMock(EndpointListenerNotifier.class);
+        final EndpointListener notifier = c.createMock(EndpointListener.class);
         final ServiceReference sref = createUserService(c);
         EndpointDescription epd = createEndpoint();
         expectServiceExported(c, rsa, notifier, sref, epd);
@@ -66,13 +62,14 @@ public class TopologyManagerExportTest {
         EndpointRepository endpointRepo = new EndpointRepository();
         endpointRepo.setNotifier(notifier);
         Executor executor = syncExecutor();
-        TopologyManagerExport exportManager = new TopologyManagerExport(endpointRepo, executor);
+        ExportPolicy policy = new DefaultExportPolicy();
+        TopologyManagerExport exportManager = new TopologyManagerExport(endpointRepo, executor, policy);
         exportManager.add(rsa);
         exportManager.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, sref));
         c.verify();
         
         c.reset();
-        notifier.notifyListeners(eq(NotifyType.REMOVED), eq(singletonList(epd)));
+        notifier.endpointRemoved(epd, null);
         expectLastCall().once();
         c.replay();
         exportManager.serviceChanged(new ServiceEvent(ServiceEvent.UNREGISTERING, sref));
@@ -84,8 +81,6 @@ public class TopologyManagerExportTest {
         c.verify();
         
         c.reset();
-        notifier.notifyListeners(eq(NotifyType.REMOVED), eq((Collection)emptyList()));
-        expectLastCall().once();
         c.replay();
         exportManager.remove(rsa);
         c.verify();
@@ -102,20 +97,20 @@ public class TopologyManagerExportTest {
 
         EndpointRepository endpointRepo = new EndpointRepository();
         endpointRepo.setNotifier(mockEpListenerNotifier);
-        TopologyManagerExport exportManager = new TopologyManagerExport(endpointRepo, syncExecutor());
-        exportManager.export(sref);
+        ExportPolicy policy = new DefaultExportPolicy();
+        TopologyManagerExport exportManager = new TopologyManagerExport(endpointRepo, syncExecutor(), policy);
+        exportManager.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, sref));
         exportManager.add(rsa);
         c.verify();
     }
 
     private void expectServiceExported(IMocksControl c, RemoteServiceAdmin rsa,
-                                       final EndpointListenerNotifier mockEpListenerNotifier,
+                                       final EndpointListener listener,
                                        final ServiceReference sref, EndpointDescription epd) {
         ExportRegistration exportRegistration = createExportRegistration(c, epd);
         EasyMock.expect(rsa.exportService(EasyMock.same(sref), (Map<String, Object>)EasyMock.anyObject()))
             .andReturn(Collections.singletonList(exportRegistration)).once();
-        mockEpListenerNotifier.notifyListeners(EasyMock.eq(NotifyType.ADDED), 
-                                               EasyMock.eq(Collections.singletonList(epd)));
+        listener.endpointAdded(epd, null); 
         EasyMock.expectLastCall().once();
     }