You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2016/03/11 20:43:18 UTC
[19/50] [abbrv] aries-rsa git commit: [DOSGI-231] Create ExportPolicy
SPI
[DOSGI-231] Create ExportPolicy SPI
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/8c7fbc8c
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/8c7fbc8c
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/8c7fbc8c
Branch: refs/heads/master
Commit: 8c7fbc8c5a76069eb5604cb0405fc888adb38139
Parents: c5ba7e4
Author: Christian Schneider <ch...@die-schneider.net>
Authored: Fri Mar 11 10:01:08 2016 +0100
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Fri Mar 11 10:01:08 2016 +0100
----------------------------------------------------------------------
.../apache/cxf/dosgi/dsw/api/ExportPolicy.java | 46 +++++++++
dsw/cxf-topology-manager/pom.xml | 8 +-
.../cxf/dosgi/topologymanager/Activator.java | 100 +++++++++++++++----
.../exporter/DefaultExportPolicy.java | 37 +++++++
.../exporter/EndpointListenerNotifier.java | 70 +++++++------
.../exporter/EndpointRepository.java | 38 ++++---
.../exporter/TopologyManagerExport.java | 40 +++++---
.../dosgi/topologymanager/ActivatorTest.java | 5 +-
.../exporter/EndpointListenerNotifierTest.java | 16 ++-
.../exporter/EndpointRepositoryTest.java | 82 +++++++++++++++
.../exporter/TopologyManagerExportTest.java | 27 ++---
11 files changed, 359 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-dosgi-provider-api/src/main/java/org/apache/cxf/dosgi/dsw/api/ExportPolicy.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-dosgi-provider-api/src/main/java/org/apache/cxf/dosgi/dsw/api/ExportPolicy.java b/dsw/cxf-dosgi-provider-api/src/main/java/org/apache/cxf/dosgi/dsw/api/ExportPolicy.java
new file mode 100644
index 0000000..9e82c04
--- /dev/null
+++ b/dsw/cxf-dosgi-provider-api/src/main/java/org/apache/cxf/dosgi/dsw/api/ExportPolicy.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.dsw.api;
+
+import java.util.Map;
+
+import org.osgi.framework.ServiceReference;
+
+
+/**
+ * SPI for TopologyManagerExport. Allows to export services that are
+ * not marked for export as well customize the way services are exported.
+ *
+ * Use cases:
+ * - Mandate SSL and basic authoriziation by adding the respective intents and configs
+ * - Add logging interceptor as intent
+ * - Remove exported interfaces to filter out services
+ */
+public interface ExportPolicy {
+ /**
+ * Allows to supply additional properties for a service that are then
+ * given to RemoteServiceAdmin. The service will be exported
+ * if the original service or the additional properties contain the
+ * non empty property service.exported.interfaces.
+ *
+ * @param service to export
+ * @return additional properties for exported Service (must not be null)
+ */
+ Map<String, ?> additionalParameters(ServiceReference<?> sref);
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/pom.xml
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/pom.xml b/dsw/cxf-topology-manager/pom.xml
index 2c64d28..b6c143e 100644
--- a/dsw/cxf-topology-manager/pom.xml
+++ b/dsw/cxf-topology-manager/pom.xml
@@ -39,12 +39,18 @@
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.compendium</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf.dosgi</groupId>
+ <artifactId>cxf-dosgi-ri-provider-api</artifactId>
+ <version>${project.version}</version>
</dependency>
-
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java
index 7c0e035..62ec1a9 100644
--- a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java
+++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java
@@ -18,17 +18,24 @@
*/
package org.apache.cxf.dosgi.topologymanager;
+import java.util.Dictionary;
+import java.util.Hashtable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.cxf.dosgi.dsw.api.ExportPolicy;
+import org.apache.cxf.dosgi.topologymanager.exporter.DefaultExportPolicy;
import org.apache.cxf.dosgi.topologymanager.exporter.EndpointListenerNotifier;
import org.apache.cxf.dosgi.topologymanager.exporter.EndpointRepository;
import org.apache.cxf.dosgi.topologymanager.exporter.TopologyManagerExport;
import org.apache.cxf.dosgi.topologymanager.importer.TopologyManagerImport;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceEvent;
import org.osgi.framework.ServiceReference;
import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.osgi.service.remoteserviceadmin.RemoteConstants;
@@ -39,6 +46,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Activator implements BundleActivator {
+ public static final String RSA_EXPORT_POLICY_FILTER = "rsa.export.policy.filter";
static final String DOSGI_SERVICES = "(" + RemoteConstants.SERVICE_EXPORTED_INTERFACES + "=*)";
private static final Logger LOG = LoggerFactory.getLogger(Activator.class);
@@ -48,36 +56,52 @@ public class Activator implements BundleActivator {
private ServiceTracker<RemoteServiceAdmin, RemoteServiceAdmin> rsaTracker;
private ThreadPoolExecutor exportExecutor;
private ServiceTracker<EndpointListener, EndpointListener> epListenerTracker;
+ private ServiceTracker<ExportPolicy, ExportPolicy> policyTracker;
public void start(final BundleContext bc) throws Exception {
- LOG.debug("TopologyManager: start()");
- EndpointRepository endpointRepo = new EndpointRepository();
- notifier = new EndpointListenerNotifier(endpointRepo);
- epListenerTracker = new ServiceTracker<EndpointListener, EndpointListener>(bc, EndpointListener.class, null) {
- @Override
- public EndpointListener addingService(ServiceReference<EndpointListener> reference) {
- EndpointListener listener = super.addingService(reference);
- notifier.add(listener, EndpointListenerNotifier.getFiltersFromEndpointListenerScope(reference));
- return listener;
- }
-
+ Dictionary<String, String> props = new Hashtable<String, String>();
+ props.put("name", "default");
+ bc.registerService(ExportPolicy.class, new DefaultExportPolicy(), props);
+
+ Filter policyFilter = exportPolicyFilter(bc);
+ policyTracker = new ServiceTracker<ExportPolicy, ExportPolicy>(bc, policyFilter, null) {
+
@Override
- public void modifiedService(ServiceReference<EndpointListener> reference,
- EndpointListener listener) {
- super.modifiedService(reference, listener);
- notifier.add(listener, EndpointListenerNotifier.getFiltersFromEndpointListenerScope(reference));
+ public ExportPolicy addingService(ServiceReference<ExportPolicy> reference) {
+ ExportPolicy policy = super.addingService(reference);
+ if (exportManager == null) {
+ doStart(bc, policy);
+ }
+ return policy;
}
-
+
@Override
- public void removedService(ServiceReference<EndpointListener> reference,
- EndpointListener listener) {
- notifier.remove(listener);
- super.removedService(reference, listener);
+ public void removedService(ServiceReference<ExportPolicy> reference, ExportPolicy service) {
+ if (exportManager != null) {
+ doStop(bc);
+ }
+ super.removedService(reference, service);
}
};
+ policyTracker.open();
+ }
+
+ private Filter exportPolicyFilter(BundleContext bc) throws InvalidSyntaxException {
+ String filter = bc.getProperty(RSA_EXPORT_POLICY_FILTER);
+ if (filter == null) {
+ filter = "(name=default)";
+ }
+ return FrameworkUtil.createFilter(String.format("(&(objectClass=%s)%s)", ExportPolicy.class.getName(), filter));
+ }
+
+ public void doStart(final BundleContext bc, ExportPolicy policy) {
+ LOG.debug("TopologyManager: start()");
+ EndpointRepository endpointRepo = new EndpointRepository();
+ notifier = new EndpointListenerNotifier(endpointRepo);
+ epListenerTracker = new EndpointListenerTracker(bc);
endpointRepo.setNotifier(notifier);
exportExecutor = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- exportManager = new TopologyManagerExport(endpointRepo, exportExecutor);
+ exportManager = new TopologyManagerExport(endpointRepo, exportExecutor, policy);
importManager = new TopologyManagerImport(bc);
rsaTracker = new RSATracker(bc, RemoteServiceAdmin.class, null);
bc.addServiceListener(exportManager);
@@ -88,12 +112,17 @@ public class Activator implements BundleActivator {
}
public void stop(BundleContext bc) throws Exception {
+ policyTracker.close();
+ }
+
+ public void doStop(BundleContext bc) {
LOG.debug("TopologyManager: stop()");
epListenerTracker.close();
bc.removeServiceListener(exportManager);
exportExecutor.shutdown();
importManager.stop();
rsaTracker.close();
+ exportManager = null;
}
public void exportExistingServices(BundleContext context) {
@@ -102,7 +131,7 @@ public class Activator implements BundleActivator {
ServiceReference<?>[] references = context.getServiceReferences((String)null, DOSGI_SERVICES);
if (references != null) {
for (ServiceReference<?> sref : references) {
- exportManager.export(sref);
+ exportManager.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, sref));
}
}
} catch (InvalidSyntaxException e) {
@@ -110,6 +139,33 @@ public class Activator implements BundleActivator {
}
}
+ private final class EndpointListenerTracker extends ServiceTracker<EndpointListener, EndpointListener> {
+ private EndpointListenerTracker(BundleContext context) {
+ super(context, EndpointListener.class, null);
+ }
+
+ @Override
+ public EndpointListener addingService(ServiceReference<EndpointListener> reference) {
+ EndpointListener listener = super.addingService(reference);
+ notifier.add(listener, EndpointListenerNotifier.getFiltersFromEndpointListenerScope(reference));
+ return listener;
+ }
+
+ @Override
+ public void modifiedService(ServiceReference<EndpointListener> reference,
+ EndpointListener listener) {
+ super.modifiedService(reference, listener);
+ notifier.add(listener, EndpointListenerNotifier.getFiltersFromEndpointListenerScope(reference));
+ }
+
+ @Override
+ public void removedService(ServiceReference<EndpointListener> reference,
+ EndpointListener listener) {
+ notifier.remove(listener);
+ super.removedService(reference, listener);
+ }
+ }
+
private final class RSATracker extends ServiceTracker<RemoteServiceAdmin, RemoteServiceAdmin> {
private RSATracker(BundleContext context, Class<RemoteServiceAdmin> clazz,
ServiceTrackerCustomizer<RemoteServiceAdmin, RemoteServiceAdmin> customizer) {
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/DefaultExportPolicy.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/DefaultExportPolicy.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/DefaultExportPolicy.java
new file mode 100644
index 0000000..689ebab
--- /dev/null
+++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/DefaultExportPolicy.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.topologymanager.exporter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cxf.dosgi.dsw.api.ExportPolicy;
+import org.osgi.framework.ServiceReference;
+
+/**
+ * The default is to not customize the way services are exported
+ */
+public class DefaultExportPolicy implements ExportPolicy {
+
+ @Override
+ public Map<String, ?> additionalParameters(ServiceReference<?> sref) {
+ return new HashMap<String, Object>();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java
index cf0da0d..13d7dab 100644
--- a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java
+++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java
@@ -18,7 +18,6 @@
*/
package org.apache.cxf.dosgi.topologymanager.exporter;
-import java.util.Collection;
import java.util.Dictionary;
import java.util.HashSet;
import java.util.Hashtable;
@@ -38,9 +37,9 @@ import org.slf4j.LoggerFactory;
/**
* Tracks EndpointListeners and allows to notify them of endpoints.
*/
-public class EndpointListenerNotifier {
+public class EndpointListenerNotifier implements EndpointListener {
private static final Logger LOG = LoggerFactory.getLogger(EndpointListenerNotifier.class);
- public enum NotifyType { ADDED, REMOVED };
+ private enum NotifyType { ADDED, REMOVED };
private Map<EndpointListener, Set<Filter>> listeners;
private EndpointRepository endpointRepo;
@@ -48,11 +47,26 @@ public class EndpointListenerNotifier {
this.endpointRepo = endpointRepo;
this.listeners = new ConcurrentHashMap<EndpointListener, Set<Filter>>();
}
+
+ public static Set<Filter> getFiltersFromEndpointListenerScope(ServiceReference<EndpointListener> sref) {
+ Set<Filter> filters = new HashSet<Filter>();
+ String[] scopes = StringPlus.parse(sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE));
+ for (String scope : scopes) {
+ try {
+ filters.add(FrameworkUtil.createFilter(scope));
+ } catch (InvalidSyntaxException e) {
+ LOG.error("invalid endpoint listener scope: {}", scope, e);
+ }
+ }
+ return filters;
+ }
public void add(EndpointListener ep, Set<Filter> filters) {
LOG.debug("new EndpointListener detected");
listeners.put(ep, filters);
- notifyListener(NotifyType.ADDED, ep, filters, endpointRepo.getAllEndpoints());
+ for (EndpointDescription endpoint : endpointRepo.getAllEndpoints()) {
+ notifyListener(NotifyType.ADDED, ep, filters, endpoint);
+ }
}
public void remove(EndpointListener ep) {
@@ -60,15 +74,25 @@ public class EndpointListenerNotifier {
listeners.remove(ep);
}
+ @Override
+ public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
+ notifyListeners(NotifyType.ADDED, endpoint);
+ }
+
+ @Override
+ public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
+ notifyListeners(NotifyType.REMOVED, endpoint);
+ }
+
/**
* Notifies all endpoint listeners about endpoints being added or removed.
*
* @param added specifies whether endpoints were added (true) or removed (false)
* @param endpoints the endpoints the listeners should be notified about
*/
- public void notifyListeners(NotifyType type, Collection<EndpointDescription> endpoints) {
+ private void notifyListeners(NotifyType type, EndpointDescription endpoint) {
for (EndpointListener listener : listeners.keySet()) {
- notifyListener(type, listener, listeners.get(listener), endpoints);
+ notifyListener(type, listener, listeners.get(listener), endpoint);
}
}
@@ -79,34 +103,19 @@ public class EndpointListenerNotifier {
* @param endpointListenerRef the ServiceReference of an EndpointListener to notify
* @param endpoints the endpoints the listener should be notified about
*/
- void notifyListener(NotifyType type, EndpointListener listener, Set<Filter> filters,
- Collection<EndpointDescription> endpoints) {
+ private void notifyListener(NotifyType type, EndpointListener listener, Set<Filter> filters,
+ EndpointDescription endpoint) {
LOG.debug("Endpoint {}", type);
- for (EndpointDescription endpoint : endpoints) {
- Set<Filter> matchingFilters = getMatchingFilters(filters, endpoint);
- for (Filter filter : matchingFilters) {
- if (type == NotifyType.ADDED) {
- listener.endpointAdded(endpoint, filter.toString());
- } else {
- listener.endpointRemoved(endpoint, filter.toString());
- }
- }
- }
- }
-
- public static Set<Filter> getFiltersFromEndpointListenerScope(ServiceReference<EndpointListener> sref) {
- Set<Filter> filters = new HashSet<Filter>();
- String[] scopes = StringPlus.parse(sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE));
- for (String scope : scopes) {
- try {
- filters.add(FrameworkUtil.createFilter(scope));
- } catch (InvalidSyntaxException e) {
- LOG.error("invalid endpoint listener scope: {}", scope, e);
+ Set<Filter> matchingFilters = getMatchingFilters(filters, endpoint);
+ for (Filter filter : matchingFilters) {
+ if (type == NotifyType.ADDED) {
+ listener.endpointAdded(endpoint, filter.toString());
+ } else {
+ listener.endpointRemoved(endpoint, filter.toString());
}
}
- return filters;
}
-
+
private static Set<Filter> getMatchingFilters(Set<Filter> filters, EndpointDescription endpoint) {
Set<Filter> matchingFilters = new HashSet<Filter>();
Dictionary<String, Object> dict = new Hashtable<String, Object>(endpoint.getProperties());
@@ -120,4 +129,5 @@ public class EndpointListenerNotifier {
}
return matchingFilters;
}
+
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepository.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepository.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepository.java
index 7984822..2a7bab3 100644
--- a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepository.java
+++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepository.java
@@ -26,9 +26,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.cxf.dosgi.topologymanager.exporter.EndpointListenerNotifier.NotifyType;
import org.osgi.framework.ServiceReference;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,9 +46,9 @@ public class EndpointRepository {
private final Map<ServiceReference, Map<RemoteServiceAdmin, Collection<EndpointDescription>>> exportedServices
= new LinkedHashMap<ServiceReference, Map<RemoteServiceAdmin, Collection<EndpointDescription>>>();
- private EndpointListenerNotifier notifier;
+ private EndpointListener notifier;
- public void setNotifier(EndpointListenerNotifier notifier) {
+ public void setNotifier(EndpointListener notifier) {
this.notifier = notifier;
}
@@ -69,38 +69,35 @@ public class EndpointRepository {
exports.remove(rsa);
}
}
- notifier.notifyListeners(NotifyType.REMOVED, removedEndpoints);
+ endpointsRemoved(removedEndpoints);
return removedEndpoints;
}
- synchronized void removeService(ServiceReference sref) {
+ public synchronized void removeService(ServiceReference sref) {
List<EndpointDescription> removedEndpoints = new ArrayList<EndpointDescription>();
- Map<RemoteServiceAdmin, Collection<EndpointDescription>> rsas = exportedServices.get(sref);
- if (rsas != null) {
- for (Collection<EndpointDescription> endpoints : rsas.values()) {
+ Map<RemoteServiceAdmin, Collection<EndpointDescription>> rsaToEndpoints = exportedServices.get(sref);
+ if (rsaToEndpoints != null) {
+ for (Collection<EndpointDescription> endpoints : rsaToEndpoints.values()) {
removedEndpoints.addAll(endpoints);
}
exportedServices.remove(sref);
}
- notifier.notifyListeners(NotifyType.REMOVED, removedEndpoints);
+ endpointsRemoved(removedEndpoints);
}
- synchronized void addService(ServiceReference sref) {
+ public synchronized void addService(ServiceReference sref) {
if (!exportedServices.containsKey(sref)) {
LOG.info("Marking service from bundle {} for export", sref.getBundle().getSymbolicName());
exportedServices.put(sref, new LinkedHashMap<RemoteServiceAdmin, Collection<EndpointDescription>>());
}
}
- synchronized void addEndpoints(ServiceReference sref, RemoteServiceAdmin rsa,
+ public synchronized void addEndpoints(ServiceReference sref, RemoteServiceAdmin rsa,
List<EndpointDescription> endpoints) {
- if (endpoints == null) {
- throw new NullPointerException();
- }
addService(sref);
Map<RemoteServiceAdmin, Collection<EndpointDescription>> exports = exportedServices.get(sref);
exports.put(rsa, endpoints);
- notifier.notifyListeners(NotifyType.ADDED, endpoints);
+ endpointsAdded(endpoints);
}
synchronized boolean isAlreadyExportedForRsa(ServiceReference sref, RemoteServiceAdmin rsa) {
@@ -129,4 +126,15 @@ public class EndpointRepository {
return servicesToBeExported;
}
+ private void endpointsAdded(List<EndpointDescription> endpoints) {
+ for (EndpointDescription epd : endpoints) {
+ notifier.endpointAdded(epd, null);
+ }
+ }
+
+ private void endpointsRemoved(List<EndpointDescription> endpoints) {
+ for (EndpointDescription epd : endpoints) {
+ notifier.endpointRemoved(epd, null);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExport.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExport.java b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExport.java
index 2b1a281..ad3736c 100644
--- a/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExport.java
+++ b/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExport.java
@@ -22,9 +22,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
+import org.apache.cxf.dosgi.dsw.api.ExportPolicy;
import org.osgi.framework.Bundle;
import org.osgi.framework.ServiceEvent;
import org.osgi.framework.ServiceListener;
@@ -53,10 +55,13 @@ public class TopologyManagerExport implements ServiceListener {
private final Executor execService;
private final EndpointRepository endpointRepo;
+ private ExportPolicy policy;
private final Set<RemoteServiceAdmin> rsaSet;
- public TopologyManagerExport(final EndpointRepository endpointRepo, Executor executor) {
+
+ public TopologyManagerExport(final EndpointRepository endpointRepo, Executor executor, ExportPolicy policy) {
this.endpointRepo = endpointRepo;
+ this.policy = policy;
this.rsaSet = new HashSet<RemoteServiceAdmin>();
this.execService = executor;
}
@@ -67,22 +72,13 @@ public class TopologyManagerExport implements ServiceListener {
ServiceReference<?> sref = event.getServiceReference();
if (event.getType() == ServiceEvent.REGISTERED) {
LOG.debug("Received REGISTERED ServiceEvent: {}", event);
- if (shouldExportService(sref)) {
- export(sref);
- }
+ export(sref);
} else if (event.getType() == ServiceEvent.UNREGISTERING) {
LOG.debug("Received UNREGISTERING ServiceEvent: {}", event);
endpointRepo.removeService(sref);
}
}
- /**
- * checks if a Service is intended to be exported
- */
- private boolean shouldExportService(ServiceReference<?> sref) {
- return sref.getProperty(RemoteConstants.SERVICE_EXPORTED_INTERFACES) != null;
- }
-
public void add(RemoteServiceAdmin rsa) {
rsaSet.add(rsa);
for (ServiceReference<?> serviceRef : endpointRepo.getServicesToBeExportedFor(rsa)) {
@@ -95,7 +91,7 @@ public class TopologyManagerExport implements ServiceListener {
endpointRepo.removeRemoteServiceAdmin(rsa);
};
- public void export(final ServiceReference<?> sref) {
+ private void export(final ServiceReference<?> sref) {
execService.execute(new Runnable() {
public void run() {
doExport(sref);
@@ -104,6 +100,11 @@ public class TopologyManagerExport implements ServiceListener {
}
private void doExport(final ServiceReference<?> sref) {
+ Map<String, ?> addProps = policy.additionalParameters(sref);
+ if (!shouldExport(sref, addProps)) {
+ LOG.debug("Skipping service {}", sref);
+ return;
+ }
LOG.debug("Exporting service {}", sref);
endpointRepo.addService(sref); // mark for future export even if there are currently no RSAs
if (rsaSet.size() == 0) {
@@ -119,17 +120,26 @@ public class TopologyManagerExport implements ServiceListener {
// already handled by this remoteServiceAdmin
LOG.debug("already handled by this remoteServiceAdmin -> skipping");
} else {
- exportServiceUsingRemoteServiceAdmin(sref, remoteServiceAdmin);
+
+ exportServiceUsingRemoteServiceAdmin(sref, remoteServiceAdmin, addProps);
}
}
}
+ private boolean shouldExport(ServiceReference<?> sref, Map<String, ?> addProps) {
+ String exported = (String)sref.getProperty(RemoteConstants.SERVICE_EXPORTED_INTERFACES);
+ String addExported = (String)addProps.get(RemoteConstants.SERVICE_EXPORTED_INTERFACES);
+ String effectiveExported = addExported != null ? addExported : exported;
+ return (effectiveExported != null) && !effectiveExported.isEmpty();
+ }
+
private Object getSymbolicName(Bundle bundle) {
return bundle == null ? null : bundle.getSymbolicName();
}
private void exportServiceUsingRemoteServiceAdmin(final ServiceReference<?> sref,
- final RemoteServiceAdmin remoteServiceAdmin) {
+ final RemoteServiceAdmin remoteServiceAdmin,
+ Map<String, ?> addProps) {
// abort if the service was unregistered by the time we got here
// (we check again at the end, but this optimization saves unnecessary heavy processing)
if (sref.getBundle() == null) {
@@ -140,7 +150,7 @@ public class TopologyManagerExport implements ServiceListener {
// do the export
LOG.debug("exporting {}...", sref);
// TODO: additional parameter Map?
- Collection<ExportRegistration> exportRegs = remoteServiceAdmin.exportService(sref, null);
+ Collection<ExportRegistration> exportRegs = remoteServiceAdmin.exportService(sref, addProps);
// process successful/failed registrations
List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>();
for (ExportRegistration reg : exportRegs) {
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ActivatorTest.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ActivatorTest.java b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ActivatorTest.java
index f33199c..100e3a3 100644
--- a/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ActivatorTest.java
+++ b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ActivatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.cxf.dosgi.topologymanager;
+import org.apache.cxf.dosgi.topologymanager.exporter.DefaultExportPolicy;
import org.apache.cxf.dosgi.topologymanager.exporter.TopologyManagerExport;
import org.easymock.Capture;
import org.easymock.EasyMock;
@@ -55,12 +56,12 @@ public class ActivatorTest {
c.replay();
Activator activator = new Activator();
- activator.start(context);
+ activator.doStart(context, new DefaultExportPolicy());
c.verify();
c.reset();
c.replay();
- activator.stop(context);
+ activator.doStop(context);
c.verify();
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java
index 95d35d9..04bd017 100644
--- a/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java
+++ b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifierTest.java
@@ -24,13 +24,9 @@ import java.util.Dictionary;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Set;
-import static java.util.Arrays.asList;
-
-import org.apache.cxf.dosgi.topologymanager.exporter.EndpointListenerNotifier.NotifyType;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
@@ -63,11 +59,13 @@ public class EndpointListenerNotifierTest {
EndpointListenerNotifier tm = new EndpointListenerNotifier(exportRepository);
EasyMock.replay(epl);
- List<EndpointDescription> endpoints = Arrays.asList(endpoint1, endpoint2);
Set<Filter> filters = new HashSet<Filter>();
filters.add(FrameworkUtil.createFilter("(objectClass=myClass)"));
- tm.notifyListener(NotifyType.ADDED, epl, filters, endpoints);
- tm.notifyListener(NotifyType.REMOVED, epl, filters, endpoints);
+ tm.add(epl, filters);
+ tm.endpointAdded(endpoint1, null);
+ tm.endpointAdded(endpoint2, null);
+ tm.endpointRemoved(endpoint1, null);
+ tm.endpointRemoved(endpoint2, null);
EasyMock.verify(epl);
}
@@ -97,8 +95,8 @@ public class EndpointListenerNotifierTest {
Set<Filter> filters = new HashSet<Filter>();
filters.add(FrameworkUtil.createFilter("(objectClass=myClass)"));
tm.add(epl, filters);
- tm.notifyListeners(NotifyType.ADDED, asList(endpoint1));
- tm.notifyListeners(NotifyType.REMOVED, asList(endpoint1));
+ tm.endpointAdded(endpoint1, null);
+ tm.endpointRemoved(endpoint1, null);
tm.remove(epl);
EasyMock.verify(epl);
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepositoryTest.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepositoryTest.java b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepositoryTest.java
new file mode 100644
index 0000000..cb07f43
--- /dev/null
+++ b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointRepositoryTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.topologymanager.exporter;
+
+import java.util.Arrays;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import org.easymock.EasyMock;
+import org.easymock.IMocksControl;
+import org.junit.Test;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
+
+public class EndpointRepositoryTest {
+
+ @Test
+ public void testAddRemove() throws InvalidSyntaxException {
+ EndpointDescription ep1 = createEndpoint("my");
+
+ IMocksControl c = EasyMock.createControl();
+ ServiceReference<?> sref = createService(c);
+ RemoteServiceAdmin rsa = c.createMock(RemoteServiceAdmin.class);
+ EndpointListener notifier = c.createMock(EndpointListener.class);
+
+ notifier.endpointAdded(ep1, null);
+ EasyMock.expectLastCall();
+
+ c.replay();
+ EndpointRepository repo = new EndpointRepository();
+ repo.setNotifier(notifier);
+ List<EndpointDescription> endpoints = Arrays.asList(ep1);
+ repo.addEndpoints(sref, rsa, endpoints);
+ c.verify();
+
+ c.reset();
+ notifier.endpointRemoved(ep1, null);
+ EasyMock.expectLastCall();
+
+ c.replay();
+ repo.removeRemoteServiceAdmin(rsa);
+ c.verify();
+ }
+
+ private ServiceReference<?> createService(IMocksControl c) {
+ ServiceReference<?> sref = c.createMock(ServiceReference.class);
+ Bundle bundle = c.createMock(Bundle.class);
+ EasyMock.expect(bundle.getSymbolicName()).andReturn("myBundle");
+ EasyMock.expect(sref.getBundle()).andReturn(bundle);
+ return sref;
+ }
+
+ public EndpointDescription createEndpoint(String iface) {
+ Map<String, Object> props = new Hashtable<String, Object>();
+ props.put("objectClass", new String[]{iface});
+ props.put(RemoteConstants.ENDPOINT_ID, iface);
+ props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "any");
+ return new EndpointDescription(props);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/8c7fbc8c/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java
----------------------------------------------------------------------
diff --git a/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java
index 1355925..0eda150 100644
--- a/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java
+++ b/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/exporter/TopologyManagerExportTest.java
@@ -18,16 +18,12 @@
*/
package org.apache.cxf.dosgi.topologymanager.exporter;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
-import static java.util.Collections.emptyList;
-import static java.util.Collections.singletonList;
-
-import org.apache.cxf.dosgi.topologymanager.exporter.EndpointListenerNotifier.NotifyType;
+import org.apache.cxf.dosgi.dsw.api.ExportPolicy;
import org.easymock.EasyMock;
import org.easymock.IMocksControl;
import org.junit.Test;
@@ -36,12 +32,12 @@ import org.osgi.framework.Constants;
import org.osgi.framework.ServiceEvent;
import org.osgi.framework.ServiceReference;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.osgi.service.remoteserviceadmin.ExportReference;
import org.osgi.service.remoteserviceadmin.ExportRegistration;
import org.osgi.service.remoteserviceadmin.RemoteConstants;
import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
-import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expectLastCall;
@SuppressWarnings({"rawtypes", "unchecked"})
@@ -57,7 +53,7 @@ public class TopologyManagerExportTest {
public void testServiceExportUnexport() throws Exception {
IMocksControl c = EasyMock.createControl();
RemoteServiceAdmin rsa = c.createMock(RemoteServiceAdmin.class);
- final EndpointListenerNotifier notifier = c.createMock(EndpointListenerNotifier.class);
+ final EndpointListener notifier = c.createMock(EndpointListener.class);
final ServiceReference sref = createUserService(c);
EndpointDescription epd = createEndpoint();
expectServiceExported(c, rsa, notifier, sref, epd);
@@ -66,13 +62,14 @@ public class TopologyManagerExportTest {
EndpointRepository endpointRepo = new EndpointRepository();
endpointRepo.setNotifier(notifier);
Executor executor = syncExecutor();
- TopologyManagerExport exportManager = new TopologyManagerExport(endpointRepo, executor);
+ ExportPolicy policy = new DefaultExportPolicy();
+ TopologyManagerExport exportManager = new TopologyManagerExport(endpointRepo, executor, policy);
exportManager.add(rsa);
exportManager.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, sref));
c.verify();
c.reset();
- notifier.notifyListeners(eq(NotifyType.REMOVED), eq(singletonList(epd)));
+ notifier.endpointRemoved(epd, null);
expectLastCall().once();
c.replay();
exportManager.serviceChanged(new ServiceEvent(ServiceEvent.UNREGISTERING, sref));
@@ -84,8 +81,6 @@ public class TopologyManagerExportTest {
c.verify();
c.reset();
- notifier.notifyListeners(eq(NotifyType.REMOVED), eq((Collection)emptyList()));
- expectLastCall().once();
c.replay();
exportManager.remove(rsa);
c.verify();
@@ -102,20 +97,20 @@ public class TopologyManagerExportTest {
EndpointRepository endpointRepo = new EndpointRepository();
endpointRepo.setNotifier(mockEpListenerNotifier);
- TopologyManagerExport exportManager = new TopologyManagerExport(endpointRepo, syncExecutor());
- exportManager.export(sref);
+ ExportPolicy policy = new DefaultExportPolicy();
+ TopologyManagerExport exportManager = new TopologyManagerExport(endpointRepo, syncExecutor(), policy);
+ exportManager.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, sref));
exportManager.add(rsa);
c.verify();
}
private void expectServiceExported(IMocksControl c, RemoteServiceAdmin rsa,
- final EndpointListenerNotifier mockEpListenerNotifier,
+ final EndpointListener listener,
final ServiceReference sref, EndpointDescription epd) {
ExportRegistration exportRegistration = createExportRegistration(c, epd);
EasyMock.expect(rsa.exportService(EasyMock.same(sref), (Map<String, Object>)EasyMock.anyObject()))
.andReturn(Collections.singletonList(exportRegistration)).once();
- mockEpListenerNotifier.notifyListeners(EasyMock.eq(NotifyType.ADDED),
- EasyMock.eq(Collections.singletonList(epd)));
+ listener.endpointAdded(epd, null);
EasyMock.expectLastCall().once();
}