You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by jb...@apache.org on 2015/09/20 11:33:39 UTC

[2/2] karaf-cellar git commit: Merge branch 'feature_add_service_tracker_to_unregist_remove_service' of https://github.com/FlavioF/karaf-cellar

Merge branch 'feature_add_service_tracker_to_unregist_remove_service' of https://github.com/FlavioF/karaf-cellar

Conflicts:
	dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml


Project: http://git-wip-us.apache.org/repos/asf/karaf-cellar/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-cellar/commit/778fcd4c
Tree: http://git-wip-us.apache.org/repos/asf/karaf-cellar/tree/778fcd4c
Diff: http://git-wip-us.apache.org/repos/asf/karaf-cellar/diff/778fcd4c

Branch: refs/heads/master
Commit: 778fcd4c2fbc852ac9b1b0d78673b2a07590ea47
Parents: 9e42597 de8162f
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Sun Sep 20 11:33:28 2015 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Sep 20 11:33:28 2015 +0200

----------------------------------------------------------------------
 .../cellar/dosgi/internal/osgi/Activator.java   |  12 ++
 .../osgi/RemovedNodeServiceTracker.java         | 111 +++++++++++++++++++
 .../hazelcast/HazelcastClusterManager.java      |   6 +-
 .../cellar/hazelcast/HazelcastGroupManager.java |   2 +-
 .../hazelcast/HazelcastInstanceAware.java       |   2 +-
 .../karaf/cellar/hazelcast/QueueConsumer.java   |  36 +++---
 6 files changed, 149 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/778fcd4c/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java
----------------------------------------------------------------------
diff --cc dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java
index b3c924f,0000000..b7b360e
mode 100644,000000..100644
--- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java
+++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java
@@@ -1,127 -1,0 +1,139 @@@
 +/*
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *       http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.karaf.cellar.dosgi.internal.osgi;
 +
 +import org.apache.karaf.cellar.core.ClusterManager;
 +import org.apache.karaf.cellar.core.command.CommandStore;
 +import org.apache.karaf.cellar.core.event.EventHandler;
 +import org.apache.karaf.cellar.core.event.EventTransportFactory;
 +import org.apache.karaf.cellar.dosgi.*;
 +import org.apache.karaf.cellar.dosgi.management.internal.ServiceMBeanImpl;
 +import org.apache.karaf.util.tracker.BaseActivator;
 +import org.apache.karaf.util.tracker.annotation.ProvideService;
 +import org.apache.karaf.util.tracker.annotation.RequireService;
 +import org.apache.karaf.util.tracker.annotation.Services;
 +import org.osgi.framework.ServiceRegistration;
 +import org.osgi.framework.hooks.service.ListenerHook;
 +import org.osgi.service.cm.ConfigurationAdmin;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.util.Hashtable;
 +
 +@Services(
 +        provides = {
 +                @ProvideService(ListenerHook.class),
 +                @ProvideService(EventHandler.class)
 +        },
 +        requires = {
 +                @RequireService(ClusterManager.class),
 +                @RequireService(EventTransportFactory.class),
 +                @RequireService(CommandStore.class),
 +                @RequireService(ConfigurationAdmin.class)
 +        }
 +)
 +public class Activator extends BaseActivator {
 +
 +    private final static Logger LOGGER = LoggerFactory.getLogger(Activator.class);
 +
 +    private ImportServiceListener importServiceListener;
 +    private ExportServiceListener exportServiceListener;
++    private RemovedNodeServiceTracker removedNodeServiceTracker;
 +    private ServiceRegistration mbeanRegistration;
 +
 +    @Override
 +    public void doStart() throws Exception {
 +
 +        ClusterManager clusterManager = getTrackedService(ClusterManager.class);
 +        if (clusterManager == null)
 +            return;
 +        EventTransportFactory eventTransportFactory = getTrackedService(EventTransportFactory.class);
 +        if (eventTransportFactory == null)
 +            return;
 +        CommandStore commandStore = getTrackedService(CommandStore.class);
 +        if (commandStore == null)
 +            return;
 +        ConfigurationAdmin configurationAdmin = getTrackedService(ConfigurationAdmin.class);
 +        if (configurationAdmin == null)
 +            return;
 +
 +        LOGGER.debug("CELLAR DOSGI: init remote service call handler");
 +        RemoteServiceCallHandler remoteServiceCallHandler = new RemoteServiceCallHandler();
 +        remoteServiceCallHandler.setEventTransportFactory(eventTransportFactory);
 +        remoteServiceCallHandler.setClusterManager(clusterManager);
 +        remoteServiceCallHandler.setBundleContext(bundleContext);
 +        remoteServiceCallHandler.setConfigurationAdmin(configurationAdmin);
 +        Hashtable props = new Hashtable();
 +        props.put("managed", "true");
 +        register(EventHandler.class, remoteServiceCallHandler, props);
 +
 +        LOGGER.debug("CELLAR DOSGI: init remote service result handler");
 +        RemoteServiceResultHandler remoteServiceResultHandler = new RemoteServiceResultHandler();
 +        remoteServiceResultHandler.setCommandStore(commandStore);
 +        register(EventHandler.class, remoteServiceResultHandler);
 +
 +        LOGGER.debug("CELLAR DOSGI: init import service listener");
 +        importServiceListener = new ImportServiceListener();
 +        importServiceListener.setClusterManager(clusterManager);
 +        importServiceListener.setEventTransportFactory(eventTransportFactory);
 +        importServiceListener.setCommandStore(commandStore);
 +        importServiceListener.setBundleContext(bundleContext);
 +        importServiceListener.init();
 +        register(ListenerHook.class, importServiceListener);
 +
 +        LOGGER.debug("CELLAR DOSGI: init export service listener");
 +        exportServiceListener = new ExportServiceListener();
 +        exportServiceListener.setClusterManager(clusterManager);
 +        exportServiceListener.setEventTransportFactory(eventTransportFactory);
 +        exportServiceListener.setBundleContext(bundleContext);
 +        exportServiceListener.init();
 +
++        LOGGER.debug("CELLAR DOSGI: start removed nodes service tracker");
++        removedNodeServiceTracker = new RemovedNodeServiceTracker();
++        removedNodeServiceTracker.setClusterManager(clusterManager);
++        removedNodeServiceTracker.init();
++
 +        LOGGER.debug("CELLAR DOSGI: register MBean");
 +        ServiceMBeanImpl mbean = new ServiceMBeanImpl();
 +        mbean.setClusterManager(clusterManager);
 +        props = new Hashtable();
 +        props.put("jmx.objectname", "org.apache.karaf.cellar:type=service,name=" + System.getProperty("karaf.name"));
 +        mbeanRegistration = bundleContext.registerService(getInterfaceNames(mbean), mbean, props);
 +    }
 +
 +    @Override
 +    public void doStop() {
 +        super.doStop();
 +
 +        if (mbeanRegistration != null) {
 +            mbeanRegistration.unregister();
 +            mbeanRegistration = null;
 +        }
++
++        if (removedNodeServiceTracker != null) {
++            removedNodeServiceTracker.destroy();
++            removedNodeServiceTracker = null;
++        }
++
 +        if (exportServiceListener != null) {
 +            exportServiceListener.destroy();
 +            exportServiceListener = null;
 +        }
 +        if (importServiceListener != null) {
 +            importServiceListener.destroy();
 +            importServiceListener = null;
 +        }
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/778fcd4c/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/RemovedNodeServiceTracker.java
----------------------------------------------------------------------
diff --cc dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/RemovedNodeServiceTracker.java
index 0000000,0000000..1bad57a
new file mode 100644
--- /dev/null
+++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/RemovedNodeServiceTracker.java
@@@ -1,0 -1,0 +1,111 @@@
++/*
++ * Licensed under the Apache License, Version 2.0 (the "License");
++ * you may not use this file except in compliance with the License.
++ * You may obtain a copy of the License at
++ *
++ *       http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.karaf.cellar.dosgi.internal.osgi;
++
++import org.apache.karaf.cellar.core.ClusterManager;
++import org.apache.karaf.cellar.core.Node;
++import org.apache.karaf.cellar.dosgi.Constants;
++import org.apache.karaf.cellar.dosgi.EndpointDescription;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import java.util.HashSet;
++import java.util.Map;
++import java.util.Set;
++import java.util.concurrent.Executors;
++import java.util.concurrent.ScheduledExecutorService;
++import java.util.concurrent.TimeUnit;
++
++/**
++ * Listener called when a new service is exported.
++ */
++public class RemovedNodeServiceTracker implements Runnable {
++
++    private static final transient Logger LOGGER = LoggerFactory.getLogger(RemovedNodeServiceTracker.class);
++
++    private ClusterManager clusterManager;
++
++    private Map<String, EndpointDescription> remoteEndpoints;
++
++    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
++
++    public void init() {
++        remoteEndpoints = clusterManager.getMap(Constants.REMOTE_ENDPOINTS);
++        scheduler.scheduleWithFixedDelay(this, 10, 10, TimeUnit.SECONDS);
++    }
++
++    public void destroy() {
++        scheduler.shutdown();
++    }
++
++    public ClusterManager getClusterManager() {
++        return clusterManager;
++    }
++
++    public void setClusterManager(ClusterManager clusterManager) {
++        this.clusterManager = clusterManager;
++    }
++
++
++    @Override
++    public void run() {
++        LOGGER.trace("CELLAR DOSGI: running the service tracker task");
++        ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
++        try {
++            if (!remoteEndpoints.isEmpty()) {
++                LOGGER.trace("CELLAR DOSGI: found {} remote endpoints", remoteEndpoints.size());
++                Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
++                final Set<Node> activeNodes = clusterManager.listNodes();
++
++                // create a clone of remote endpoint to avoid concurrency concerns while iterating it
++                final Set<Map.Entry<String, EndpointDescription>> list = new HashSet<Map.Entry<String, EndpointDescription>>(remoteEndpoints.entrySet());
++
++                for (Map.Entry<String, EndpointDescription> entry : list) {
++                    final EndpointDescription endpointDescription = entry.getValue();
++                    final String key = entry.getKey();
++
++                    // create a clone of nodes to avoid concurrency concerns while iterating it
++                    final Set<Node> nodes = new HashSet<Node>(endpointDescription.getNodes());
++
++                    boolean endpointChanged = false;
++                    for (Node n : nodes) {
++                        if (!activeNodes.contains(n)) {
++                            LOGGER.debug("CELLAR DOSGI: removing node with id {} since it is not active", n.getId());
++                            endpointDescription.getNodes().remove(n);
++                            endpointChanged = true;
++                        }
++                    }
++
++                    if (endpointChanged) {
++                        // if the endpoint is used for export from other nodes too, then update it
++                        if (endpointDescription.getNodes().size() > 0) {
++                            LOGGER.debug("CELLAR DOSGI: updating remote endpoint {}", key);
++                            remoteEndpoints.put(key, endpointDescription);
++                        } else { // remove endpoint permanently
++                            LOGGER.debug("CELLAR DOSGI: removing remote endpoint {}", key);
++                            remoteEndpoints.remove(key);
++                        }
++                    }
++                }
++            } else {
++                LOGGER.trace("CELLAR DOSGI: no remote endpoints found");
++            }
++        } catch (Exception e) {
++            LOGGER.warn("CELLAR DOSGI: failed to run service tracker task", e);
++        } finally {
++            Thread.currentThread().setContextClassLoader(originalClassLoader);
++        }
++    }
++
++}