You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by jb...@apache.org on 2015/09/20 11:33:38 UTC
[1/2] karaf-cellar git commit: [KARAF-1842] Implemented Service
Tracker to remove unavailable distributed service.
Repository: karaf-cellar
Updated Branches:
refs/heads/master 9e4259790 -> 778fcd4c2
[KARAF-1842] Implemented Service Tracker to remove unavailable distributed service.
Project: http://git-wip-us.apache.org/repos/asf/karaf-cellar/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-cellar/commit/de8162fb
Tree: http://git-wip-us.apache.org/repos/asf/karaf-cellar/tree/de8162fb
Diff: http://git-wip-us.apache.org/repos/asf/karaf-cellar/diff/de8162fb
Branch: refs/heads/master
Commit: de8162fba4542a53050eccf16908981c54131ab6
Parents: b4a2504
Author: Flávio Ferreira <ff...@bikeemotion.com>
Authored: Wed Apr 22 18:46:42 2015 +0100
Committer: Flávio Ferreira <ff...@bikeemotion.com>
Committed: Wed Apr 22 18:46:42 2015 +0100
----------------------------------------------------------------------
.../karaf/cellar/dosgi/ServiceTracker.java | 110 +++++++++++++++++++
.../resources/OSGI-INF/blueprint/blueprint.xml | 4 +
.../hazelcast/HazelcastClusterManager.java | 6 +-
.../cellar/hazelcast/HazelcastGroupManager.java | 2 +-
.../hazelcast/HazelcastInstanceAware.java | 2 +-
.../karaf/cellar/hazelcast/QueueConsumer.java | 36 +++---
6 files changed, 140 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/de8162fb/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ServiceTracker.java
----------------------------------------------------------------------
diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ServiceTracker.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ServiceTracker.java
new file mode 100644
index 0000000..45c897c
--- /dev/null
+++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ServiceTracker.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.cellar.dosgi;
+
+import org.apache.karaf.cellar.core.ClusterManager;
+import org.apache.karaf.cellar.core.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Scheduled task that removes nodes no longer active in the cluster from the remote endpoints, dropping endpoints left with no nodes.
+ */
+public class ServiceTracker implements Runnable {
+
+ private static final transient Logger LOGGER = LoggerFactory.getLogger(ServiceTracker.class);
+
+ private ClusterManager clusterManager;
+
+ private Map<String, EndpointDescription> remoteEndpoints;
+
+ private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+
+ public void init() {
+ LOGGER.info("CELLAR SERVICE TRACKER: a new Task initialized");
+ remoteEndpoints = clusterManager.getMap(Constants.REMOTE_ENDPOINTS);
+ scheduler.scheduleWithFixedDelay(this, 10, 10, TimeUnit.SECONDS);
+ }
+
+ public void destroy() {
+ LOGGER.info("CELLAR SERVICE TRACKER: task is being destroyed");
+ scheduler.shutdown();
+ }
+
+ public ClusterManager getClusterManager() {
+ return clusterManager;
+ }
+
+ public void setClusterManager(ClusterManager clusterManager) {
+ this.clusterManager = clusterManager;
+ }
+
+
+ @Override
+ public void run() {
+ LOGGER.trace("SERVICE TRACKER: running the service tracker task");
+ ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ if (!remoteEndpoints.isEmpty()) {
+ LOGGER.trace("SERVICE TRACKER: Founded {} remote endpoints", remoteEndpoints.size());
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ final Set<Node> activeNodes = clusterManager.listNodes();
+
+ // create a clone of remote endpoint to avoid concurrency concerns while iterating it
+ final Set<Map.Entry<String, EndpointDescription>> list = new HashSet<Map.Entry<String, EndpointDescription>>(remoteEndpoints.entrySet());
+
+ for (Map.Entry<String, EndpointDescription> entry : list) {
+ final EndpointDescription endpointDescription = entry.getValue();
+ final String key = entry.getKey();
+
+ // create a clone of nodes to avoid concurrency concerns while iterating it
+ final Set<Node> nodes = new HashSet<Node>(endpointDescription.getNodes());
+
+ boolean endpointChanged = false;
+ for(Node n : nodes) {
+ if(!activeNodes.contains(n)) {
+ LOGGER.debug("SERVICE TRACKER: Removing node with id {} since it is not active", n.getId());
+ endpointDescription.getNodes().remove(n);
+ endpointChanged = true;
+ }
+ }
+
+ if(endpointChanged) {
+ // if the endpoint is used for export from other nodes too, then update it
+ if (endpointDescription.getNodes().size() > 0) {
+ LOGGER.debug("SERVICE TRACKER: Updating remote endpoint {}", key);
+ remoteEndpoints.put(key, endpointDescription);
+ } else { // remove endpoint permanently
+ LOGGER.debug("SERVICE TRACKER: Removing remote endpoint {}", key);
+ remoteEndpoints.remove(key);
+ }
+ }
+ }
+ } else {
+ LOGGER.debug("SERVICE TRACKER: no remote endpoints found");
+ }
+ } catch (Exception e) {
+ LOGGER.error("SERVICE TRACKER: failed to run service tracker task",e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(originalClassLoader);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/de8162fb/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
diff --git a/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 51171fc..75ee4c0 100644
--- a/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -55,4 +55,8 @@
<reference id="commandStore" interface="org.apache.karaf.cellar.core.command.CommandStore"/>
<reference id="configurationAdmin" interface="org.osgi.service.cm.ConfigurationAdmin"/>
+
+ <bean id="serviceTracker" class="org.apache.karaf.cellar.dosgi.ServiceTracker" init-method="init" destroy-method="destroy">
+ <property name="clusterManager" ref="clusterManager"/>
+ </bean>
</blueprint>
http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/de8162fb/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java
----------------------------------------------------------------------
diff --git a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java
index b9a9619..9059eb5 100644
--- a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java
+++ b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java
@@ -87,7 +87,7 @@ public class HazelcastClusterManager extends HazelcastInstanceAware implements C
Set<Member> members = cluster.getMembers();
if (members != null && !members.isEmpty()) {
for (Member member : members) {
- HazelcastNode node = new HazelcastNode(member.getInetSocketAddress().getHostName(), member.getInetSocketAddress().getPort());
+ HazelcastNode node = new HazelcastNode(member.getSocketAddress().getHostString(), member.getSocketAddress().getPort());
nodes.add(node);
}
}
@@ -110,7 +110,7 @@ public class HazelcastClusterManager extends HazelcastInstanceAware implements C
Set<Member> members = cluster.getMembers();
if (members != null && !members.isEmpty()) {
for (Member member : members) {
- HazelcastNode node = new HazelcastNode(member.getInetSocketAddress().getHostName(), member.getInetSocketAddress().getPort());
+ HazelcastNode node = new HazelcastNode(member.getSocketAddress().getHostString(), member.getSocketAddress().getPort());
if (ids.contains(node.getId())) {
nodes.add(node);
}
@@ -135,7 +135,7 @@ public class HazelcastClusterManager extends HazelcastInstanceAware implements C
Set<Member> members = cluster.getMembers();
if (members != null && !members.isEmpty()) {
for (Member member : members) {
- HazelcastNode node = new HazelcastNode(member.getInetSocketAddress().getHostName(), member.getInetSocketAddress().getPort());
+ HazelcastNode node = new HazelcastNode(member.getSocketAddress().getHostString(), member.getSocketAddress().getPort());
if (id.equals(node.getId())) {
return node;
}
http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/de8162fb/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
----------------------------------------------------------------------
diff --git a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
index 57a5d71..78370dd 100644
--- a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
+++ b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
@@ -140,7 +140,7 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
Cluster cluster = instance.getCluster();
if (cluster != null) {
Member member = cluster.getLocalMember();
- node = new HazelcastNode(member.getInetSocketAddress().getHostName(), member.getInetSocketAddress().getPort());
+ node = new HazelcastNode(member.getSocketAddress().getHostString(), member.getSocketAddress().getPort());
}
return node;
} finally {
http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/de8162fb/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java
----------------------------------------------------------------------
diff --git a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java
index 87e1288..7c9f14d 100644
--- a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java
+++ b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java
@@ -42,7 +42,7 @@ public class HazelcastInstanceAware {
Cluster cluster = instance.getCluster();
if (cluster != null) {
Member member = cluster.getLocalMember();
- return new HazelcastNode(member.getInetSocketAddress().getHostName(), member.getInetSocketAddress().getPort());
+ return new HazelcastNode(member.getSocketAddress().getHostString(), member.getSocketAddress().getPort());
} else {
return null;
}
http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/de8162fb/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
----------------------------------------------------------------------
diff --git a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
index 85c7534..d19461d 100644
--- a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
+++ b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
@@ -86,26 +86,32 @@ public class QueueConsumer<E extends Event> implements EventConsumer<E>, ItemLis
@Override
public void run() {
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
- try {
- while (isConsuming) {
- if (combinedClassLoader != null) {
- Thread.currentThread().setContextClassLoader(combinedClassLoader);
- } else Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
- E e = null;
- try {
- e = getQueue().poll(10, TimeUnit.SECONDS);
- } catch (InterruptedException e1) {
- LOGGER.warn("CELLAR HAZELCAST: consume task interrupted");
- }
+ E e;
+ while (isConsuming) {
+ if (combinedClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(combinedClassLoader);
+ } else Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ e = null;
+ try {
+ e = getQueue().poll(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e1) {
+ LOGGER.warn("CELLAR HAZELCAST: consume task interrupted");
+ } catch (Exception e2) {
+ // catch everything from Hazelcast to prevent the death of Queue Consumer task
+ LOGGER.warn("CELLAR HAZELCAST: consumer task failed to poll the queue", e2);
+ }
+
+ try {
if (e != null) {
consume(e);
}
+ } catch (Exception e1) {
+ LOGGER.error("CELLAR HAZELCAST: failed to consume from queue", e1);
}
- } catch (Exception ex) {
- LOGGER.error("CELLAR HAZELCAST: failed to consume from queue", ex);
- } finally {
- Thread.currentThread().setContextClassLoader(originalClassLoader);
+
}
+
+ Thread.currentThread().setContextClassLoader(originalClassLoader);
}
/**
[2/2] karaf-cellar git commit: Merge branch
'feature_add_service_tracker_to_unregist_remove_service' of
https://github.com/FlavioF/karaf-cellar
Posted by jb...@apache.org.
Merge branch 'feature_add_service_tracker_to_unregist_remove_service' of https://github.com/FlavioF/karaf-cellar
Conflicts:
dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml
Project: http://git-wip-us.apache.org/repos/asf/karaf-cellar/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-cellar/commit/778fcd4c
Tree: http://git-wip-us.apache.org/repos/asf/karaf-cellar/tree/778fcd4c
Diff: http://git-wip-us.apache.org/repos/asf/karaf-cellar/diff/778fcd4c
Branch: refs/heads/master
Commit: 778fcd4c2fbc852ac9b1b0d78673b2a07590ea47
Parents: 9e42597 de8162f
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Sun Sep 20 11:33:28 2015 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Sep 20 11:33:28 2015 +0200
----------------------------------------------------------------------
.../cellar/dosgi/internal/osgi/Activator.java | 12 ++
.../osgi/RemovedNodeServiceTracker.java | 111 +++++++++++++++++++
.../hazelcast/HazelcastClusterManager.java | 6 +-
.../cellar/hazelcast/HazelcastGroupManager.java | 2 +-
.../hazelcast/HazelcastInstanceAware.java | 2 +-
.../karaf/cellar/hazelcast/QueueConsumer.java | 36 +++---
6 files changed, 149 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/778fcd4c/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java
----------------------------------------------------------------------
diff --cc dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java
index b3c924f,0000000..b7b360e
mode 100644,000000..100644
--- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java
+++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java
@@@ -1,127 -1,0 +1,139 @@@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.cellar.dosgi.internal.osgi;
+
+import org.apache.karaf.cellar.core.ClusterManager;
+import org.apache.karaf.cellar.core.command.CommandStore;
+import org.apache.karaf.cellar.core.event.EventHandler;
+import org.apache.karaf.cellar.core.event.EventTransportFactory;
+import org.apache.karaf.cellar.dosgi.*;
+import org.apache.karaf.cellar.dosgi.management.internal.ServiceMBeanImpl;
+import org.apache.karaf.util.tracker.BaseActivator;
+import org.apache.karaf.util.tracker.annotation.ProvideService;
+import org.apache.karaf.util.tracker.annotation.RequireService;
+import org.apache.karaf.util.tracker.annotation.Services;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.framework.hooks.service.ListenerHook;
+import org.osgi.service.cm.ConfigurationAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Hashtable;
+
+@Services(
+ provides = {
+ @ProvideService(ListenerHook.class),
+ @ProvideService(EventHandler.class)
+ },
+ requires = {
+ @RequireService(ClusterManager.class),
+ @RequireService(EventTransportFactory.class),
+ @RequireService(CommandStore.class),
+ @RequireService(ConfigurationAdmin.class)
+ }
+)
+public class Activator extends BaseActivator {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(Activator.class);
+
+ private ImportServiceListener importServiceListener;
+ private ExportServiceListener exportServiceListener;
++ private RemovedNodeServiceTracker removedNodeServiceTracker;
+ private ServiceRegistration mbeanRegistration;
+
+ @Override
+ public void doStart() throws Exception {
+
+ ClusterManager clusterManager = getTrackedService(ClusterManager.class);
+ if (clusterManager == null)
+ return;
+ EventTransportFactory eventTransportFactory = getTrackedService(EventTransportFactory.class);
+ if (eventTransportFactory == null)
+ return;
+ CommandStore commandStore = getTrackedService(CommandStore.class);
+ if (commandStore == null)
+ return;
+ ConfigurationAdmin configurationAdmin = getTrackedService(ConfigurationAdmin.class);
+ if (configurationAdmin == null)
+ return;
+
+ LOGGER.debug("CELLAR DOSGI: init remote service call handler");
+ RemoteServiceCallHandler remoteServiceCallHandler = new RemoteServiceCallHandler();
+ remoteServiceCallHandler.setEventTransportFactory(eventTransportFactory);
+ remoteServiceCallHandler.setClusterManager(clusterManager);
+ remoteServiceCallHandler.setBundleContext(bundleContext);
+ remoteServiceCallHandler.setConfigurationAdmin(configurationAdmin);
+ Hashtable props = new Hashtable();
+ props.put("managed", "true");
+ register(EventHandler.class, remoteServiceCallHandler, props);
+
+ LOGGER.debug("CELLAR DOSGI: init remote service result handler");
+ RemoteServiceResultHandler remoteServiceResultHandler = new RemoteServiceResultHandler();
+ remoteServiceResultHandler.setCommandStore(commandStore);
+ register(EventHandler.class, remoteServiceResultHandler);
+
+ LOGGER.debug("CELLAR DOSGI: init import service listener");
+ importServiceListener = new ImportServiceListener();
+ importServiceListener.setClusterManager(clusterManager);
+ importServiceListener.setEventTransportFactory(eventTransportFactory);
+ importServiceListener.setCommandStore(commandStore);
+ importServiceListener.setBundleContext(bundleContext);
+ importServiceListener.init();
+ register(ListenerHook.class, importServiceListener);
+
+ LOGGER.debug("CELLAR DOSGI: init export service listener");
+ exportServiceListener = new ExportServiceListener();
+ exportServiceListener.setClusterManager(clusterManager);
+ exportServiceListener.setEventTransportFactory(eventTransportFactory);
+ exportServiceListener.setBundleContext(bundleContext);
+ exportServiceListener.init();
+
++ LOGGER.debug("CELLAR DOSGI: start removed nodes service tracker");
++ removedNodeServiceTracker = new RemovedNodeServiceTracker();
++ removedNodeServiceTracker.setClusterManager(clusterManager);
++ removedNodeServiceTracker.init();
++
+ LOGGER.debug("CELLAR DOSGI: register MBean");
+ ServiceMBeanImpl mbean = new ServiceMBeanImpl();
+ mbean.setClusterManager(clusterManager);
+ props = new Hashtable();
+ props.put("jmx.objectname", "org.apache.karaf.cellar:type=service,name=" + System.getProperty("karaf.name"));
+ mbeanRegistration = bundleContext.registerService(getInterfaceNames(mbean), mbean, props);
+ }
+
+ @Override
+ public void doStop() {
+ super.doStop();
+
+ if (mbeanRegistration != null) {
+ mbeanRegistration.unregister();
+ mbeanRegistration = null;
+ }
++
++ if (removedNodeServiceTracker != null) {
++ removedNodeServiceTracker.destroy();
++ removedNodeServiceTracker = null;
++ }
++
+ if (exportServiceListener != null) {
+ exportServiceListener.destroy();
+ exportServiceListener = null;
+ }
+ if (importServiceListener != null) {
+ importServiceListener.destroy();
+ importServiceListener = null;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/778fcd4c/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/RemovedNodeServiceTracker.java
----------------------------------------------------------------------
diff --cc dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/RemovedNodeServiceTracker.java
index 0000000,0000000..1bad57a
new file mode 100644
--- /dev/null
+++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/RemovedNodeServiceTracker.java
@@@ -1,0 -1,0 +1,111 @@@
++/*
++ * Licensed under the Apache License, Version 2.0 (the "License");
++ * you may not use this file except in compliance with the License.
++ * You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.karaf.cellar.dosgi.internal.osgi;
++
++import org.apache.karaf.cellar.core.ClusterManager;
++import org.apache.karaf.cellar.core.Node;
++import org.apache.karaf.cellar.dosgi.Constants;
++import org.apache.karaf.cellar.dosgi.EndpointDescription;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import java.util.HashSet;
++import java.util.Map;
++import java.util.Set;
++import java.util.concurrent.Executors;
++import java.util.concurrent.ScheduledExecutorService;
++import java.util.concurrent.TimeUnit;
++
++/**
++ * Scheduled task that removes nodes no longer active in the cluster from the remote endpoints, dropping endpoints left with no nodes.
++ */
++public class RemovedNodeServiceTracker implements Runnable {
++
++ private static final transient Logger LOGGER = LoggerFactory.getLogger(RemovedNodeServiceTracker.class);
++
++ private ClusterManager clusterManager;
++
++ private Map<String, EndpointDescription> remoteEndpoints;
++
++ private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
++
++ public void init() {
++ remoteEndpoints = clusterManager.getMap(Constants.REMOTE_ENDPOINTS);
++ scheduler.scheduleWithFixedDelay(this, 10, 10, TimeUnit.SECONDS);
++ }
++
++ public void destroy() {
++ scheduler.shutdown();
++ }
++
++ public ClusterManager getClusterManager() {
++ return clusterManager;
++ }
++
++ public void setClusterManager(ClusterManager clusterManager) {
++ this.clusterManager = clusterManager;
++ }
++
++
++ @Override
++ public void run() {
++ LOGGER.trace("CELLAR DOSGI: running the service tracker task");
++ ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
++ try {
++ if (!remoteEndpoints.isEmpty()) {
++ LOGGER.trace("CELLAR DOSGI: found {} remote endpoints", remoteEndpoints.size());
++ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
++ final Set<Node> activeNodes = clusterManager.listNodes();
++
++ // create a clone of remote endpoint to avoid concurrency concerns while iterating it
++ final Set<Map.Entry<String, EndpointDescription>> list = new HashSet<Map.Entry<String, EndpointDescription>>(remoteEndpoints.entrySet());
++
++ for (Map.Entry<String, EndpointDescription> entry : list) {
++ final EndpointDescription endpointDescription = entry.getValue();
++ final String key = entry.getKey();
++
++ // create a clone of nodes to avoid concurrency concerns while iterating it
++ final Set<Node> nodes = new HashSet<Node>(endpointDescription.getNodes());
++
++ boolean endpointChanged = false;
++ for (Node n : nodes) {
++ if (!activeNodes.contains(n)) {
++ LOGGER.debug("CELLAR DOSGI: removing node with id {} since it is not active", n.getId());
++ endpointDescription.getNodes().remove(n);
++ endpointChanged = true;
++ }
++ }
++
++ if (endpointChanged) {
++ // if the endpoint is used for export from other nodes too, then update it
++ if (endpointDescription.getNodes().size() > 0) {
++ LOGGER.debug("CELLAR DOSGI: updating remote endpoint {}", key);
++ remoteEndpoints.put(key, endpointDescription);
++ } else { // remove endpoint permanently
++ LOGGER.debug("CELLAR DOSGI: removing remote endpoint {}", key);
++ remoteEndpoints.remove(key);
++ }
++ }
++ }
++ } else {
++ LOGGER.trace("CELLAR DOSGI: no remote endpoints found");
++ }
++ } catch (Exception e) {
++ LOGGER.warn("CELLAR DOSGI: failed to run service tracker task", e);
++ } finally {
++ Thread.currentThread().setContextClassLoader(originalClassLoader);
++ }
++ }
++
++}