You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by jb...@apache.org on 2015/09/20 11:33:38 UTC

[1/2] karaf-cellar git commit: [KARAF-1842] Implemented Service Tracker to remove unavailable distributed service.

Repository: karaf-cellar
Updated Branches:
  refs/heads/master 9e4259790 -> 778fcd4c2


[KARAF-1842] Implemented Service Tracker to remove unavailable distributed service.


Project: http://git-wip-us.apache.org/repos/asf/karaf-cellar/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-cellar/commit/de8162fb
Tree: http://git-wip-us.apache.org/repos/asf/karaf-cellar/tree/de8162fb
Diff: http://git-wip-us.apache.org/repos/asf/karaf-cellar/diff/de8162fb

Branch: refs/heads/master
Commit: de8162fba4542a53050eccf16908981c54131ab6
Parents: b4a2504
Author: Flávio Ferreira <ff...@bikeemotion.com>
Authored: Wed Apr 22 18:46:42 2015 +0100
Committer: Flávio Ferreira <ff...@bikeemotion.com>
Committed: Wed Apr 22 18:46:42 2015 +0100

----------------------------------------------------------------------
 .../karaf/cellar/dosgi/ServiceTracker.java      | 110 +++++++++++++++++++
 .../resources/OSGI-INF/blueprint/blueprint.xml  |   4 +
 .../hazelcast/HazelcastClusterManager.java      |   6 +-
 .../cellar/hazelcast/HazelcastGroupManager.java |   2 +-
 .../hazelcast/HazelcastInstanceAware.java       |   2 +-
 .../karaf/cellar/hazelcast/QueueConsumer.java   |  36 +++---
 6 files changed, 140 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/de8162fb/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ServiceTracker.java
----------------------------------------------------------------------
diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ServiceTracker.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ServiceTracker.java
new file mode 100644
index 0000000..45c897c
--- /dev/null
+++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ServiceTracker.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.cellar.dosgi;
+
+import org.apache.karaf.cellar.core.ClusterManager;
+import org.apache.karaf.cellar.core.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Listener called when a new service is exported.
+ */
+public class ServiceTracker implements Runnable {
+
+    private static final transient Logger LOGGER = LoggerFactory.getLogger(ServiceTracker.class);
+
+    private ClusterManager clusterManager;
+
+    private Map<String, EndpointDescription> remoteEndpoints;
+
+    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+    
+    public void init() {
+        LOGGER.info("CELLAR SERVICE TRACKER: a new Task initialized");
+        remoteEndpoints = clusterManager.getMap(Constants.REMOTE_ENDPOINTS);
+        scheduler.scheduleWithFixedDelay(this, 10, 10, TimeUnit.SECONDS);
+    }
+
+    public void destroy() {
+        LOGGER.info("CELLAR SERVICE TRACKER: task is being destroyed");
+        scheduler.shutdown();
+    }
+
+    public ClusterManager getClusterManager() {
+        return clusterManager;
+    }
+
+    public void setClusterManager(ClusterManager clusterManager) {
+        this.clusterManager = clusterManager;
+    }
+
+
+    @Override
+    public void run() {
+        LOGGER.trace("SERVICE TRACKER: running the service tracker task");
+        ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            if (!remoteEndpoints.isEmpty()) {
+                LOGGER.trace("SERVICE TRACKER: Founded {} remote endpoints", remoteEndpoints.size());
+                Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+                final Set<Node> activeNodes = clusterManager.listNodes();
+                
+                // create a clone of remote endpoint to avoid concurrency concerns while iterating it
+                final Set<Map.Entry<String, EndpointDescription>> list = new HashSet<Map.Entry<String, EndpointDescription>>(remoteEndpoints.entrySet());
+                
+                for (Map.Entry<String, EndpointDescription> entry : list) {
+                    final EndpointDescription endpointDescription = entry.getValue();
+                    final String key = entry.getKey();
+                    
+                    // create a clone of nodes to avoid concurrency concerns while iterating it
+                    final Set<Node> nodes = new HashSet<Node>(endpointDescription.getNodes());
+                    
+                    boolean endpointChanged = false;
+                    for(Node n : nodes) {
+                        if(!activeNodes.contains(n)) {
+                            LOGGER.debug("SERVICE TRACKER: Removing node with id {} since it is not active", n.getId());
+                            endpointDescription.getNodes().remove(n);
+                            endpointChanged = true;                        
+                        }
+                    }
+
+                    if(endpointChanged) {
+                        // if the endpoint is used for export from other nodes too, then update it
+                        if (endpointDescription.getNodes().size() > 0) {
+                            LOGGER.debug("SERVICE TRACKER: Updating remote endpoint {}", key);
+                            remoteEndpoints.put(key, endpointDescription);
+                        } else { // remove endpoint permanently
+                            LOGGER.debug("SERVICE TRACKER: Removing remote endpoint {}", key);
+                            remoteEndpoints.remove(key);
+                        }
+                    } 
+                }
+            } else {
+                LOGGER.debug("SERVICE TRACKER: no remote endpoints found");
+            }
+        } catch (Exception e) {
+            LOGGER.error("SERVICE TRACKER: failed to run service tracker task",e);
+        } finally {
+           Thread.currentThread().setContextClassLoader(originalClassLoader);
+        }
+    }
+  
+}

http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/de8162fb/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
diff --git a/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 51171fc..75ee4c0 100644
--- a/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -55,4 +55,8 @@
     <reference id="commandStore" interface="org.apache.karaf.cellar.core.command.CommandStore"/>
     <reference id="configurationAdmin" interface="org.osgi.service.cm.ConfigurationAdmin"/>
 
+
+    <bean id="serviceTracker" class="org.apache.karaf.cellar.dosgi.ServiceTracker" init-method="init" destroy-method="destroy">
+        <property name="clusterManager" ref="clusterManager"/>
+    </bean>
 </blueprint>

http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/de8162fb/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java
----------------------------------------------------------------------
diff --git a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java
index b9a9619..9059eb5 100644
--- a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java
+++ b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java
@@ -87,7 +87,7 @@ public class HazelcastClusterManager extends HazelcastInstanceAware implements C
             Set<Member> members = cluster.getMembers();
             if (members != null && !members.isEmpty()) {
                 for (Member member : members) {
-                    HazelcastNode node = new HazelcastNode(member.getInetSocketAddress().getHostName(), member.getInetSocketAddress().getPort());
+                    HazelcastNode node = new HazelcastNode(member.getSocketAddress().getHostString(), member.getSocketAddress().getPort());
                     nodes.add(node);
                 }
             }
@@ -110,7 +110,7 @@ public class HazelcastClusterManager extends HazelcastInstanceAware implements C
                 Set<Member> members = cluster.getMembers();
                 if (members != null && !members.isEmpty()) {
                     for (Member member : members) {
-                        HazelcastNode node = new HazelcastNode(member.getInetSocketAddress().getHostName(), member.getInetSocketAddress().getPort());
+                        HazelcastNode node = new HazelcastNode(member.getSocketAddress().getHostString(), member.getSocketAddress().getPort());
                         if (ids.contains(node.getId())) {
                             nodes.add(node);
                         }
@@ -135,7 +135,7 @@ public class HazelcastClusterManager extends HazelcastInstanceAware implements C
                 Set<Member> members = cluster.getMembers();
                 if (members != null && !members.isEmpty()) {
                     for (Member member : members) {
-                        HazelcastNode node = new HazelcastNode(member.getInetSocketAddress().getHostName(), member.getInetSocketAddress().getPort());
+                        HazelcastNode node = new HazelcastNode(member.getSocketAddress().getHostString(), member.getSocketAddress().getPort());
                         if (id.equals(node.getId())) {
                             return node;
                         }

http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/de8162fb/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
----------------------------------------------------------------------
diff --git a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
index 57a5d71..78370dd 100644
--- a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
+++ b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
@@ -140,7 +140,7 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi
             Cluster cluster = instance.getCluster();
             if (cluster != null) {
                 Member member = cluster.getLocalMember();
-                node = new HazelcastNode(member.getInetSocketAddress().getHostName(), member.getInetSocketAddress().getPort());
+                node = new HazelcastNode(member.getSocketAddress().getHostString(), member.getSocketAddress().getPort());
             }
             return node;
         } finally {

http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/de8162fb/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java
----------------------------------------------------------------------
diff --git a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java
index 87e1288..7c9f14d 100644
--- a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java
+++ b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java
@@ -42,7 +42,7 @@ public class HazelcastInstanceAware {
         Cluster cluster = instance.getCluster();
         if (cluster != null) {
             Member member = cluster.getLocalMember();
-            return new HazelcastNode(member.getInetSocketAddress().getHostName(), member.getInetSocketAddress().getPort());
+            return new HazelcastNode(member.getSocketAddress().getHostString(), member.getSocketAddress().getPort());
         } else {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/de8162fb/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
----------------------------------------------------------------------
diff --git a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
index 85c7534..d19461d 100644
--- a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
+++ b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
@@ -86,26 +86,32 @@ public class QueueConsumer<E extends Event> implements EventConsumer<E>, ItemLis
     @Override
     public void run() {
         ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
-        try {
-            while (isConsuming) {
-                if (combinedClassLoader != null) {
-                    Thread.currentThread().setContextClassLoader(combinedClassLoader);
-                } else Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-                E e = null;
-                try {
-                    e = getQueue().poll(10, TimeUnit.SECONDS);
-                } catch (InterruptedException e1) {
-                    LOGGER.warn("CELLAR HAZELCAST: consume task interrupted");
-                }
+        E e;
+        while (isConsuming) {
+            if (combinedClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(combinedClassLoader);
+            } else Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+            e = null;
+            try {
+                e = getQueue().poll(10, TimeUnit.SECONDS);
+            } catch (InterruptedException e1) {
+                LOGGER.warn("CELLAR HAZELCAST: consume task interrupted");
+            } catch (Exception e2) {
+                // catch everything from Hazelcast to prevent the death of Queue Consumer task
+                LOGGER.warn("CELLAR HAZELCAST: consumer task failed to poll the queue", e2);
+            }
+            
+            try {
                 if (e != null) {
                     consume(e);
                 }
+            } catch (Exception e1) {
+                LOGGER.error("CELLAR HAZELCAST: failed to consume from queue", e1);
             }
-        } catch (Exception ex) {
-            LOGGER.error("CELLAR HAZELCAST: failed to consume from queue", ex);
-        } finally {
-            Thread.currentThread().setContextClassLoader(originalClassLoader);
+
         }
+        
+        Thread.currentThread().setContextClassLoader(originalClassLoader);
     }
 
     /**


[2/2] karaf-cellar git commit: Merge branch 'feature_add_service_tracker_to_unregist_remove_service' of https://github.com/FlavioF/karaf-cellar

Posted by jb...@apache.org.
Merge branch 'feature_add_service_tracker_to_unregist_remove_service' of https://github.com/FlavioF/karaf-cellar

Conflicts:
	dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml


Project: http://git-wip-us.apache.org/repos/asf/karaf-cellar/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-cellar/commit/778fcd4c
Tree: http://git-wip-us.apache.org/repos/asf/karaf-cellar/tree/778fcd4c
Diff: http://git-wip-us.apache.org/repos/asf/karaf-cellar/diff/778fcd4c

Branch: refs/heads/master
Commit: 778fcd4c2fbc852ac9b1b0d78673b2a07590ea47
Parents: 9e42597 de8162f
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Sun Sep 20 11:33:28 2015 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Sep 20 11:33:28 2015 +0200

----------------------------------------------------------------------
 .../cellar/dosgi/internal/osgi/Activator.java   |  12 ++
 .../osgi/RemovedNodeServiceTracker.java         | 111 +++++++++++++++++++
 .../hazelcast/HazelcastClusterManager.java      |   6 +-
 .../cellar/hazelcast/HazelcastGroupManager.java |   2 +-
 .../hazelcast/HazelcastInstanceAware.java       |   2 +-
 .../karaf/cellar/hazelcast/QueueConsumer.java   |  36 +++---
 6 files changed, 149 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/778fcd4c/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java
----------------------------------------------------------------------
diff --cc dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java
index b3c924f,0000000..b7b360e
mode 100644,000000..100644
--- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java
+++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java
@@@ -1,127 -1,0 +1,139 @@@
 +/*
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *       http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.karaf.cellar.dosgi.internal.osgi;
 +
 +import org.apache.karaf.cellar.core.ClusterManager;
 +import org.apache.karaf.cellar.core.command.CommandStore;
 +import org.apache.karaf.cellar.core.event.EventHandler;
 +import org.apache.karaf.cellar.core.event.EventTransportFactory;
 +import org.apache.karaf.cellar.dosgi.*;
 +import org.apache.karaf.cellar.dosgi.management.internal.ServiceMBeanImpl;
 +import org.apache.karaf.util.tracker.BaseActivator;
 +import org.apache.karaf.util.tracker.annotation.ProvideService;
 +import org.apache.karaf.util.tracker.annotation.RequireService;
 +import org.apache.karaf.util.tracker.annotation.Services;
 +import org.osgi.framework.ServiceRegistration;
 +import org.osgi.framework.hooks.service.ListenerHook;
 +import org.osgi.service.cm.ConfigurationAdmin;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.util.Hashtable;
 +
 +@Services(
 +        provides = {
 +                @ProvideService(ListenerHook.class),
 +                @ProvideService(EventHandler.class)
 +        },
 +        requires = {
 +                @RequireService(ClusterManager.class),
 +                @RequireService(EventTransportFactory.class),
 +                @RequireService(CommandStore.class),
 +                @RequireService(ConfigurationAdmin.class)
 +        }
 +)
 +public class Activator extends BaseActivator {
 +
 +    private final static Logger LOGGER = LoggerFactory.getLogger(Activator.class);
 +
 +    private ImportServiceListener importServiceListener;
 +    private ExportServiceListener exportServiceListener;
++    private RemovedNodeServiceTracker removedNodeServiceTracker;
 +    private ServiceRegistration mbeanRegistration;
 +
 +    @Override
 +    public void doStart() throws Exception {
 +
 +        ClusterManager clusterManager = getTrackedService(ClusterManager.class);
 +        if (clusterManager == null)
 +            return;
 +        EventTransportFactory eventTransportFactory = getTrackedService(EventTransportFactory.class);
 +        if (eventTransportFactory == null)
 +            return;
 +        CommandStore commandStore = getTrackedService(CommandStore.class);
 +        if (commandStore == null)
 +            return;
 +        ConfigurationAdmin configurationAdmin = getTrackedService(ConfigurationAdmin.class);
 +        if (configurationAdmin == null)
 +            return;
 +
 +        LOGGER.debug("CELLAR DOSGI: init remote service call handler");
 +        RemoteServiceCallHandler remoteServiceCallHandler = new RemoteServiceCallHandler();
 +        remoteServiceCallHandler.setEventTransportFactory(eventTransportFactory);
 +        remoteServiceCallHandler.setClusterManager(clusterManager);
 +        remoteServiceCallHandler.setBundleContext(bundleContext);
 +        remoteServiceCallHandler.setConfigurationAdmin(configurationAdmin);
 +        Hashtable props = new Hashtable();
 +        props.put("managed", "true");
 +        register(EventHandler.class, remoteServiceCallHandler, props);
 +
 +        LOGGER.debug("CELLAR DOSGI: init remote service result handler");
 +        RemoteServiceResultHandler remoteServiceResultHandler = new RemoteServiceResultHandler();
 +        remoteServiceResultHandler.setCommandStore(commandStore);
 +        register(EventHandler.class, remoteServiceResultHandler);
 +
 +        LOGGER.debug("CELLAR DOSGI: init import service listener");
 +        importServiceListener = new ImportServiceListener();
 +        importServiceListener.setClusterManager(clusterManager);
 +        importServiceListener.setEventTransportFactory(eventTransportFactory);
 +        importServiceListener.setCommandStore(commandStore);
 +        importServiceListener.setBundleContext(bundleContext);
 +        importServiceListener.init();
 +        register(ListenerHook.class, importServiceListener);
 +
 +        LOGGER.debug("CELLAR DOSGI: init export service listener");
 +        exportServiceListener = new ExportServiceListener();
 +        exportServiceListener.setClusterManager(clusterManager);
 +        exportServiceListener.setEventTransportFactory(eventTransportFactory);
 +        exportServiceListener.setBundleContext(bundleContext);
 +        exportServiceListener.init();
 +
++        LOGGER.debug("CELLAR DOSGI: start removed nodes service tracker");
++        removedNodeServiceTracker = new RemovedNodeServiceTracker();
++        removedNodeServiceTracker.setClusterManager(clusterManager);
++        removedNodeServiceTracker.init();
++
 +        LOGGER.debug("CELLAR DOSGI: register MBean");
 +        ServiceMBeanImpl mbean = new ServiceMBeanImpl();
 +        mbean.setClusterManager(clusterManager);
 +        props = new Hashtable();
 +        props.put("jmx.objectname", "org.apache.karaf.cellar:type=service,name=" + System.getProperty("karaf.name"));
 +        mbeanRegistration = bundleContext.registerService(getInterfaceNames(mbean), mbean, props);
 +    }
 +
 +    @Override
 +    public void doStop() {
 +        super.doStop();
 +
 +        if (mbeanRegistration != null) {
 +            mbeanRegistration.unregister();
 +            mbeanRegistration = null;
 +        }
++
++        if (removedNodeServiceTracker != null) {
++            removedNodeServiceTracker.destroy();
++            removedNodeServiceTracker = null;
++        }
++
 +        if (exportServiceListener != null) {
 +            exportServiceListener.destroy();
 +            exportServiceListener = null;
 +        }
 +        if (importServiceListener != null) {
 +            importServiceListener.destroy();
 +            importServiceListener = null;
 +        }
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/778fcd4c/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/RemovedNodeServiceTracker.java
----------------------------------------------------------------------
diff --cc dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/RemovedNodeServiceTracker.java
index 0000000,0000000..1bad57a
new file mode 100644
--- /dev/null
+++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/RemovedNodeServiceTracker.java
@@@ -1,0 -1,0 +1,111 @@@
++/*
++ * Licensed under the Apache License, Version 2.0 (the "License");
++ * you may not use this file except in compliance with the License.
++ * You may obtain a copy of the License at
++ *
++ *       http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.karaf.cellar.dosgi.internal.osgi;
++
++import org.apache.karaf.cellar.core.ClusterManager;
++import org.apache.karaf.cellar.core.Node;
++import org.apache.karaf.cellar.dosgi.Constants;
++import org.apache.karaf.cellar.dosgi.EndpointDescription;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import java.util.HashSet;
++import java.util.Map;
++import java.util.Set;
++import java.util.concurrent.Executors;
++import java.util.concurrent.ScheduledExecutorService;
++import java.util.concurrent.TimeUnit;
++
++/**
++ * Listener called when a new service is exported.
++ */
++public class RemovedNodeServiceTracker implements Runnable {
++
++    private static final transient Logger LOGGER = LoggerFactory.getLogger(RemovedNodeServiceTracker.class);
++
++    private ClusterManager clusterManager;
++
++    private Map<String, EndpointDescription> remoteEndpoints;
++
++    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
++
++    public void init() {
++        remoteEndpoints = clusterManager.getMap(Constants.REMOTE_ENDPOINTS);
++        scheduler.scheduleWithFixedDelay(this, 10, 10, TimeUnit.SECONDS);
++    }
++
++    public void destroy() {
++        scheduler.shutdown();
++    }
++
++    public ClusterManager getClusterManager() {
++        return clusterManager;
++    }
++
++    public void setClusterManager(ClusterManager clusterManager) {
++        this.clusterManager = clusterManager;
++    }
++
++
++    @Override
++    public void run() {
++        LOGGER.trace("CELLAR DOSGI: running the service tracker task");
++        ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
++        try {
++            if (!remoteEndpoints.isEmpty()) {
++                LOGGER.trace("CELLAR DOSGI: found {} remote endpoints", remoteEndpoints.size());
++                Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
++                final Set<Node> activeNodes = clusterManager.listNodes();
++
++                // create a clone of remote endpoint to avoid concurrency concerns while iterating it
++                final Set<Map.Entry<String, EndpointDescription>> list = new HashSet<Map.Entry<String, EndpointDescription>>(remoteEndpoints.entrySet());
++
++                for (Map.Entry<String, EndpointDescription> entry : list) {
++                    final EndpointDescription endpointDescription = entry.getValue();
++                    final String key = entry.getKey();
++
++                    // create a clone of nodes to avoid concurrency concerns while iterating it
++                    final Set<Node> nodes = new HashSet<Node>(endpointDescription.getNodes());
++
++                    boolean endpointChanged = false;
++                    for (Node n : nodes) {
++                        if (!activeNodes.contains(n)) {
++                            LOGGER.debug("CELLAR DOSGI: removing node with id {} since it is not active", n.getId());
++                            endpointDescription.getNodes().remove(n);
++                            endpointChanged = true;
++                        }
++                    }
++
++                    if (endpointChanged) {
++                        // if the endpoint is used for export from other nodes too, then update it
++                        if (endpointDescription.getNodes().size() > 0) {
++                            LOGGER.debug("CELLAR DOSGI: updating remote endpoint {}", key);
++                            remoteEndpoints.put(key, endpointDescription);
++                        } else { // remove endpoint permanently
++                            LOGGER.debug("CELLAR DOSGI: removing remote endpoint {}", key);
++                            remoteEndpoints.remove(key);
++                        }
++                    }
++                }
++            } else {
++                LOGGER.trace("CELLAR DOSGI: no remote endpoints found");
++            }
++        } catch (Exception e) {
++            LOGGER.warn("CELLAR DOSGI: failed to run service tracker task", e);
++        } finally {
++            Thread.currentThread().setContextClassLoader(originalClassLoader);
++        }
++    }
++
++}