You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by jb...@apache.org on 2015/09/20 11:33:39 UTC
[2/2] karaf-cellar git commit: Merge branch
'feature_add_service_tracker_to_unregist_remove_service' of
https://github.com/FlavioF/karaf-cellar
Merge branch 'feature_add_service_tracker_to_unregist_remove_service' of https://github.com/FlavioF/karaf-cellar
Conflicts:
dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml
Project: http://git-wip-us.apache.org/repos/asf/karaf-cellar/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-cellar/commit/778fcd4c
Tree: http://git-wip-us.apache.org/repos/asf/karaf-cellar/tree/778fcd4c
Diff: http://git-wip-us.apache.org/repos/asf/karaf-cellar/diff/778fcd4c
Branch: refs/heads/master
Commit: 778fcd4c2fbc852ac9b1b0d78673b2a07590ea47
Parents: 9e42597 de8162f
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Sun Sep 20 11:33:28 2015 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Sep 20 11:33:28 2015 +0200
----------------------------------------------------------------------
.../cellar/dosgi/internal/osgi/Activator.java | 12 ++
.../osgi/RemovedNodeServiceTracker.java | 111 +++++++++++++++++++
.../hazelcast/HazelcastClusterManager.java | 6 +-
.../cellar/hazelcast/HazelcastGroupManager.java | 2 +-
.../hazelcast/HazelcastInstanceAware.java | 2 +-
.../karaf/cellar/hazelcast/QueueConsumer.java | 36 +++---
6 files changed, 149 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/778fcd4c/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java
----------------------------------------------------------------------
diff --cc dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java
index b3c924f,0000000..b7b360e
mode 100644,000000..100644
--- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java
+++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java
@@@ -1,127 -1,0 +1,139 @@@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.cellar.dosgi.internal.osgi;
+
+import org.apache.karaf.cellar.core.ClusterManager;
+import org.apache.karaf.cellar.core.command.CommandStore;
+import org.apache.karaf.cellar.core.event.EventHandler;
+import org.apache.karaf.cellar.core.event.EventTransportFactory;
+import org.apache.karaf.cellar.dosgi.*;
+import org.apache.karaf.cellar.dosgi.management.internal.ServiceMBeanImpl;
+import org.apache.karaf.util.tracker.BaseActivator;
+import org.apache.karaf.util.tracker.annotation.ProvideService;
+import org.apache.karaf.util.tracker.annotation.RequireService;
+import org.apache.karaf.util.tracker.annotation.Services;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.framework.hooks.service.ListenerHook;
+import org.osgi.service.cm.ConfigurationAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Hashtable;
+
+@Services(
+ provides = {
+ @ProvideService(ListenerHook.class),
+ @ProvideService(EventHandler.class)
+ },
+ requires = {
+ @RequireService(ClusterManager.class),
+ @RequireService(EventTransportFactory.class),
+ @RequireService(CommandStore.class),
+ @RequireService(ConfigurationAdmin.class)
+ }
+)
+public class Activator extends BaseActivator {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(Activator.class);
+
+ private ImportServiceListener importServiceListener;
+ private ExportServiceListener exportServiceListener;
++ private RemovedNodeServiceTracker removedNodeServiceTracker;
+ private ServiceRegistration mbeanRegistration;
+
+ @Override
+ public void doStart() throws Exception {
+
+ ClusterManager clusterManager = getTrackedService(ClusterManager.class);
+ if (clusterManager == null)
+ return;
+ EventTransportFactory eventTransportFactory = getTrackedService(EventTransportFactory.class);
+ if (eventTransportFactory == null)
+ return;
+ CommandStore commandStore = getTrackedService(CommandStore.class);
+ if (commandStore == null)
+ return;
+ ConfigurationAdmin configurationAdmin = getTrackedService(ConfigurationAdmin.class);
+ if (configurationAdmin == null)
+ return;
+
+ LOGGER.debug("CELLAR DOSGI: init remote service call handler");
+ RemoteServiceCallHandler remoteServiceCallHandler = new RemoteServiceCallHandler();
+ remoteServiceCallHandler.setEventTransportFactory(eventTransportFactory);
+ remoteServiceCallHandler.setClusterManager(clusterManager);
+ remoteServiceCallHandler.setBundleContext(bundleContext);
+ remoteServiceCallHandler.setConfigurationAdmin(configurationAdmin);
+ Hashtable props = new Hashtable();
+ props.put("managed", "true");
+ register(EventHandler.class, remoteServiceCallHandler, props);
+
+ LOGGER.debug("CELLAR DOSGI: init remote service result handler");
+ RemoteServiceResultHandler remoteServiceResultHandler = new RemoteServiceResultHandler();
+ remoteServiceResultHandler.setCommandStore(commandStore);
+ register(EventHandler.class, remoteServiceResultHandler);
+
+ LOGGER.debug("CELLAR DOSGI: init import service listener");
+ importServiceListener = new ImportServiceListener();
+ importServiceListener.setClusterManager(clusterManager);
+ importServiceListener.setEventTransportFactory(eventTransportFactory);
+ importServiceListener.setCommandStore(commandStore);
+ importServiceListener.setBundleContext(bundleContext);
+ importServiceListener.init();
+ register(ListenerHook.class, importServiceListener);
+
+ LOGGER.debug("CELLAR DOSGI: init export service listener");
+ exportServiceListener = new ExportServiceListener();
+ exportServiceListener.setClusterManager(clusterManager);
+ exportServiceListener.setEventTransportFactory(eventTransportFactory);
+ exportServiceListener.setBundleContext(bundleContext);
+ exportServiceListener.init();
+
++ LOGGER.debug("CELLAR DOSGI: start removed nodes service tracker");
++ removedNodeServiceTracker = new RemovedNodeServiceTracker();
++ removedNodeServiceTracker.setClusterManager(clusterManager);
++ removedNodeServiceTracker.init();
++
+ LOGGER.debug("CELLAR DOSGI: register MBean");
+ ServiceMBeanImpl mbean = new ServiceMBeanImpl();
+ mbean.setClusterManager(clusterManager);
+ props = new Hashtable();
+ props.put("jmx.objectname", "org.apache.karaf.cellar:type=service,name=" + System.getProperty("karaf.name"));
+ mbeanRegistration = bundleContext.registerService(getInterfaceNames(mbean), mbean, props);
+ }
+
+ @Override
+ public void doStop() {
+ super.doStop();
+
+ if (mbeanRegistration != null) {
+ mbeanRegistration.unregister();
+ mbeanRegistration = null;
+ }
++
++ if (removedNodeServiceTracker != null) {
++ removedNodeServiceTracker.destroy();
++ removedNodeServiceTracker = null;
++ }
++
+ if (exportServiceListener != null) {
+ exportServiceListener.destroy();
+ exportServiceListener = null;
+ }
+ if (importServiceListener != null) {
+ importServiceListener.destroy();
+ importServiceListener = null;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/778fcd4c/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/RemovedNodeServiceTracker.java
----------------------------------------------------------------------
diff --cc dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/RemovedNodeServiceTracker.java
index 0000000,0000000..1bad57a
new file mode 100644
--- /dev/null
+++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/RemovedNodeServiceTracker.java
@@@ -1,0 -1,0 +1,111 @@@
++/*
++ * Licensed under the Apache License, Version 2.0 (the "License");
++ * you may not use this file except in compliance with the License.
++ * You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.karaf.cellar.dosgi.internal.osgi;
++
++import org.apache.karaf.cellar.core.ClusterManager;
++import org.apache.karaf.cellar.core.Node;
++import org.apache.karaf.cellar.dosgi.Constants;
++import org.apache.karaf.cellar.dosgi.EndpointDescription;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import java.util.HashSet;
++import java.util.Map;
++import java.util.Set;
++import java.util.concurrent.Executors;
++import java.util.concurrent.ScheduledExecutorService;
++import java.util.concurrent.TimeUnit;
++
++/**
++ * Listener called when a new service is exported.
++ */
++public class RemovedNodeServiceTracker implements Runnable {
++
++ private static final transient Logger LOGGER = LoggerFactory.getLogger(RemovedNodeServiceTracker.class);
++
++ private ClusterManager clusterManager;
++
++ private Map<String, EndpointDescription> remoteEndpoints;
++
++ private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
++
++ public void init() {
++ remoteEndpoints = clusterManager.getMap(Constants.REMOTE_ENDPOINTS);
++ scheduler.scheduleWithFixedDelay(this, 10, 10, TimeUnit.SECONDS);
++ }
++
++ public void destroy() {
++ scheduler.shutdown();
++ }
++
++ public ClusterManager getClusterManager() {
++ return clusterManager;
++ }
++
++ public void setClusterManager(ClusterManager clusterManager) {
++ this.clusterManager = clusterManager;
++ }
++
++
++ @Override
++ public void run() {
++ LOGGER.trace("CELLAR DOSGI: running the service tracker task");
++ ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
++ try {
++ if (!remoteEndpoints.isEmpty()) {
++ LOGGER.trace("CELLAR DOSGI: found {} remote endpoints", remoteEndpoints.size());
++ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
++ final Set<Node> activeNodes = clusterManager.listNodes();
++
++ // create a clone of remote endpoint to avoid concurrency concerns while iterating it
++ final Set<Map.Entry<String, EndpointDescription>> list = new HashSet<Map.Entry<String, EndpointDescription>>(remoteEndpoints.entrySet());
++
++ for (Map.Entry<String, EndpointDescription> entry : list) {
++ final EndpointDescription endpointDescription = entry.getValue();
++ final String key = entry.getKey();
++
++ // create a clone of nodes to avoid concurrency concerns while iterating it
++ final Set<Node> nodes = new HashSet<Node>(endpointDescription.getNodes());
++
++ boolean endpointChanged = false;
++ for (Node n : nodes) {
++ if (!activeNodes.contains(n)) {
++ LOGGER.debug("CELLAR DOSGI: removing node with id {} since it is not active", n.getId());
++ endpointDescription.getNodes().remove(n);
++ endpointChanged = true;
++ }
++ }
++
++ if (endpointChanged) {
++ // if the endpoint is used for export from other nodes too, then update it
++ if (endpointDescription.getNodes().size() > 0) {
++ LOGGER.debug("CELLAR DOSGI: updating remote endpoint {}", key);
++ remoteEndpoints.put(key, endpointDescription);
++ } else { // remove endpoint permanently
++ LOGGER.debug("CELLAR DOSGI: removing remote endpoint {}", key);
++ remoteEndpoints.remove(key);
++ }
++ }
++ }
++ } else {
++ LOGGER.trace("CELLAR DOSGI: no remote endpoints found");
++ }
++ } catch (Exception e) {
++ LOGGER.warn("CELLAR DOSGI: failed to run service tracker task", e);
++ } finally {
++ Thread.currentThread().setContextClassLoader(originalClassLoader);
++ }
++ }
++
++}