You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by io...@apache.org on 2011/12/26 17:06:25 UTC
svn commit: r1224744 - in /karaf/cellar/trunk:
dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java
itests/src/test/java/org/apache/karaf/cellar/itests/CellarSampleDosgiGreeterTest.java
Author: iocanel
Date: Mon Dec 26 16:06:25 2011
New Revision: 1224744
URL: http://svn.apache.org/viewvc?rev=1224744&view=rev
Log:
[KARAF-823] Cellar will import remote services, even if they are not present at the time of the listener registration.
Modified:
karaf/cellar/trunk/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java
karaf/cellar/trunk/itests/src/test/java/org/apache/karaf/cellar/itests/CellarSampleDosgiGreeterTest.java
Modified: karaf/cellar/trunk/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java
URL: http://svn.apache.org/viewvc/karaf/cellar/trunk/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java?rev=1224744&r1=1224743&r2=1224744&view=diff
==============================================================================
--- karaf/cellar/trunk/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java (original)
+++ karaf/cellar/trunk/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java Mon Dec 26 16:06:25 2011
@@ -33,11 +33,15 @@ import java.util.Hashtable;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* Listener for the service import.
*/
-public class ImportServiceListener implements ListenerHook {
+public class ImportServiceListener implements ListenerHook, Runnable {
private static final transient Logger LOGGER = LoggerFactory.getLogger(ImportServiceListener.class);
@@ -45,18 +49,24 @@ public class ImportServiceListener imple
private ClusterManager clusterManager;
private CommandStore commandStore;
private EventTransportFactory eventTransportFactory;
- private Map<String,EndpointDescription> remoteEndpoints;
+ private Map<String, EndpointDescription> remoteEndpoints;
+
+ private Set<ListenerInfo> pendingListeners = new LinkedHashSet<ListenerInfo>();
private final Map<EndpointDescription, ServiceRegistration> registrations = new HashMap<EndpointDescription, ServiceRegistration>();
private final Map<String, EventProducer> producers = new HashMap<String, EventProducer>();
private final Map<String, EventConsumer> consumers = new HashMap<String, EventConsumer>();
+ private final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+
public void init() {
remoteEndpoints = clusterManager.getMap(Constants.REMOTE_ENDPOINTS);
+ service.scheduleAtFixedRate(this, 0, 5, TimeUnit.SECONDS);
}
public void destroy() {
+ service.shutdown();
for (Map.Entry<EndpointDescription, ServiceRegistration> entry : registrations.entrySet()) {
ServiceRegistration registration = entry.getValue();
registration.unregister();
@@ -67,7 +77,13 @@ public class ImportServiceListener imple
}
consumers.clear();
producers.clear();
+ }
+
+ public void run() {
+ for (ListenerInfo listener : pendingListeners) {
+ checkListener(listener);
+ }
}
@Override
@@ -81,20 +97,10 @@ public class ImportServiceListener imple
continue;
}
+ pendingListeners.add(listenerInfo);
// Make sure we only import remote services
+ checkListener(listenerInfo);
- // Iterate through known services and import them if needed
- Set<EndpointDescription> matches = new LinkedHashSet<EndpointDescription>();
- for (Map.Entry<String,EndpointDescription> entry : remoteEndpoints.entrySet()) {
- EndpointDescription endpointDescription = entry.getValue();
- if (endpointDescription.matches(listenerInfo.getFilter()) && !endpointDescription.getNodes().contains(clusterManager.getNode().getId())) {
- matches.add(endpointDescription);
- }
- }
-
- for (EndpointDescription endpoint : matches) {
- importService(endpoint, listenerInfo);
- }
}
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader);
@@ -115,7 +121,7 @@ public class ImportServiceListener imple
String filter = "(&" + listenerInfo.getFilter() + "(!(" + Constants.ENDPOINT_FRAMEWORK_UUID + "=" + clusterManager.getNode().getId() + ")))";
// Iterate through known services and import them if needed
Set<EndpointDescription> matches = new LinkedHashSet<EndpointDescription>();
- for (Map.Entry<String,EndpointDescription> entry : remoteEndpoints.entrySet()) {
+ for (Map.Entry<String, EndpointDescription> entry : remoteEndpoints.entrySet()) {
EndpointDescription endpointDescription = entry.getValue();
if (endpointDescription.matches(filter)) {
matches.add(endpointDescription);
@@ -125,6 +131,34 @@ public class ImportServiceListener imple
for (EndpointDescription endpoint : matches) {
unimportService(endpoint);
}
+
+ pendingListeners.remove(listenerInfo);
+ }
+ } finally {
+ Thread.currentThread().setContextClassLoader(originalClassLoader);
+ }
+ }
+
+ /**
+ * Checks if there is a match for the current {@link ListenerInfo}.
+ *
+ * @param listenerInfo
+ */
+ private void checkListener(ListenerInfo listenerInfo) {
+ ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ // Iterate through known services and import them if needed
+ Set<EndpointDescription> matches = new LinkedHashSet<EndpointDescription>();
+ for (Map.Entry<String, EndpointDescription> entry : remoteEndpoints.entrySet()) {
+ EndpointDescription endpointDescription = entry.getValue();
+ if (endpointDescription.matches(listenerInfo.getFilter()) && !endpointDescription.getNodes().contains(clusterManager.getNode().getId())) {
+ matches.add(endpointDescription);
+ }
+ }
+
+ for (EndpointDescription endpoint : matches) {
+ importService(endpoint, listenerInfo);
}
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader);
@@ -141,29 +175,30 @@ public class ImportServiceListener imple
LOGGER.debug("CELLAR DOSGI: importing remote service");
EventProducer requestProducer = producers.get(endpoint.getId());
- if(requestProducer == null) {
- requestProducer = eventTransportFactory.getEventProducer(Constants.INTERFACE_PREFIX + Constants.SEPARATOR + endpoint.getId(),Boolean.FALSE);
- producers.put(endpoint.getId(),requestProducer);
+ if (requestProducer == null) {
+ requestProducer = eventTransportFactory.getEventProducer(Constants.INTERFACE_PREFIX + Constants.SEPARATOR + endpoint.getId(), Boolean.FALSE);
+ producers.put(endpoint.getId(), requestProducer);
}
EventConsumer resultConsumer = consumers.get(endpoint.getId());
- if(resultConsumer == null) {
+ if (resultConsumer == null) {
resultConsumer = eventTransportFactory.getEventConsumer(Constants.RESULT_PREFIX + Constants.SEPARATOR + clusterManager.getNode().getId() + endpoint.getId(), Boolean.FALSE);
- consumers.put(endpoint.getId(),resultConsumer);
- } else if(!resultConsumer.isConsuming()) {
+ consumers.put(endpoint.getId(), resultConsumer);
+ } else if (!resultConsumer.isConsuming()) {
resultConsumer.start();
}
- producers.put(endpoint.getId(),requestProducer);
- consumers.put(endpoint.getId(),resultConsumer);
+ producers.put(endpoint.getId(), requestProducer);
+ consumers.put(endpoint.getId(), resultConsumer);
- ExecutionContext executionContext = new ClusteredExecutionContext(requestProducer,commandStore);
+ ExecutionContext executionContext = new ClusteredExecutionContext(requestProducer, commandStore);
RemoteServiceFactory remoteServiceFactory = new RemoteServiceFactory(endpoint, clusterManager, executionContext);
ServiceRegistration registration = listenerInfo.getBundleContext().registerService(endpoint.getServiceClass(),
remoteServiceFactory,
new Hashtable<String, Object>(endpoint.getProperties()));
registrations.put(endpoint, registration);
+ pendingListeners.remove(listenerInfo);
}
/**
@@ -177,7 +212,7 @@ public class ImportServiceListener imple
producers.remove(endpoint.getId());
EventConsumer consumer = consumers.remove(endpoint.getId());
- if(consumer != null) {
+ if (consumer != null) {
consumer.stop();
}
}
Modified: karaf/cellar/trunk/itests/src/test/java/org/apache/karaf/cellar/itests/CellarSampleDosgiGreeterTest.java
URL: http://svn.apache.org/viewvc/karaf/cellar/trunk/itests/src/test/java/org/apache/karaf/cellar/itests/CellarSampleDosgiGreeterTest.java?rev=1224744&r1=1224743&r2=1224744&view=diff
==============================================================================
--- karaf/cellar/trunk/itests/src/test/java/org/apache/karaf/cellar/itests/CellarSampleDosgiGreeterTest.java (original)
+++ karaf/cellar/trunk/itests/src/test/java/org/apache/karaf/cellar/itests/CellarSampleDosgiGreeterTest.java Mon Dec 26 16:06:25 2011
@@ -72,13 +72,12 @@ public class CellarSampleDosgiGreeterTes
System.err.println(executeCommand("cluster:group-set service-grp "+node1));
System.err.println(executeCommand("cluster:group-list"));
+ System.err.println(executeCommand("cluster:features-install client-grp greeter-client"));
+ Thread.sleep(10000);
System.err.println(executeCommand("cluster:features-install service-grp greeter-service"));
Thread.sleep(10000);
System.err.println(executeCommand("cluster:list-services"));
- System.err.println(executeCommand("cluster:features-install client-grp greeter-client"));
- Thread.sleep(10000);
- System.err.println(executeCommand("features:list"));
- System.err.println(executeCommand("osgi:list"));
+
String greetOutput = executeCommand("dosgi-greeter:greet Hi 10");
System.err.println(greetOutput);
assertEquals("Expected 10 greets", 10, countGreetsFromNode(greetOutput, node1));
@@ -120,6 +119,6 @@ public class CellarSampleDosgiGreeterTes
@Configuration
public Option[] config() {
return new Option[]{
- cellarDistributionConfiguration(), keepRuntimeFolder(),logLevel(LogLevelOption.LogLevel.ERROR)};
+ cellarDistributionConfiguration(), keepRuntimeFolder(),logLevel(LogLevelOption.LogLevel.ERROR),debugConfiguration("5005",true)};
}
}