You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2016/03/14 17:49:38 UTC
[4/5] aries-rsa git commit: Adding roundtrip test
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
new file mode 100644
index 0000000..f50848f
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper.subscribe;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
+import org.apache.aries.rsa.discovery.endpoint.PropertiesMapper;
+import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Monitors ZooKeeper for changes in published endpoints.
+ * <p>
+ * Specifically, it monitors the node path associated with a given interface class,
+ * whose data is a serialized version of an EndpointDescription, and notifies an
+ * EndpointListener when changes are detected (which can then propagate the
+ * notification to other EndpointListeners with a matching scope).
+ * <p>
+ * Note that the EndpointListener is used here as a decoupling interface for
+ * convenience, and is not necessarily used according to its documented contract.
+ */
+public class InterfaceMonitor implements Watcher, StatCallback {
+
+ private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitor.class);
+
+ private final String znode;
+ private final ZooKeeper zk;
+ private final EndpointListener endpointListener;
+ private final boolean recursive;
+ private volatile boolean closed;
+
+ // This map reference changes, so don't synchronize on it
+ private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>();
+
+ private EndpointDescriptionParser parser;
+
+ public InterfaceMonitor(ZooKeeper zk, String objClass, EndpointListener endpointListener, String scope) {
+ this.zk = zk;
+ this.znode = Utils.getZooKeeperPath(objClass);
+ this.recursive = objClass == null || objClass.isEmpty();
+ this.endpointListener = endpointListener;
+ this.parser = new EndpointDescriptionParser();
+ LOG.debug("Creating new InterfaceMonitor {} for scope [{}] and objectClass [{}]",
+ new Object[] {recursive ? "(recursive)" : "", scope, objClass});
+ }
+
+ /**
+ * Returns all endpoints that are currently known to this monitor.
+ *
+ * @return all endpoints that are currently known to this monitor
+ */
+ public synchronized List<EndpointDescription> getEndpoints() {
+ return new ArrayList<EndpointDescription>(nodes.values());
+ }
+
+ public void start() {
+ watch();
+ }
+
+ private void watch() {
+ LOG.debug("registering a ZooKeeper.exists({}) callback", znode);
+ zk.exists(znode, this, this, null);
+ }
+
+ /**
+ * Zookeeper Watcher interface callback.
+ */
+ public void process(WatchedEvent event) {
+ LOG.debug("ZooKeeper watcher callback on node {} for event {}", znode, event);
+ processDelta();
+ }
+
+ /**
+ * Zookeeper StatCallback interface callback.
+ */
+ @SuppressWarnings("deprecation")
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
+ LOG.debug("ZooKeeper callback on node: {} code: {}", znode, rc);
+
+ switch (rc) {
+ case Code.Ok:
+ case Code.NoNode:
+ processDelta();
+ return;
+
+ case Code.SessionExpired:
+ case Code.NoAuth:
+ case Code.ConnectionLoss:
+ return;
+
+ default:
+ watch();
+ }
+ }
+
+ private void processDelta() {
+ if (closed) {
+ return;
+ }
+
+ if (zk.getState() != ZooKeeper.States.CONNECTED) {
+ LOG.debug("ZooKeeper connection was already closed! Not processing changed event.");
+ return;
+ }
+
+ try {
+ if (zk.exists(znode, false) != null) {
+ zk.getChildren(znode, this);
+ refreshNodes();
+ } else {
+ LOG.debug("znode {} doesn't exist -> not processing any changes", znode);
+ }
+ } catch (Exception e) {
+ if (zk.getState() != ZooKeeper.States.CONNECTED) {
+ LOG.debug("Error getting Zookeeper data: " + e); // e.g. session expired, handled by ZooKeeperDiscovery
+ } else {
+ LOG.error("Error getting ZooKeeper data.", e);
+ }
+ }
+ }
+
+ public synchronized void close() {
+ closed = true;
+ for (EndpointDescription endpoint : nodes.values()) {
+ endpointListener.endpointRemoved(endpoint, null);
+ }
+ nodes.clear();
+ }
+
+ private synchronized void refreshNodes() {
+ if (closed) {
+ return;
+ }
+ LOG.info("Processing change on node: {}", znode);
+
+ Map<String, EndpointDescription> newNodes = new HashMap<String, EndpointDescription>();
+ Map<String, EndpointDescription> prevNodes = new HashMap<String, EndpointDescription>(nodes);
+ processChildren(znode, newNodes, prevNodes);
+
+ // whatever is left in prevNodes now has been removed from Discovery
+ LOG.debug("processChildren done. Nodes that are missing now and need to be removed: {}", prevNodes.values());
+ for (EndpointDescription endpoint : prevNodes.values()) {
+ endpointListener.endpointRemoved(endpoint, null);
+ }
+ nodes = newNodes;
+ }
+
+ /**
+ * Iterates through all child nodes of the given node and tries to find
+ * endpoints. If the recursive flag is set it also traverses into the child
+ * nodes.
+ *
+ * @return true if an endpoint was found and if the node therefore needs to
+ * be monitored for changes
+ */
+ private boolean processChildren(String zn, Map<String, EndpointDescription> newNodes,
+ Map<String, EndpointDescription> prevNodes) {
+ List<String> children;
+ try {
+ LOG.debug("Processing the children of {}", zn);
+ children = zk.getChildren(zn, false);
+
+ boolean foundANode = false;
+ for (String child : children) {
+ String childZNode = zn + '/' + child;
+ EndpointDescription endpoint = getEndpointDescriptionFromNode(childZNode);
+ if (endpoint != null) {
+ EndpointDescription prevEndpoint = prevNodes.get(child);
+ LOG.info("found new node " + zn + "/[" + child + "] ( []->child ) props: "
+ + endpoint.getProperties().values());
+ newNodes.put(child, endpoint);
+ prevNodes.remove(child);
+ foundANode = true;
+ LOG.debug("Properties: {}", endpoint.getProperties());
+ if (prevEndpoint == null) {
+ // This guy is new
+ endpointListener.endpointAdded(endpoint, null);
+ } else if (!prevEndpoint.getProperties().equals(endpoint.getProperties())) {
+ // TODO
+ }
+ }
+ if (recursive && processChildren(childZNode, newNodes, prevNodes)) {
+ zk.getChildren(childZNode, this);
+ }
+ }
+
+ return foundANode;
+ } catch (KeeperException e) {
+ LOG.error("Problem processing ZooKeeper node", e);
+ } catch (InterruptedException e) {
+ LOG.error("Problem processing ZooKeeper node", e);
+ }
+ return false;
+ }
+
+ /**
+ * Retrieves data from the given node and parses it into an EndpointDescription.
+ *
+ * @param node a node path
+ * @return endpoint found in the node or null if no endpoint was found
+ */
+ private EndpointDescription getEndpointDescriptionFromNode(String node) {
+ try {
+ Stat stat = zk.exists(node, false);
+ if (stat == null || stat.getDataLength() <= 0) {
+ return null;
+ }
+ byte[] data = zk.getData(node, false, null);
+ LOG.debug("Got data for node: {}", node);
+
+ EndpointDescription endpoint = getFirstEnpointDescription(data);
+ if (endpoint != null) {
+ return endpoint;
+ }
+ LOG.warn("No Discovery information found for node: {}", node);
+ } catch (Exception e) {
+ LOG.error("Problem getting EndpointDescription from node " + node, e);
+ }
+ return null;
+ }
+
+ public EndpointDescription getFirstEnpointDescription(byte[] data) {
+ List<EndpointDescriptionType> elements = parser.getEndpointDescriptions(new ByteArrayInputStream(data));
+ if (elements.isEmpty()) {
+ return null;
+ }
+ Map<String, Object> props = new PropertiesMapper().toProps(elements.get(0).getProperty());
+ return new EndpointDescription(props);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
new file mode 100644
index 0000000..3a02a48
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper.subscribe;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
+import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
+import org.apache.aries.rsa.util.StringPlus;
+import org.apache.zookeeper.ZooKeeper;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Filter;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the EndpointListeners and the scopes they are interested in.
+ * For each scope with interested EndpointListeners an InterfaceMonitor is created.
+ * The InterfaceMonitor calls back when it detects added or removed external Endpoints.
+ * These events are then forwarded to all interested EndpointListeners.
+ */
+public class InterfaceMonitorManager {
+    private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitorManager.class);
+    private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(".*\\(objectClass=([^)]+)\\).*");
+
+    private final BundleContext bctx;
+    private final ZooKeeper zk;
+    // map of EndpointListeners and the scopes they are interested in
+    private final Map<ServiceReference<EndpointListener>, List<String>> endpointListenerScopes =
+            new HashMap<ServiceReference<EndpointListener>, List<String>>();
+    // map of scopes and their interest data
+    private final Map<String, Interest> interests = new HashMap<String, Interest>();
+
+    /**
+     * The listeners interested in one scope together with the monitor serving that scope.
+     */
+    protected static class Interest {
+        List<ServiceReference<EndpointListener>> endpointListeners =
+            new CopyOnWriteArrayList<ServiceReference<EndpointListener>>();
+        InterfaceMonitor monitor;
+    }
+
+    /**
+     * @param bctx the bundle context used to obtain listener services and to create filters
+     * @param zk the ZooKeeper client handed to every created InterfaceMonitor
+     */
+    public InterfaceMonitorManager(BundleContext bctx, ZooKeeper zk) {
+        this.bctx = bctx;
+        this.zk = zk;
+    }
+
+    /**
+     * Registers interest in every scope of the given EndpointListener.
+     * Listeners carrying the discovery's own marker property are skipped.
+     *
+     * @param endpointListener reference to the listener whose scopes are added
+     */
+    public void addInterest(ServiceReference<EndpointListener> endpointListener) {
+        if (isOurOwnEndpointListener(endpointListener)) {
+            LOG.debug("Skipping our own EndpointListener");
+            return;
+        }
+
+        LOG.info("updating EndpointListener interests: {}", endpointListener);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("updated EndpointListener properties: {}", getProperties(endpointListener));
+        }
+        for (String scope : getScopes(endpointListener)) {
+            String objClass = getObjectClass(scope);
+            LOG.debug("Adding interest in scope {}, objectClass {}", scope, objClass);
+            addInterest(endpointListener, scope, objClass);
+        }
+    }
+
+    /**
+     * Tells whether the DISCOVERY_ZOOKEEPER_ID property of the given reference parses as true.
+     */
+    private static boolean isOurOwnEndpointListener(ServiceReference<EndpointListener> endpointListener) {
+        return Boolean.parseBoolean(String.valueOf(
+                endpointListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)));
+    }
+
+    /**
+     * Adds the listener to the interest of the given scope. The first listener
+     * of a scope creates and starts a monitor; later listeners are notified of
+     * all endpoints the existing monitor already knows.
+     *
+     * @param endpointListener reference to the interested listener
+     * @param scope the scope (filter) the listener is interested in
+     * @param objClass the object class extracted from the scope, may be null
+     */
+    public synchronized void addInterest(ServiceReference<EndpointListener> endpointListener,
+                                         String scope, String objClass) {
+        // get or create interest for given scope and add listener to it
+        Interest interest = interests.get(scope);
+        if (interest == null) {
+            // create interest, add listener and start monitor
+            interest = new Interest();
+            interests.put(scope, interest);
+            interest.endpointListeners.add(endpointListener); // add it before monitor starts so we don't miss events
+            interest.monitor = createInterfaceMonitor(scope, objClass, interest);
+            interest.monitor.start();
+        } else {
+            // interest already exists, so just add listener to it
+            if (!interest.endpointListeners.contains(endpointListener)) {
+                interest.endpointListeners.add(endpointListener);
+            }
+            // notify listener of all known endpoints for given scope
+            // (as EndpointListener contract requires of all added/modified listeners)
+            for (EndpointDescription endpoint : interest.monitor.getEndpoints()) {
+                notifyListeners(endpoint, scope, true, Arrays.asList(endpointListener));
+            }
+        }
+
+        // add scope to listener's scopes list
+        List<String> scopes = endpointListenerScopes.get(endpointListener);
+        if (scopes == null) {
+            scopes = new ArrayList<String>(1);
+            endpointListenerScopes.put(endpointListener, scopes);
+        }
+        if (!scopes.contains(scope)) {
+            scopes.add(scope);
+        }
+    }
+
+    /**
+     * Removes the listener from all of its scopes. A scope left without
+     * listeners has its monitor closed and its interest dropped.
+     *
+     * @param endpointListener reference to the listener being removed
+     */
+    public synchronized void removeInterest(ServiceReference<EndpointListener> endpointListener) {
+        LOG.info("removing EndpointListener interests: {}", endpointListener);
+        List<String> scopes = endpointListenerScopes.get(endpointListener);
+        if (scopes == null) {
+            return;
+        }
+
+        for (String scope : scopes) {
+            Interest interest = interests.get(scope);
+            if (interest != null) {
+                interest.endpointListeners.remove(endpointListener);
+                if (interest.endpointListeners.isEmpty()) {
+                    interest.monitor.close();
+                    interests.remove(scope);
+                }
+            }
+        }
+        endpointListenerScopes.remove(endpointListener);
+    }
+
+    /**
+     * Creates the monitor for a scope. Its callbacks forward added and removed
+     * endpoints to the listeners currently registered in the given interest.
+     *
+     * @param scope the scope the monitor is created for
+     * @param objClass the object class to monitor, may be null
+     * @param interest the interest whose listeners receive the notifications
+     * @return the new, not yet started monitor
+     */
+    protected InterfaceMonitor createInterfaceMonitor(final String scope, String objClass, final Interest interest) {
+        // holding this object's lock in the callbacks can lead to a deadlock with InterfaceMonitor
+        EndpointListener endpointListener = new EndpointListener() {
+
+            public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
+                notifyListeners(endpoint, scope, false, interest.endpointListeners);
+            }
+
+            public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
+                notifyListeners(endpoint, scope, true, interest.endpointListeners);
+            }
+        };
+        return new InterfaceMonitor(zk, objClass, endpointListener, scope);
+    }
+
+    /**
+     * Notifies each of the given listeners about an added or removed endpoint,
+     * provided the endpoint matches the scope. Listeners whose service can no
+     * longer be obtained are skipped.
+     */
+    private void notifyListeners(EndpointDescription endpoint, String currentScope, boolean isAdded,
+            List<ServiceReference<EndpointListener>> endpointListeners) {
+        for (ServiceReference<EndpointListener> endpointListenerRef : endpointListeners) {
+            EndpointListener service = bctx.getService(endpointListenerRef);
+            if (service == null) {
+                // no service object could be obtained, so there is nothing to call (and nothing to unget)
+                LOG.info("listening service was unregistered, ignoring");
+                continue;
+            }
+            try {
+                LOG.trace("matching {} against {}", endpoint, currentScope);
+                if (matchFilter(bctx, currentScope, endpoint)) {
+                    LOG.debug("Matched {} against {}", endpoint, currentScope);
+                    notifyListener(endpoint, currentScope, isAdded, endpointListenerRef.getBundle(),
+                                   service);
+                }
+            } finally {
+                bctx.ungetService(endpointListenerRef);
+            }
+        }
+    }
+
+    /**
+     * Matches the endpoint's properties against the given filter string.
+     *
+     * @return true if the filter matches; false if the filter is null, does
+     *         not match, or could not be evaluated
+     */
+    private static boolean matchFilter(BundleContext bctx, String filter, EndpointDescription endpoint) {
+        if (filter == null) {
+            return false;
+        }
+
+        try {
+            Filter f = bctx.createFilter(filter);
+            Dictionary<String, Object> dict = new Hashtable<String, Object>(endpoint.getProperties());
+            return f.match(dict);
+        } catch (Exception e) {
+            // report the failure instead of hiding it: a filter that cannot be evaluated never matches
+            LOG.warn("Unable to match endpoint against filter [{}]", filter, e);
+            return false;
+        }
+    }
+
+
+    /**
+     * Calls endpointAdded or endpointRemoved on a single listener, unless the
+     * listener's bundle is null, in which case the call is skipped.
+     */
+    private void notifyListener(EndpointDescription endpoint, String currentScope, boolean isAdded,
+                                Bundle endpointListenerBundle, EndpointListener endpointListener) {
+        if (endpointListenerBundle == null) {
+            LOG.info("listening service was unregistered, ignoring");
+        } else if (isAdded) {
+            LOG.info("calling EndpointListener.endpointAdded: " + endpointListener + " from bundle "
+                      + endpointListenerBundle.getSymbolicName() + " for endpoint: " + endpoint);
+            endpointListener.endpointAdded(endpoint, currentScope);
+        } else {
+            LOG.info("calling EndpointListener.endpointRemoved: " + endpointListener + " from bundle "
+                      + endpointListenerBundle.getSymbolicName() + " for endpoint: " + endpoint);
+            endpointListener.endpointRemoved(endpoint, currentScope);
+        }
+    }
+
+    /**
+     * Closes all monitors and forgets all interests and listener scopes.
+     */
+    public synchronized void close() {
+        for (Interest interest : interests.values()) {
+            interest.monitor.close();
+        }
+        interests.clear();
+        endpointListenerScopes.clear();
+    }
+
+    /**
+     * Only for test case!
+     */
+    protected synchronized Map<String, Interest> getInterests() {
+        return interests;
+    }
+
+    /**
+     * Only for test case!
+     */
+    protected synchronized Map<ServiceReference<EndpointListener>, List<String>> getEndpointListenerScopes() {
+        return endpointListenerScopes;
+    }
+
+    /**
+     * Returns the non-empty scopes from the ENDPOINT_LISTENER_SCOPE property of the given reference.
+     *
+     * @param sref a service reference
+     * @return the reference's scopes without nulls and empty strings
+     */
+    protected List<String> getScopes(ServiceReference<?> sref) {
+        return Utils.removeEmpty(StringPlus.normalize(sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE)));
+    }
+
+    /**
+     * Extracts the value of the first {@code (objectClass=...)} clause of a scope.
+     *
+     * @param scope a filter string, may be null
+     * @return the object class named in the scope, or null if the scope is
+     *         null or contains no such clause
+     */
+    public static String getObjectClass(String scope) {
+        if (scope == null) {
+            return null;
+        }
+        Matcher m = OBJECTCLASS_PATTERN.matcher(scope);
+        return m.matches() ? m.group(1) : null;
+    }
+
+    /**
+     * Returns a service's properties as a map.
+     *
+     * @param serviceReference a service reference
+     * @return the service's properties as a map
+     */
+    public static Map<String, Object> getProperties(ServiceReference<?> serviceReference) {
+        String[] keys = serviceReference.getPropertyKeys();
+        Map<String, Object> props = new HashMap<String, Object>(keys.length);
+        for (String key : keys) {
+            Object val = serviceReference.getProperty(key);
+            props.put(key, val);
+        }
+        return props;
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java
new file mode 100644
index 0000000..82ccb85
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public final class Utils {
+
+    static final String PATH_PREFIX = "/osgi/service_registry";
+
+    private Utils() {
+        // never constructed
+    }
+
+    /**
+     * Returns the ZooKeeper node path for the given name: the common path
+     * prefix followed by the name with every '.' replaced by '/'.
+     *
+     * @param name a dot-separated name, may be null or empty
+     * @return the path for the name, or just the path prefix if the name is null or empty
+     */
+    public static String getZooKeeperPath(String name) {
+        return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/');
+    }
+
+    /**
+     * Removes nulls and empty strings from the given list of strings.
+     *
+     * @param strings a list of strings, may be null
+     * @return a new list containing the non-null and non-empty
+     *         elements of the original list in the same order
+     *         (an empty list if the given list is null)
+     */
+    public static List<String> removeEmpty(List<String> strings) {
+        List<String> result = new ArrayList<String>();
+        if (strings == null) {
+            return result;
+        }
+        for (String s : strings) {
+            if (s != null && !s.isEmpty()) {
+                result.add(s);
+            }
+        }
+        return result;
+    }
+
+
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java
deleted file mode 100644
index 1e6c551..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper;
-
-import java.util.Dictionary;
-import java.util.Hashtable;
-
-import org.osgi.framework.BundleActivator;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.service.cm.ManagedService;
-
-public class Activator implements BundleActivator {
-
- private ZooKeeperDiscovery zkd;
-
- public synchronized void start(BundleContext bc) throws Exception {
- zkd = new ZooKeeperDiscovery(bc);
- Dictionary<String, String> props = new Hashtable<String, String>();
- props.put(Constants.SERVICE_PID, "org.apache.aries.rsa.discovery.zookeeper");
- bc.registerService(ManagedService.class.getName(), zkd, props);
- }
-
- public synchronized void stop(BundleContext bc) throws Exception {
- zkd.stop(true);
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java
deleted file mode 100644
index 3a7f2c4..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper;
-
-import java.io.IOException;
-import java.util.Dictionary;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.cxf.dosgi.discovery.zookeeper.publish.PublishingEndpointListenerFactory;
-import org.apache.cxf.dosgi.discovery.zookeeper.subscribe.EndpointListenerTracker;
-import org.apache.cxf.dosgi.discovery.zookeeper.subscribe.InterfaceMonitorManager;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.osgi.framework.BundleContext;
-import org.osgi.service.cm.ConfigurationException;
-import org.osgi.service.cm.ManagedService;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.util.tracker.ServiceTracker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZooKeeperDiscovery implements Watcher, ManagedService {
-
- public static final String DISCOVERY_ZOOKEEPER_ID = "org.apache.cxf.dosgi.discovery.zookeeper";
-
- private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperDiscovery.class);
-
- private final BundleContext bctx;
-
- private PublishingEndpointListenerFactory endpointListenerFactory;
- private ServiceTracker<EndpointListener, EndpointListener> endpointListenerTracker;
- private InterfaceMonitorManager imManager;
- private ZooKeeper zkClient;
- private boolean closed;
- private boolean started;
-
- private Dictionary<String, ?> curConfiguration;
-
- public ZooKeeperDiscovery(BundleContext bctx) {
- this.bctx = bctx;
- }
-
- public synchronized void updated(Dictionary<String, ?> configuration) throws ConfigurationException {
- LOG.debug("Received configuration update for Zookeeper Discovery: {}", configuration);
- // make changes only if config actually changed, to prevent unnecessary ZooKeeper reconnections
- if (!ZooKeeperDiscovery.toMap(configuration).equals(ZooKeeperDiscovery.toMap(curConfiguration))) {
- stop(false);
- curConfiguration = configuration;
- // config is null if it doesn't exist, is being deleted or has not yet been loaded
- // in which case we just stop running
- if (!closed && configuration != null) {
- try {
- createZookeeper(configuration);
- } catch (IOException e) {
- throw new ConfigurationException(null, "Error starting zookeeper client", e);
- }
- }
- }
- }
-
- private synchronized void start() {
- if (closed) {
- return;
- }
- if (started) {
- // we must be re-entrant, i.e. can be called when already started
- LOG.debug("ZookeeperDiscovery already started");
- return;
- }
- LOG.debug("starting ZookeeperDiscovery");
- endpointListenerFactory = new PublishingEndpointListenerFactory(zkClient, bctx);
- endpointListenerFactory.start();
- imManager = new InterfaceMonitorManager(bctx, zkClient);
- endpointListenerTracker = new EndpointListenerTracker(bctx, imManager);
- endpointListenerTracker.open();
- started = true;
- }
-
- public synchronized void stop(boolean close) {
- if (started) {
- LOG.debug("stopping ZookeeperDiscovery");
- }
- started = false;
- closed |= close;
- if (endpointListenerFactory != null) {
- endpointListenerFactory.stop();
- }
- if (endpointListenerTracker != null) {
- endpointListenerTracker.close();
- }
- if (imManager != null) {
- imManager.close();
- }
- if (zkClient != null) {
- try {
- zkClient.close();
- } catch (InterruptedException e) {
- LOG.error("Error closing ZooKeeper", e);
- }
- }
- }
-
- protected ZooKeeper createZooKeeper(String host, String port, int timeout) throws IOException {
- LOG.info("ZooKeeper discovery connecting to {}:{} with timeout {}",
- new Object[]{host, port, timeout});
- return new ZooKeeper(host + ":" + port, timeout, this);
- }
-
- /* Callback for ZooKeeper */
- public void process(WatchedEvent event) {
- LOG.debug("got ZooKeeper event " + event);
- switch (event.getState()) {
- case SyncConnected:
- LOG.info("Connection to ZooKeeper established");
- // this event can be triggered more than once in a row (e.g. after Disconnected event),
- // so we must be re-entrant here
- start();
- break;
-
- case Expired:
- LOG.info("Connection to ZooKeeper expired. Trying to create a new connection");
- stop(false);
- try {
- createZookeeper(curConfiguration);
- } catch (IOException e) {
- LOG.error("Error starting zookeeper client", e);
- }
- break;
-
- default:
- // ignore other events
- break;
- }
- }
-
- private void createZookeeper(Dictionary<String, ?> config) throws IOException {
- String host = (String)getWithDefault(config, "zookeeper.host", "localhost");
- String port = (String)getWithDefault(config, "zookeeper.port", "2181");
- int timeout = Integer.parseInt((String)getWithDefault(config, "zookeeper.timeout", "3000"));
- zkClient = createZooKeeper(host, port, timeout);
- }
-
- public Object getWithDefault(Dictionary<String, ?> config, String key, Object defaultValue) {
- Object value = config.get(key);
- return value != null ? value : defaultValue;
- }
-
- /**
- * Converts the given Dictionary to a Map.
- *
- * @param dict a dictionary
- * @param <K> the key type
- * @param <V> the value type
- * @return the converted map, or an empty map if the given dictionary is null
- */
- public static <K, V> Map<K, V> toMap(Dictionary<K, V> dict) {
- Map<K, V> map = new HashMap<K, V>();
- if (dict != null) {
- Enumeration<K> keys = dict.keys();
- while (keys.hasMoreElements()) {
- K key = keys.nextElement();
- map.put(key, dict.get(key));
- }
- }
- return map;
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java
deleted file mode 100644
index 5d46585..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.publish;
-
-import java.util.Map;
-
-/**
- * This interface allows transformation of service registration information before it is pushed into the ZooKeeper
- * discovery system.
- * It can be useful for situations where a host name or port number needs to be changed in cases where the host running
- * the service is known differently from the outside to what the local Java process thinks it is.
- * Extra service properties can also be added to the registration which can be useful to refine the remote service
- * lookup process. <p/>
- *
- * DiscoveryPlugins use the OSGi WhiteBoard pattern. To add one to the system, register an instance under this interface
- * with the OSGi Service Registry. All registered DiscoveryPlugin instances are visited and given a chance to
- * process the information before it is pushed into ZooKeeper. <p/>
- *
- * Note that the changes made using this plugin do not modify the local service registration.
- *
- */
-public interface DiscoveryPlugin {
-
- /**
- * Process service registration information. Plugins can change this information before it is published into the
- * ZooKeeper discovery system.
- *
- * @param mutableProperties A map of service registration properties. The map is mutable and any changes to the map
- * will be reflected in the ZooKeeper registration.
- * @param endpointKey The key under which the service is registered in ZooKeeper. This key typically has the
- * following format: hostname#port##context. While the actual value of this key is not actually used by the
- * system (people can use it as a hint to understand where the service is located), the value <i>must</i> be
- * unique for all services of a given type.
- * @return The <tt>endpointKey</tt> value to be used. If there is no need to change this simply return the value
- * of the <tt>endpointKey</tt> parameter.
- */
- String process(Map<String, Object> mutableProperties, String endpointKey);
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java
deleted file mode 100644
index 9bcfe72..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.publish;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
-import org.apache.aries.rsa.discovery.endpoint.PropertiesMapper;
-import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.osgi.framework.BundleContext;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.util.tracker.ServiceTracker;
-import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType;
-import org.osgi.xmlns.rsa.v1_0.PropertyType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Listens for local Endpoints and publishes them to ZooKeeper.
- */
-public class PublishingEndpointListener implements EndpointListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class);
-
- private final ZooKeeper zk;
- private final ServiceTracker<DiscoveryPlugin, DiscoveryPlugin> discoveryPluginTracker;
- private final List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>();
- private boolean closed;
-
- private final EndpointDescriptionParser endpointDescriptionParser;
-
- public PublishingEndpointListener(ZooKeeper zk, BundleContext bctx) {
- this.zk = zk;
- discoveryPluginTracker = new ServiceTracker<DiscoveryPlugin, DiscoveryPlugin>(bctx,
- DiscoveryPlugin.class, null);
- discoveryPluginTracker.open();
- endpointDescriptionParser = new EndpointDescriptionParser();
- }
-
- public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
- LOG.info("Local EndpointDescription added: {}", endpoint);
-
- synchronized (endpoints) {
- if (closed) {
- return;
- }
- if (endpoints.contains(endpoint)) {
- // TODO -> Should the published endpoint be updated here?
- return;
- }
-
- try {
- addEndpoint(endpoint);
- endpoints.add(endpoint);
- } catch (Exception ex) {
- LOG.error("Exception while processing the addition of an endpoint.", ex);
- }
- }
- }
-
- private void addEndpoint(EndpointDescription endpoint) throws URISyntaxException, KeeperException,
- InterruptedException, IOException {
- Collection<String> interfaces = endpoint.getInterfaces();
- String endpointKey = getKey(endpoint);
- Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
-
- // process plugins
- Object[] plugins = discoveryPluginTracker.getServices();
- if (plugins != null) {
- for (Object plugin : plugins) {
- if (plugin instanceof DiscoveryPlugin) {
- endpointKey = ((DiscoveryPlugin)plugin).process(props, endpointKey);
- }
- }
- }
-
- for (String name : interfaces) {
- String path = Utils.getZooKeeperPath(name);
- String fullPath = path + '/' + endpointKey;
- LOG.info("Creating ZooKeeper node for service with path {}", fullPath);
- createPath(path, zk);
- List<PropertyType> propsOut = new PropertiesMapper().fromProps(props);
- EndpointDescriptionType epd = new EndpointDescriptionType();
- epd.getProperty().addAll(propsOut);
- byte[] epData = endpointDescriptionParser.getData(epd);
- createEphemeralNode(fullPath, epData);
- }
- }
-
- private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException {
- try {
- zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- } catch (NodeExistsException nee) {
- // this sometimes happens after a ZooKeeper node dies and the ephemeral node
- // that belonged to the old session was not yet deleted. We need to make our
- // session the owner of the node so it won't get deleted automatically -
- // we do this by deleting and recreating it ourselves.
- LOG.info("node for endpoint already exists, recreating: {}", fullPath);
- try {
- zk.delete(fullPath, -1);
- } catch (NoNodeException nne) {
- // it's a race condition, but as long as it got deleted - it's ok
- }
- zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- }
- }
-
- public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
- LOG.info("Local EndpointDescription removed: {}", endpoint);
-
- synchronized (endpoints) {
- if (closed) {
- return;
- }
- if (!endpoints.contains(endpoint)) {
- return;
- }
-
- try {
- removeEndpoint(endpoint);
- endpoints.remove(endpoint);
- } catch (Exception ex) {
- LOG.error("Exception while processing the removal of an endpoint", ex);
- }
- }
- }
-
- private void removeEndpoint(EndpointDescription endpoint) throws UnknownHostException, URISyntaxException {
- Collection<String> interfaces = endpoint.getInterfaces();
- String endpointKey = getKey(endpoint);
- for (String name : interfaces) {
- String path = Utils.getZooKeeperPath(name);
- String fullPath = path + '/' + endpointKey;
- LOG.debug("Removing ZooKeeper node: {}", fullPath);
- try {
- zk.delete(fullPath, -1);
- } catch (Exception ex) {
- LOG.debug("Error while removing endpoint: {}", ex); // e.g. session expired
- }
- }
- }
-
- private static void createPath(String path, ZooKeeper zk) throws KeeperException, InterruptedException {
- StringBuilder current = new StringBuilder();
- List<String> parts = Utils.removeEmpty(Arrays.asList(path.split("/")));
- for (String part : parts) {
- current.append('/');
- current.append(part);
- try {
- zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- } catch (NodeExistsException nee) {
- // it's not the first node with this path to ever exist - that's normal
- }
- }
- }
-
- private static String getKey(EndpointDescription endpoint) throws URISyntaxException {
- URI uri = new URI(endpoint.getId());
- return new StringBuilder().append(uri.getHost()).append("#").append(uri.getPort())
- .append("#").append(uri.getPath().replace('/', '#')).toString();
- }
-
- public void close() {
- LOG.debug("closing - removing all endpoints");
- synchronized (endpoints) {
- closed = true;
- for (EndpointDescription endpoint : endpoints) {
- try {
- removeEndpoint(endpoint);
- } catch (Exception ex) {
- LOG.error("Exception while removing endpoint during close", ex);
- }
- }
- endpoints.clear();
- }
- discoveryPluginTracker.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
deleted file mode 100644
index 99a9849..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.publish;
-
-import java.util.ArrayList;
-import java.util.Dictionary;
-import java.util.Hashtable;
-import java.util.List;
-
-import org.apache.cxf.dosgi.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.zookeeper.ZooKeeper;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceFactory;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.service.remoteserviceadmin.RemoteConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Creates local EndpointListeners that publish to ZooKeeper.
- */
-public class PublishingEndpointListenerFactory implements ServiceFactory<PublishingEndpointListener> {
-
- private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListenerFactory.class);
-
- private final BundleContext bctx;
- private final ZooKeeper zk;
- private final List<PublishingEndpointListener> listeners = new ArrayList<PublishingEndpointListener>();
- private ServiceRegistration<?> serviceRegistration;
-
- public PublishingEndpointListenerFactory(ZooKeeper zk, BundleContext bctx) {
- this.bctx = bctx;
- this.zk = zk;
- }
-
- public PublishingEndpointListener getService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr) {
- LOG.debug("new EndpointListener from factory");
- synchronized (listeners) {
- PublishingEndpointListener pel = new PublishingEndpointListener(zk, bctx);
- listeners.add(pel);
- return pel;
- }
- }
-
- public void ungetService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr,
- PublishingEndpointListener pel) {
- LOG.debug("remove EndpointListener");
- synchronized (listeners) {
- if (listeners.remove(pel)) {
- pel.close();
- }
- }
- }
-
- public synchronized void start() {
- Dictionary<String, String> props = new Hashtable<String, String>();
- String uuid = bctx.getProperty(Constants.FRAMEWORK_UUID);
- props.put(EndpointListener.ENDPOINT_LISTENER_SCOPE,
- String.format("(&(%s=*)(%s=%s))", Constants.OBJECTCLASS,
- RemoteConstants.ENDPOINT_FRAMEWORK_UUID, uuid));
- props.put(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID, "true");
- serviceRegistration = bctx.registerService(EndpointListener.class.getName(), this, props);
- }
-
- public synchronized void stop() {
- if (serviceRegistration != null) {
- serviceRegistration.unregister();
- serviceRegistration = null;
- }
- synchronized (listeners) {
- for (PublishingEndpointListener pel : listeners) {
- pel.close();
- }
- listeners.clear();
- }
- }
-
- /**
- * Only for the test case!
- */
- protected List<PublishingEndpointListener> getListeners() {
- synchronized (listeners) {
- return listeners;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java
deleted file mode 100644
index 4d0a25f..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.subscribe;
-
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.util.tracker.ServiceTracker;
-
-/**
- * Tracks interest in EndpointListeners. Delegates to InterfaceMonitorManager to manage
- * interest in the scopes of each EndpointListener.
- */
-public class EndpointListenerTracker extends ServiceTracker<EndpointListener, EndpointListener> {
- private final InterfaceMonitorManager imManager;
-
- public EndpointListenerTracker(BundleContext bctx, InterfaceMonitorManager imManager) {
- super(bctx, EndpointListener.class, null);
- this.imManager = imManager;
- }
-
- @Override
- public EndpointListener addingService(ServiceReference<EndpointListener> endpointListener) {
- imManager.addInterest(endpointListener);
- return null;
- }
-
- @Override
- public void modifiedService(ServiceReference<EndpointListener> endpointListener, EndpointListener service) {
- // called when an EndpointListener updates its service properties,
- // e.g. when its interest scope is expanded/reduced
- imManager.addInterest(endpointListener);
- }
-
- @Override
- public void removedService(ServiceReference<EndpointListener> endpointListener, EndpointListener service) {
- imManager.removeInterest(endpointListener);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java
deleted file mode 100644
index 3822b6e..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.subscribe;
-
-import java.io.ByteArrayInputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
-import org.apache.aries.rsa.discovery.endpoint.PropertiesMapper;
-import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils;
-import org.apache.zookeeper.AsyncCallback.StatCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Monitors ZooKeeper for changes in published endpoints.
- * <p>
- * Specifically, it monitors the node path associated with a given interface class,
- * whose data is a serialized version of an EndpointDescription, and notifies an
- * EndpointListener when changes are detected (which can then propagate the
- * notification to other EndpointListeners with a matching scope).
- * <p>
- * Note that the EndpointListener is used here as a decoupling interface for
- * convenience, and is not necessarily used according to its documented contract.
- */
-public class InterfaceMonitor implements Watcher, StatCallback {
-
- private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitor.class);
-
- private final String znode;
- private final ZooKeeper zk;
- private final EndpointListener endpointListener;
- private final boolean recursive;
- private volatile boolean closed;
-
- // This map reference changes, so don't synchronize on it
- private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>();
-
- private EndpointDescriptionParser parser;
-
- public InterfaceMonitor(ZooKeeper zk, String objClass, EndpointListener endpointListener, String scope) {
- this.zk = zk;
- this.znode = Utils.getZooKeeperPath(objClass);
- this.recursive = objClass == null || objClass.isEmpty();
- this.endpointListener = endpointListener;
- this.parser = new EndpointDescriptionParser();
- LOG.debug("Creating new InterfaceMonitor {} for scope [{}] and objectClass [{}]",
- new Object[] {recursive ? "(recursive)" : "", scope, objClass});
- }
-
- /**
- * Returns all endpoints that are currently known to this monitor.
- *
- * @return all endpoints that are currently known to this monitor
- */
- public synchronized List<EndpointDescription> getEndpoints() {
- return new ArrayList<EndpointDescription>(nodes.values());
- }
-
- public void start() {
- watch();
- }
-
- private void watch() {
- LOG.debug("registering a ZooKeeper.exists({}) callback", znode);
- zk.exists(znode, this, this, null);
- }
-
- /**
- * Zookeeper Watcher interface callback.
- */
- public void process(WatchedEvent event) {
- LOG.debug("ZooKeeper watcher callback on node {} for event {}", znode, event);
- processDelta();
- }
-
- /**
- * Zookeeper StatCallback interface callback.
- */
- @SuppressWarnings("deprecation")
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- LOG.debug("ZooKeeper callback on node: {} code: {}", znode, rc);
-
- switch (rc) {
- case Code.Ok:
- case Code.NoNode:
- processDelta();
- return;
-
- case Code.SessionExpired:
- case Code.NoAuth:
- case Code.ConnectionLoss:
- return;
-
- default:
- watch();
- }
- }
-
- private void processDelta() {
- if (closed) {
- return;
- }
-
- if (zk.getState() != ZooKeeper.States.CONNECTED) {
- LOG.debug("ZooKeeper connection was already closed! Not processing changed event.");
- return;
- }
-
- try {
- if (zk.exists(znode, false) != null) {
- zk.getChildren(znode, this);
- refreshNodes();
- } else {
- LOG.debug("znode {} doesn't exist -> not processing any changes", znode);
- }
- } catch (Exception e) {
- if (zk.getState() != ZooKeeper.States.CONNECTED) {
- LOG.debug("Error getting Zookeeper data: " + e); // e.g. session expired, handled by ZooKeeperDiscovery
- } else {
- LOG.error("Error getting ZooKeeper data.", e);
- }
- }
- }
-
- public synchronized void close() {
- closed = true;
- for (EndpointDescription endpoint : nodes.values()) {
- endpointListener.endpointRemoved(endpoint, null);
- }
- nodes.clear();
- }
-
- private synchronized void refreshNodes() {
- if (closed) {
- return;
- }
- LOG.info("Processing change on node: {}", znode);
-
- Map<String, EndpointDescription> newNodes = new HashMap<String, EndpointDescription>();
- Map<String, EndpointDescription> prevNodes = new HashMap<String, EndpointDescription>(nodes);
- processChildren(znode, newNodes, prevNodes);
-
- // whatever is left in prevNodes now has been removed from Discovery
- LOG.debug("processChildren done. Nodes that are missing now and need to be removed: {}", prevNodes.values());
- for (EndpointDescription endpoint : prevNodes.values()) {
- endpointListener.endpointRemoved(endpoint, null);
- }
- nodes = newNodes;
- }
-
- /**
- * Iterates through all child nodes of the given node and tries to find
- * endpoints. If the recursive flag is set it also traverses into the child
- * nodes.
- *
- * @return true if an endpoint was found and if the node therefore needs to
- * be monitored for changes
- */
- private boolean processChildren(String zn, Map<String, EndpointDescription> newNodes,
- Map<String, EndpointDescription> prevNodes) {
- List<String> children;
- try {
- LOG.debug("Processing the children of {}", zn);
- children = zk.getChildren(zn, false);
-
- boolean foundANode = false;
- for (String child : children) {
- String childZNode = zn + '/' + child;
- EndpointDescription endpoint = getEndpointDescriptionFromNode(childZNode);
- if (endpoint != null) {
- EndpointDescription prevEndpoint = prevNodes.get(child);
- LOG.info("found new node " + zn + "/[" + child + "] ( []->child ) props: "
- + endpoint.getProperties().values());
- newNodes.put(child, endpoint);
- prevNodes.remove(child);
- foundANode = true;
- LOG.debug("Properties: {}", endpoint.getProperties());
- if (prevEndpoint == null) {
- // This guy is new
- endpointListener.endpointAdded(endpoint, null);
- } else if (!prevEndpoint.getProperties().equals(endpoint.getProperties())) {
- // TODO
- }
- }
- if (recursive && processChildren(childZNode, newNodes, prevNodes)) {
- zk.getChildren(childZNode, this);
- }
- }
-
- return foundANode;
- } catch (KeeperException e) {
- LOG.error("Problem processing ZooKeeper node", e);
- } catch (InterruptedException e) {
- LOG.error("Problem processing ZooKeeper node", e);
- }
- return false;
- }
-
- /**
- * Retrieves data from the given node and parses it into an EndpointDescription.
- *
- * @param node a node path
- * @return endpoint found in the node or null if no endpoint was found
- */
- private EndpointDescription getEndpointDescriptionFromNode(String node) {
- try {
- Stat stat = zk.exists(node, false);
- if (stat == null || stat.getDataLength() <= 0) {
- return null;
- }
- byte[] data = zk.getData(node, false, null);
- LOG.debug("Got data for node: {}", node);
-
- EndpointDescription endpoint = getFirstEnpointDescription(data);
- if (endpoint != null) {
- return endpoint;
- }
- LOG.warn("No Discovery information found for node: {}", node);
- } catch (Exception e) {
- LOG.error("Problem getting EndpointDescription from node " + node, e);
- }
- return null;
- }
-
- public EndpointDescription getFirstEnpointDescription(byte[] data) {
- List<EndpointDescriptionType> elements = parser.getEndpointDescriptions(new ByteArrayInputStream(data));
- if (elements.isEmpty()) {
- return null;
- }
- Map<String, Object> props = new PropertiesMapper().toProps(elements.get(0).getProperty());
- return new EndpointDescription(props);
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
deleted file mode 100644
index f44b5af..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.subscribe;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Dictionary;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.aries.rsa.util.StringPlus;
-import org.apache.cxf.dosgi.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils;
-import org.apache.zookeeper.ZooKeeper;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Filter;
-import org.osgi.framework.ServiceReference;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Manages the EndpointListeners and the scopes they are interested in.
- * For each scope with interested EndpointListeners an InterfaceMonitor is created.
- * The InterfaceMonitor calls back when it detects added or removed external Endpoints.
- * These events are then forwarded to all interested EndpointListeners.
- */
-public class InterfaceMonitorManager {
- private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitorManager.class);
- private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(".*\\(objectClass=([^)]+)\\).*");
-
- private final BundleContext bctx;
- private final ZooKeeper zk;
- // map of EndpointListeners and the scopes they are interested in
- private final Map<ServiceReference<EndpointListener>, List<String>> endpointListenerScopes =
- new HashMap<ServiceReference<EndpointListener>, List<String>>();
- // map of scopes and their interest data
- private final Map<String, Interest> interests = new HashMap<String, Interest>();
-
- protected static class Interest {
- List<ServiceReference<EndpointListener>> endpointListeners =
- new CopyOnWriteArrayList<ServiceReference<EndpointListener>>();
- InterfaceMonitor monitor;
- }
-
- public InterfaceMonitorManager(BundleContext bctx, ZooKeeper zk) {
- this.bctx = bctx;
- this.zk = zk;
- }
-
- public void addInterest(ServiceReference<EndpointListener> endpointListener) {
- if (isOurOwnEndpointListener(endpointListener)) {
- LOG.debug("Skipping our own EndpointListener");
- return;
- }
-
- LOG.info("updating EndpointListener interests: {}", endpointListener);
- if (LOG.isDebugEnabled()) {
- LOG.debug("updated EndpointListener properties: {}", getProperties(endpointListener));
- }
- for (String scope : getScopes(endpointListener)) {
- String objClass = getObjectClass(scope);
- LOG.debug("Adding interest in scope {}, objectClass {}", scope, objClass);
- addInterest(endpointListener, scope, objClass);
- }
- }
-
- private static boolean isOurOwnEndpointListener(ServiceReference<EndpointListener> endpointListener) {
- return Boolean.parseBoolean(String.valueOf(
- endpointListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)));
- }
-
- public synchronized void addInterest(ServiceReference<EndpointListener> endpointListener,
- String scope, String objClass) {
- // get or create interest for given scope and add listener to it
- Interest interest = interests.get(scope);
- if (interest == null) {
- // create interest, add listener and start monitor
- interest = new Interest();
- interests.put(scope, interest);
- interest.endpointListeners.add(endpointListener); // add it before monitor starts so we don't miss events
- interest.monitor = createInterfaceMonitor(scope, objClass, interest);
- interest.monitor.start();
- } else {
- // interest already exists, so just add listener to it
- if (!interest.endpointListeners.contains(endpointListener)) {
- interest.endpointListeners.add(endpointListener);
- }
- // notify listener of all known endpoints for given scope
- // (as EndpointListener contract requires of all added/modified listeners)
- for (EndpointDescription endpoint : interest.monitor.getEndpoints()) {
- notifyListeners(endpoint, scope, true, Arrays.asList(endpointListener));
- }
- }
-
- // add scope to listener's scopes list
- List<String> scopes = endpointListenerScopes.get(endpointListener);
- if (scopes == null) {
- scopes = new ArrayList<String>(1);
- endpointListenerScopes.put(endpointListener, scopes);
- }
- if (!scopes.contains(scope)) {
- scopes.add(scope);
- }
- }
-
- public synchronized void removeInterest(ServiceReference<EndpointListener> endpointListener) {
- LOG.info("removing EndpointListener interests: {}", endpointListener);
- List<String> scopes = endpointListenerScopes.get(endpointListener);
- if (scopes == null) {
- return;
- }
-
- for (String scope : scopes) {
- Interest interest = interests.get(scope);
- if (interest != null) {
- interest.endpointListeners.remove(endpointListener);
- if (interest.endpointListeners.isEmpty()) {
- interest.monitor.close();
- interests.remove(scope);
- }
- }
- }
- endpointListenerScopes.remove(endpointListener);
- }
-
- protected InterfaceMonitor createInterfaceMonitor(final String scope, String objClass, final Interest interest) {
- // holding this object's lock in the callbacks can lead to a deadlock with InterfaceMonitor
- EndpointListener endpointListener = new EndpointListener() {
-
- public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
- notifyListeners(endpoint, scope, false, interest.endpointListeners);
- }
-
- public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
- notifyListeners(endpoint, scope, true, interest.endpointListeners);
- }
- };
- return new InterfaceMonitor(zk, objClass, endpointListener, scope);
- }
-
- private void notifyListeners(EndpointDescription endpoint, String currentScope, boolean isAdded,
- List<ServiceReference<EndpointListener>> endpointListeners) {
- for (ServiceReference<EndpointListener> endpointListenerRef : endpointListeners) {
- EndpointListener service = bctx.getService(endpointListenerRef);
- try {
- EndpointListener endpointListener = (EndpointListener)service;
- LOG.trace("matching {} against {}", endpoint, currentScope);
- if (matchFilter(bctx, currentScope, endpoint)) {
- LOG.debug("Matched {} against {}", endpoint, currentScope);
- notifyListener(endpoint, currentScope, isAdded, endpointListenerRef.getBundle(),
- endpointListener);
- }
- } finally {
- if (service != null) {
- bctx.ungetService(endpointListenerRef);
- }
- }
- }
- }
-
- private static boolean matchFilter(BundleContext bctx, String filter, EndpointDescription endpoint) {
- if (filter == null) {
- return false;
- }
-
- try {
- Filter f = bctx.createFilter(filter);
- Dictionary<String, Object> dict = new Hashtable<String, Object>(endpoint.getProperties());
- return f.match(dict);
- } catch (Exception e) {
- return false;
- }
- }
-
-
- private void notifyListener(EndpointDescription endpoint, String currentScope, boolean isAdded,
- Bundle endpointListenerBundle, EndpointListener endpointListener) {
- if (endpointListenerBundle == null) {
- LOG.info("listening service was unregistered, ignoring");
- } else if (isAdded) {
- LOG.info("calling EndpointListener.endpointAdded: " + endpointListener + " from bundle "
- + endpointListenerBundle.getSymbolicName() + " for endpoint: " + endpoint);
- endpointListener.endpointAdded(endpoint, currentScope);
- } else {
- LOG.info("calling EndpointListener.endpointRemoved: " + endpointListener + " from bundle "
- + endpointListenerBundle.getSymbolicName() + " for endpoint: " + endpoint);
- endpointListener.endpointRemoved(endpoint, currentScope);
- }
- }
-
- public synchronized void close() {
- for (Interest interest : interests.values()) {
- interest.monitor.close();
- }
- interests.clear();
- endpointListenerScopes.clear();
- }
-
- /**
- * Only for test case!
- */
- protected synchronized Map<String, Interest> getInterests() {
- return interests;
- }
-
- /**
- * Only for test case!
- */
- protected synchronized Map<ServiceReference<EndpointListener>, List<String>> getEndpointListenerScopes() {
- return endpointListenerScopes;
- }
-
- protected List<String> getScopes(ServiceReference<?> sref) {
- return Utils.removeEmpty(StringPlus.normalize(sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE)));
- }
-
- public static String getObjectClass(String scope) {
- Matcher m = OBJECTCLASS_PATTERN.matcher(scope);
- return m.matches() ? m.group(1) : null;
- }
-
- /**
- * Returns a service's properties as a map.
- *
- * @param serviceReference a service reference
- * @return the service's properties as a map
- */
- public static Map<String, Object> getProperties(ServiceReference<?> serviceReference) {
- String[] keys = serviceReference.getPropertyKeys();
- Map<String, Object> props = new HashMap<String, Object>(keys.length);
- for (String key : keys) {
- Object val = serviceReference.getProperty(key);
- props.put(key, val);
- }
- return props;
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java
deleted file mode 100644
index afd9c0a..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.util;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public final class Utils {
-
- static final String PATH_PREFIX = "/osgi/service_registry";
-
- private Utils() {
- // never constructed
- }
-
- public static String getZooKeeperPath(String name) {
- return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/');
- }
-
- /**
- * Removes nulls and empty strings from the given string array.
- *
- * @param strings an array of strings
- * @return a new array containing the non-null and non-empty
- * elements of the original array in the same order
- */
- public static List<String> removeEmpty(List<String> strings) {
- List<String> result = new ArrayList<String>();
- for (String s : strings) {
- if (s != null && !s.isEmpty()) {
- result.add(s);
- }
- }
- return result;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/69bb901e/discovery/zookeeper/src/main/resources/OSGI-INF/metatype/zookeeper.xml
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/resources/OSGI-INF/metatype/zookeeper.xml b/discovery/zookeeper/src/main/resources/OSGI-INF/metatype/zookeeper.xml
new file mode 100644
index 0000000..361fa1e
--- /dev/null
+++ b/discovery/zookeeper/src/main/resources/OSGI-INF/metatype/zookeeper.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<MetaData xmlns="http://www.osgi.org/xmlns/metadata/v1.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.osgi.org/xmlns/metadata/v1.0.0 http://www.osgi.org/xmlns/metatype/v1.1.0/metatype.xsd
+ ">
+ <OCD description="" name="Zookeeper server config" id="zookeeper.server">
+ <AD id="clientPort" required="false" type="String" default="2181" description=""/>
+ <AD id="tickTime" required="false" type="String" default="2000" description=""/>
+ <AD id="initLimit" required="false" type="String" default="10" description=""/>
+ <AD id="syncLimit" required="false" type="String" default="5" description=""/>
+ </OCD>
+ <Designate pid="org.apache.aries.rsa.discovery.zookeeper.server">
+ <Object ocdref="zookeeper.server"/>
+ </Designate>
+</MetaData>
\ No newline at end of file