You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by cs...@apache.org on 2012/11/02 16:02:02 UTC
svn commit: r1404989 - in
/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src:
main/java/org/apache/cxf/dosgi/discovery/zookeeper/
test/java/org/apache/cxf/dosgi/discovery/zookeeper/
Author: cschneider
Date: Fri Nov 2 15:02:01 2012
New Revision: 1404989
URL: http://svn.apache.org/viewvc?rev=1404989&view=rev
Log:
DOSGI-138 DOSGI-123 DOSGI-70 Refactoring of discovery, added reconnect and republishing after connection loss to zookeeper
Added:
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java (with props)
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java
- copied, changed from r1402482, cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImpl.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java
- copied, changed from r1402482, cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactory.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManagerTest.java
- copied, changed from r1402482, cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizerTest.java
Removed:
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/DataMonitorListener.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactory.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImpl.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceDataMonitorListenerImpl.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/ActivatorTest.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizerTest.java
Modified:
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizer.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Util.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactoryTest.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImplTest.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorTest.java
cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/UtilTest.java
Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java?rev=1404989&r1=1404988&r2=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java Fri Nov 2 15:02:01 2012
@@ -18,92 +18,26 @@
*/
package org.apache.cxf.dosgi.discovery.zookeeper;
-import java.io.IOException;
import java.util.Dictionary;
-import java.util.Enumeration;
import java.util.Hashtable;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.cm.ManagedService;
-public class Activator implements BundleActivator, ManagedService {
- private static final Logger LOG = Logger.getLogger(Activator.class.getName());
-
+public class Activator implements BundleActivator {
private ZooKeeperDiscovery zkd;
- private Dictionary zkProperties;
- private BundleContext bctx;
- ServiceRegistration cmReg;
public synchronized void start(BundleContext bc) throws Exception {
- bctx = bc;
- zkProperties = getCMDefaults();
- zkd = createZooKeeperDiscovery();
-
- cmReg = bc.registerService(ManagedService.class.getName(), this, zkProperties);
+ zkd = new ZooKeeperDiscovery(bc);
+ Dictionary<String, String> props = new Hashtable<String, String>();
+ props.put(Constants.SERVICE_PID, "org.apache.cxf.dosgi.discovery.zookeeper");
+ bc.registerService(ManagedService.class.getName(), zkd, props);
}
public synchronized void stop(BundleContext bc) throws Exception {
- cmReg.unregister();
zkd.stop();
}
- public synchronized void updated(Dictionary configuration) throws ConfigurationException {
- if (LOG.isLoggable(Level.FINE))
- LOG.fine("Received configuration update for Zookeeper Discovery: " + configuration);
- if (configuration == null)
- return;
-
- Dictionary effective = getCMDefaults();
- // apply all values on top of the defaults
- for (Enumeration e = configuration.keys(); e.hasMoreElements();) {
- Object key = e.nextElement();
- if (key != null) {
- Object val = configuration.get(key);
- effective.put(key, val);
- }
- }
-
- if (zkProperties.equals(effective)) {
- LOG.info("Update called, but actual settings haven't changed ...");
- return;
- } else if (LOG.isLoggable(Level.INFO)) {
- LOG.info("Updating configuration for Zookeeper Discovery: " + configuration);
- }
-
- zkProperties = effective;
- cmReg.setProperties(zkProperties);
-
- synchronized (this) {
- zkd.stop();
- zkd = createZooKeeperDiscovery();
- }
-
- // call start in any case
- try {
- zkd.start();
- } catch (IOException e) {
- LOG.log(Level.SEVERE, "Failed to start the Zookeeper Discovery component.", e);
- }
-
- }
-
- private Dictionary getCMDefaults() {
- Dictionary props = new Hashtable();
- props.put("zookeeper.timeout", "3000");
- props.put("zookeeper.port", "2181");
- props.put(Constants.SERVICE_PID, "org.apache.cxf.dosgi.discovery.zookeeper");
- return props;
- }
-
- // for testing
- protected ZooKeeperDiscovery createZooKeeperDiscovery() {
- return new ZooKeeperDiscovery(bctx, zkProperties);
- }
-
}
Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizer.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizer.java?rev=1404989&r1=1404988&r2=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizer.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizer.java Fri Nov 2 15:02:01 2012
@@ -18,12 +18,6 @@
*/
package org.apache.cxf.dosgi.discovery.zookeeper;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
@@ -31,165 +25,69 @@ import java.util.regex.Pattern;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.service.remoteserviceadmin.RemoteConstants;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
+/**
+ * Tracks interest in EndpointListeners. Delegates to InterfaceMonitorManager to manage
+ * interest in the scopes of each EndpointListener
+ */
public class EndpointListenerTrackerCustomizer implements ServiceTrackerCustomizer {
-
private static final Logger LOG = Logger.getLogger(EndpointListenerTrackerCustomizer.class.getName());
- private ZooKeeperDiscovery zooKeeperDiscovery;
private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(".*\\(objectClass=([^)]+)\\).*");
- private Map<String /* scope */, Interest> interestingScopes = new HashMap<String, Interest>();
- private Map<ServiceReference, List<String> /* scopes of the epl */> handledEndpointlisteners = new HashMap<ServiceReference, List<String>>();
+ private InterfaceMonitorManager imManager;
- private BundleContext bctx;
-
- protected static class Interest {
- List<ServiceReference> relatedServiceListeners = new ArrayList<ServiceReference>(1);
- InterfaceMonitor im;
- }
-
- public EndpointListenerTrackerCustomizer(ZooKeeperDiscovery zooKeeperDiscovery, BundleContext bc) {
- this.zooKeeperDiscovery = zooKeeperDiscovery;
- bctx = bc;
+ public EndpointListenerTrackerCustomizer(BundleContext bc, InterfaceMonitorManager imManager) {
+ this.imManager = imManager;
}
public Object addingService(ServiceReference sref) {
- LOG.info("addingService: " + sref);
handleEndpointListener(sref);
return sref;
}
public void modifiedService(ServiceReference sref, Object service) {
- LOG.info("modifiedService: " + sref);
handleEndpointListener(sref);
}
private void handleEndpointListener(ServiceReference sref) {
+ if (isOurOwnEndpointListener(sref)) {
+ LOG.finest("Skipping our own endpointListener");
+ return;
+ }
+
if (LOG.isLoggable(Level.FINEST)) {
for (String key : sref.getPropertyKeys()) {
LOG.finest("modifiedService: property: " + key + " => " + sref.getProperty(key));
}
}
-
- if (Boolean.parseBoolean(String.valueOf(
- sref.getProperty(EndpointListenerFactory.DISCOVERY_ZOOKEEPER_ID)))) {
- LOG.finest("found my own endpointListener ... skipping it");
- return;
- }
String[] scopes = Util.getScopes(sref);
-
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("trying to discover services for scopes[" + scopes.length + "]: ");
- for (String scope : scopes) {
- LOG.fine("Scope: "+scope);
- }
- }
- if (scopes.length > 0) {
- for (String scope : scopes) {
- LOG.fine("*********** Handling scope: " + scope);
- if("".equals(scope) || scope == null){
- LOG.warning("skipping empty scope from EndpointListener from " + sref.getBundle().getSymbolicName());
- continue;
- }
-
- String objClass = getObjectClass(scope);
- LOG.fine("*********** objectClass: " + objClass);
-
- synchronized (interestingScopes) {
- synchronized (handledEndpointlisteners) {
- Interest interest = interestingScopes.get(scope);
- if (interest == null) {
- interest = new Interest();
- interestingScopes.put(scope, interest);
- }
-
-
- if (!interest.relatedServiceListeners.contains(sref)) {
- interest.relatedServiceListeners.add(sref);
- }
-
- if (interest.im != null) {
- // close old Monitor
- interest.im.close();
- interest.im = null;
- }
-
- InterfaceMonitor dm = createInterfaceMonitor(scope, objClass, interest);
- dm.start();
- interest.im = dm;
-
- List<String> handledScopes = handledEndpointlisteners.get(sref);
- if (handledScopes == null) {
- handledScopes = new ArrayList<String>(1);
- handledEndpointlisteners.put(sref, handledScopes);
- }
-
- if (!handledScopes.contains(scope))
- handledScopes.add(scope);
-
- }
- }
-
+ for (String scope : scopes) {
+ String objClass = getObjectClass(scope);
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.fine("Adding interest in scope: " + scope + " objectClass: " + objClass);
}
+ imManager.addInterest(sref, scope, objClass);
}
}
+ private boolean isOurOwnEndpointListener(ServiceReference sref) {
+ return Boolean.parseBoolean(String.valueOf(
+ sref.getProperty(PublishingEndpointListenerFactory.DISCOVERY_ZOOKEEPER_ID)));
+ }
private String getObjectClass(String scope) {
+ if (scope == null) {
+ return null;
+ }
Matcher m = OBJECTCLASS_PATTERN.matcher(scope);
- if (m.matches())
- return m.group(1);
- return null;
+ return m.matches() ? m.group(1) : null;
}
public void removedService(ServiceReference sref, Object service) {
LOG.info("removedService: " + sref);
-
- List<String> handledScopes = handledEndpointlisteners.get(sref);
- if (handledScopes != null) {
- for (String scope : handledScopes) {
- Interest i = interestingScopes.get(scope);
- if (i != null) {
- i.relatedServiceListeners.remove(sref);
- if (i.relatedServiceListeners.size() == 0) {
- i.im.close();
- interestingScopes.remove(scope);
- }
- }
- }
- handledEndpointlisteners.remove(sref);
- }
-
+ imManager.removeInterest(sref);
}
-
-
- /**
- * Only for test case !
- * */
- protected Map<String, Interest> getInterestingScopes() {
- return interestingScopes;
- }
-
- /**
- * Only for test case !
- * */
- protected Map<ServiceReference, List<String>> getHandledEndpointlisteners() {
- return handledEndpointlisteners;
- }
-
-
- /**
- * Only for test case !
- * */
- protected InterfaceMonitor createInterfaceMonitor(String scope, String objClass, Interest interest) {
- InterfaceMonitor dm = new InterfaceMonitor(zooKeeperDiscovery.getZookeeper(),
- objClass, interest, scope, bctx);
- return dm;
- }
}
Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java?rev=1404989&r1=1404988&r2=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java Fri Nov 2 15:02:01 2012
@@ -18,48 +18,68 @@
*/
package org.apache.cxf.dosgi.discovery.zookeeper;
+import java.io.ByteArrayInputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.cxf.dosgi.discovery.local.LocalDiscoveryUtils;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.AsyncCallback.StatCallback;
-import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
+import org.jdom.Element;
import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
public class InterfaceMonitor implements Watcher, StatCallback {
private static final Logger LOG = Logger.getLogger(InterfaceMonitor.class.getName());
- InterfaceDataMonitorListenerImpl listener;
- final String znode;
- final ZooKeeper zookeeper;
-
+ private final String znode;
+ private final ZooKeeper zookeeper;
+ private final EndpointListener epListener;
+ private final boolean recursive;
private boolean closed = false;
+
+ // This map is *only* accessed in the change() method
+ private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>();
- public InterfaceMonitor(ZooKeeper zk, String intf, EndpointListenerTrackerCustomizer.Interest zkd, String scope, BundleContext bctx) {
- LOG.fine("Creating new InterfaceMonitor for scope ["+scope+"] and objectClass ["+intf+"] ");
- listener = createInterfaceDataMonitorListener(zk, intf, zkd, scope, bctx);
- zookeeper = zk;
- znode = Util.getZooKeeperPath(intf);
+ public InterfaceMonitor(ZooKeeper zk, String intf, EndpointListener epListener, String scope, BundleContext bctx) {
+ LOG.fine("Creating new InterfaceMonitor for scope [" + scope + "] and objectClass [" + intf + "] ");
+ this.zookeeper = zk;
+ this.znode = Util.getZooKeeperPath(intf);
+ this.recursive = (intf == null || "".equals(intf));
+ this.epListener = epListener;
+ LOG.fine("InterfaceDataMonitorListenerImpl is recursive: " + recursive);
}
public void start() {
- process();
+ watch();
}
- private void process() {
+ private void watch() {
LOG.finest("registering a zookeeper.exists(" + znode+") callback");
zookeeper.exists(znode, this, this, null);
}
+ /**
+ * Zookeeper watcher
+ */
public void process(WatchedEvent event) {
LOG.finer("ZooKeeper watcher callback " + event);
processDelta();
}
+ /**
+ * Zookeeper StatCallback
+ */
+ @SuppressWarnings("deprecation")
public void processResult(int rc, String path, Object ctx, Stat stat) {
LOG.finer("ZooKeeper callback on node: " + znode + " code: " + rc );
@@ -75,7 +95,7 @@ public class InterfaceMonitor implements
return;
default:
- process();
+ watch();
return;
}
@@ -83,9 +103,11 @@ public class InterfaceMonitor implements
}
private void processDelta() {
- if(closed) return;
+ if (closed) {
+ return;
+ }
- if(zookeeper.getState() != ZooKeeper.States.CONNECTED){
+ if (zookeeper.getState() != ZooKeeper.States.CONNECTED){
LOG.info("zookeeper connection was already closed! Not processing changed event.");
return;
}
@@ -93,7 +115,7 @@ public class InterfaceMonitor implements
try {
if (zookeeper.exists(znode, false) != null) {
zookeeper.getChildren(znode, this);
- listener.change();
+ change();
}else{
LOG.fine(znode+" doesn't exist -> not processing any changes");
}
@@ -102,22 +124,101 @@ public class InterfaceMonitor implements
}
}
- public void inform(ServiceReference sref) {
- listener.inform(sref);
- }
-
public void close() {
- // TODO !!!
- closed = true;
+ for (EndpointDescription epd : nodes.values()) {
+ epListener.endpointRemoved(epd, null);
+ }
+ nodes.clear();
}
+ public synchronized void change() {
+ LOG.info("Zookeeper callback on node: " + znode);
+
+ Map<String, EndpointDescription> newNodes = new HashMap<String, EndpointDescription>();
+ Map<String, EndpointDescription> prevNodes = nodes;
+ processChildren(znode, newNodes, prevNodes);
+
+ // whatever is left in prevNodes now has been removed from Discovery
+ LOG.fine("processChildren done. Nodes that are missing now and need to be removed: "
+ + prevNodes.values());
+ for (EndpointDescription epd : prevNodes.values()) {
+ epListener.endpointRemoved(epd, null);
+ }
+ nodes = newNodes;
+ }
+
/**
- * Only for thest case
- * @return
- * */
- protected InterfaceDataMonitorListenerImpl createInterfaceDataMonitorListener(ZooKeeper zk, String intf,
- EndpointListenerTrackerCustomizer.Interest zkd,
- String scope, BundleContext bctx) {
- return new InterfaceDataMonitorListenerImpl(zk, intf, zkd,scope,bctx,this);
+ * iterates through all child nodes of the given node and tries to find endpoints. If the recursive flag
+ * is set it also traverses into the child nodes.
+ *
+ * @return true if an endpoint was found and if the node therefore needs to be monitored for changes
+ */
+ private boolean processChildren(String znode, Map<String, EndpointDescription> newNodes,
+ Map<String, EndpointDescription> prevNodes) {
+ List<String> children;
+ try {
+ LOG.fine("Processing the children of " + znode);
+ children = zookeeper.getChildren(znode, false);
+
+ boolean foundANode = false;
+ for (String child : children) {
+ String childZNode = znode + '/' + child;
+ EndpointDescription epd = getEndpointDescriptionFromNode(childZNode);
+ if (epd != null) {
+ EndpointDescription prevEpd = prevNodes.get(child);
+ LOG.info("found new node " + znode + "/[" + child + "] ( []->child ) props: "
+ + epd.getProperties().values());
+ newNodes.put(child, epd);
+ prevNodes.remove(child);
+ foundANode = true;
+ LOG.finest("Properties: " + epd.getProperties());
+ if (prevEpd == null) {
+ // This guy is new
+ epListener.endpointAdded(epd, null);
+ } else if (!prevEpd.getProperties().equals(epd.getProperties())) {
+ // TODO
+ }
+ }
+ if (recursive) {
+ if (processChildren(childZNode, newNodes, prevNodes)) {
+ zookeeper.getChildren(childZNode, this);
+ }
+ }
+ }
+
+ return foundANode;
+ } catch (KeeperException e) {
+ LOG.log(Level.SEVERE, "Problem processing Zookeeper node: " + e.getMessage(), e);
+ } catch (InterruptedException e) {
+ LOG.log(Level.SEVERE, "Problem processing Zookeeper node: " + e.getMessage(), e);
+ }
+ return false;
+ }
+
+ /**
+ * Scan the node data for Endpoint information and publish it to the related service listeners
+ *
+ * @param node
+ * @return endpoint found in the node or null if no endpoint was found
+ */
+ private EndpointDescription getEndpointDescriptionFromNode(String node) {
+ try {
+ Stat s = zookeeper.exists(node, false);
+ if (s.getDataLength() <= 0) {
+ return null;
+ }
+ byte[] data = zookeeper.getData(node, false, null);
+ LOG.fine("Child: " + node);
+
+ List<Element> elements = LocalDiscoveryUtils.getElements(new ByteArrayInputStream(data));
+ if (elements.size() > 0) {
+ return LocalDiscoveryUtils.getEndpointDescription(elements.get(0));
+ } else {
+ LOG.warning("No Discovery information found for node: " + node);
+ }
+ } catch (Exception e) {
+ LOG.log(Level.SEVERE, "Problem processing Zookeeper callback: " + e.getMessage(), e);
+ }
+ return null;
}
}
Added: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java?rev=1404989&view=auto
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java (added)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java Fri Nov 2 15:02:01 2012
@@ -0,0 +1,182 @@
+package org.apache.cxf.dosgi.discovery.zookeeper;
+
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+
+/**
+ * Manages the EndpointListeners and the scopes they are interested in.
+ * For each scope with interested EndpointListeners an InterfaceMonitor is created.
+ * The InterfaceMonitor calls back when it detects added or removed external Endpoints.
+ * These events are then forwarded to all interested EndpointListeners
+ */
+public class InterfaceMonitorManager {
+ private static final Logger LOG = Logger.getLogger(InterfaceMonitorManager.class.getName());
+
+ private final ZooKeeper zooKeeper;
+ private final Map<ServiceReference, List<String> /* scopes of the epl */> handledEndpointlisteners = new HashMap<ServiceReference, List<String>>();
+ private final Map<String /* scope */, Interest> interestingScopes = new HashMap<String, Interest>();
+ private final BundleContext bctx;
+
+ protected static class Interest {
+ List<ServiceReference> relatedServiceListeners = new ArrayList<ServiceReference>(1);
+ InterfaceMonitor im;
+ }
+
+ public InterfaceMonitorManager(BundleContext bctx, ZooKeeper zooKeeper) {
+ this.bctx = bctx;
+ this.zooKeeper = zooKeeper;
+ }
+
+ void addInterest(ServiceReference sref, String scope, String objClass) {
+ synchronized (interestingScopes) {
+ synchronized (handledEndpointlisteners) {
+ Interest interest = interestingScopes.get(scope);
+ if (interest == null) {
+ interest = new Interest();
+ interestingScopes.put(scope, interest);
+ }
+
+ if (!interest.relatedServiceListeners.contains(sref)) {
+ interest.relatedServiceListeners.add(sref);
+ }
+
+ if (interest.im != null) {
+ // close old Monitor
+ interest.im.close();
+ interest.im = null;
+ }
+
+ InterfaceMonitor dm = createInterfaceMonitor(scope, objClass, interest);
+ dm.start();
+ interest.im = dm;
+
+ List<String> handledScopes = handledEndpointlisteners.get(sref);
+ if (handledScopes == null) {
+ handledScopes = new ArrayList<String>(1);
+ handledEndpointlisteners.put(sref, handledScopes);
+ }
+
+ if (!handledScopes.contains(scope))
+ handledScopes.add(scope);
+
+ }
+ }
+ }
+
+ /**
+ * Only for test case !
+ * */
+ protected Map<String, Interest> getInterestingScopes() {
+ return interestingScopes;
+ }
+
+ /**
+ * Only for test case !
+ * */
+ protected Map<ServiceReference, List<String>> getHandledEndpointlisteners() {
+ return handledEndpointlisteners;
+ }
+
+ protected InterfaceMonitor createInterfaceMonitor(String scope, String objClass, final Interest interest) {
+ EndpointListener epListener = new EndpointListener() {
+ public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
+ notifyListeners(endpoint, false, interest.relatedServiceListeners);
+ }
+
+ public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
+ notifyListeners(endpoint, true, interest.relatedServiceListeners);
+ }
+ };
+ return new InterfaceMonitor(zooKeeper, objClass, epListener, scope, bctx);
+ }
+
+ public void removeInterest(ServiceReference sref) {
+ List<String> handledScopes = handledEndpointlisteners.get(sref);
+ if (handledScopes == null) {
+ return;
+ }
+
+ for (String scope : handledScopes) {
+ Interest i = interestingScopes.get(scope);
+ if (i != null) {
+ i.relatedServiceListeners.remove(sref);
+ if (i.relatedServiceListeners.size() == 0) {
+ i.im.close();
+ interestingScopes.remove(scope);
+ }
+ }
+ }
+ handledEndpointlisteners.remove(sref);
+ }
+
+ private void notifyListeners(EndpointDescription epd, boolean isAdded,
+ List<ServiceReference> relatedServiceListeners) {
+ for (ServiceReference sref : relatedServiceListeners) {
+ Object service = bctx.getService(sref);
+ if (service == null || !(service instanceof EndpointListener)) {
+ continue;
+ }
+ EndpointListener epl = (EndpointListener) service;
+ String[] scopes = Util.getScopes(sref);
+ for (final String currentScope : scopes) {
+ LOG.fine("matching " + epd + " against " + currentScope);
+ if (matches(currentScope, epd)) {
+ LOG.fine("Matched " + epd + "against " + currentScope);
+ if (isAdded) {
+ LOG.info("calling EndpointListener.endpointAdded: " + epl + "from bundle "
+ + sref.getBundle().getSymbolicName() + " for endpoint: " + epd);
+ epl.endpointAdded(epd, currentScope);
+ } else {
+ LOG.info("calling EndpointListener.endpointRemoved: " + epl + "from bundle "
+ + sref.getBundle().getSymbolicName() + " for endpoint: " + epd);
+ epl.endpointRemoved(epd, currentScope);
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ private boolean matches(String scope, EndpointDescription epd) {
+ try {
+ Filter f = FrameworkUtil.createFilter(scope);
+ Dictionary<String, Object> dict = mapToDictionary(epd.getProperties());
+ return f.match(dict);
+ } catch (InvalidSyntaxException e) {
+ LOG.log(Level.SEVERE, "Currentscope [" + scope + "] resulted in" + " a bad filter!", e);
+ return false;
+ }
+ }
+
+ private Dictionary<String, Object> mapToDictionary(Map<String, Object> map) {
+ Dictionary<String, Object> d = new Hashtable<String, Object>();
+ Set<Map.Entry<String, Object>> entries = map.entrySet();
+ for (Map.Entry<String, Object> entry : entries) {
+ d.put(entry.getKey(), entry.getValue());
+ }
+ return d;
+ }
+
+ public void close() {
+ for (String scope : interestingScopes.keySet()) {
+ Interest interest = interestingScopes.get(scope);
+ interest.im.close();
+ }
+ }
+}
Propchange: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Copied: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java (from r1402482, cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImpl.java)
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java?p2=cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java&p1=cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImpl.java&r1=1402482&r2=1404989&rev=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImpl.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java Fri Nov 2 15:02:01 2012
@@ -42,17 +42,20 @@ import org.osgi.service.remoteserviceadm
import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.osgi.util.tracker.ServiceTracker;
-public class EndpointListenerImpl implements EndpointListener {
- private final Logger LOG = Logger.getLogger(EndpointListenerImpl.class.getName());
+/**
+ * Listens for local Endpoints and publishes them to Zookeeper
+ */
+public class PublishingEndpointListener implements EndpointListener {
+ private final Logger LOG = Logger.getLogger(PublishingEndpointListener.class.getName());
- private final ZooKeeperDiscovery discovery;
+ private final ZooKeeper zookeeper;
private final List<DiscoveryPlugin> discoveryPlugins = new CopyOnWriteArrayList<DiscoveryPlugin>();
private final ServiceTracker discoveryPluginTracker;
private final List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>();
private boolean closed = false;
- public EndpointListenerImpl(ZooKeeperDiscovery zooKeeperDiscovery, BundleContext bctx) {
- discovery = zooKeeperDiscovery;
+ public PublishingEndpointListener(ZooKeeper zooKeeper, BundleContext bctx) {
+ this.zookeeper = zooKeeper;
discoveryPluginTracker = new ServiceTracker(bctx, DiscoveryPlugin.class.getName(), null) {
@Override
@@ -73,12 +76,8 @@ public class EndpointListenerImpl implem
discoveryPluginTracker.open();
}
- private ZooKeeper getZooKeeper() {
- return discovery.getZookeeper();
- }
-
public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
- LOG.info("endpointDescription added: " + endpoint);
+ LOG.info("Local endpointDescription added: " + endpoint);
if (closed)
return;
@@ -95,7 +94,6 @@ public class EndpointListenerImpl implem
Collection<String> interfaces = endpoint.getInterfaces();
String endpointKey = getKey(endpoint.getId());
- ZooKeeper zk = getZooKeeper();
for (String name : interfaces) {
Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
for (DiscoveryPlugin plugin : discoveryPlugins) {
@@ -103,11 +101,11 @@ public class EndpointListenerImpl implem
}
String path = Util.getZooKeeperPath(name);
- ensurePath(path, zk);
+ ensurePath(path, zookeeper);
String fullPath = path + '/' + endpointKey;
LOG.fine("Creating ZooKeeper node: " + fullPath);
- zk.create(fullPath, getData(props), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ zookeeper.create(fullPath, getData(props), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
endpoints.add(endpoint);
@@ -119,7 +117,7 @@ public class EndpointListenerImpl implem
}
public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
- LOG.info("endpointDescription removed: " + endpoint);
+ LOG.info("Local endpointDescription removed: " + endpoint);
if (closed)
return;
@@ -145,12 +143,11 @@ public class EndpointListenerImpl implem
Collection<String> interfaces = endpoint.getInterfaces();
String endpointKey = getKey(endpoint.getId());
- ZooKeeper zk = getZooKeeper();
for (String name : interfaces) {
String path = Util.getZooKeeperPath(name);
String fullPath = path + '/' + endpointKey;
LOG.fine("Removing ZooKeeper node: " + fullPath);
- zk.delete(fullPath, -1);
+ zookeeper.delete(fullPath, -1);
}
}
Copied: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java (from r1402482, cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactory.java)
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java?p2=cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java&p1=cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactory.java&r1=1402482&r2=1404989&rev=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactory.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java Fri Nov 2 15:02:01 2012
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Properties;
import java.util.logging.Logger;
+import org.apache.zookeeper.ZooKeeper;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
@@ -31,25 +32,27 @@ import org.osgi.framework.ServiceRegistr
import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.osgi.service.remoteserviceadmin.RemoteConstants;
-public class EndpointListenerFactory implements ServiceFactory {
-
+/**
+ * Creates local Endpointlisteners that publish to Zookeeper
+ */
+public class PublishingEndpointListenerFactory implements ServiceFactory {
public static final String DISCOVERY_ZOOKEEPER_ID = "org.apache.cxf.dosgi.discovery.zookeeper";
-
- private Logger LOG = Logger.getLogger(EndpointListenerFactory.class.getName());
+ private static final Logger LOG = Logger.getLogger(PublishingEndpointListenerFactory.class.getName());
+
private BundleContext bctx;
- private ZooKeeperDiscovery discovery;
- private List<EndpointListenerImpl> listeners = new ArrayList<EndpointListenerImpl>();
+ private ZooKeeper zookeeper;
+ private List<PublishingEndpointListener> listeners = new ArrayList<PublishingEndpointListener>();
private ServiceRegistration serviceRegistartion;
- public EndpointListenerFactory(ZooKeeperDiscovery zooKeeperDiscovery, BundleContext bctx) {
+ public PublishingEndpointListenerFactory(ZooKeeper zooKeeper, BundleContext bctx) {
this.bctx = bctx;
- discovery = zooKeeperDiscovery;
+ this.zookeeper = zooKeeper;
}
public Object getService(Bundle b, ServiceRegistration sr) {
LOG.fine("new EndpointListener from factory");
synchronized (listeners) {
- EndpointListenerImpl epl = new EndpointListenerImpl(discovery, bctx);
+ PublishingEndpointListener epl = new PublishingEndpointListener(zookeeper, bctx);
listeners.add(epl);
return epl;
}
@@ -59,7 +62,7 @@ public class EndpointListenerFactory imp
LOG.fine("remove EndpointListener");
synchronized (listeners) {
if (listeners.contains(s)) {
- EndpointListenerImpl epl = (EndpointListenerImpl)s;
+ PublishingEndpointListener epl = (PublishingEndpointListener)s;
epl.close();
listeners.remove(epl);
}
@@ -67,22 +70,18 @@ public class EndpointListenerFactory imp
}
public synchronized void start() {
- serviceRegistartion = bctx.registerService(EndpointListener.class.getName(), this, null);
- updateServiceRegistration();
- }
-
- private void updateServiceRegistration() {
Properties props = new Properties();
props.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, "(&(" + Constants.OBJECTCLASS + "=*)("+RemoteConstants.ENDPOINT_FRAMEWORK_UUID+"="+Util.getUUID(bctx)+"))");
props.put(DISCOVERY_ZOOKEEPER_ID, "true");
- serviceRegistartion.setProperties(props);
+ serviceRegistartion = bctx.registerService(EndpointListener.class.getName(), this, props);
}
public synchronized void stop() {
- if (serviceRegistartion != null)
+ if (serviceRegistartion != null) {
serviceRegistartion.unregister();
+ }
- for (EndpointListenerImpl epl : listeners) {
+ for (PublishingEndpointListener epl : listeners) {
epl.close();
}
}
@@ -90,7 +89,7 @@ public class EndpointListenerFactory imp
/**
* only for the test case !
*/
- protected List<EndpointListenerImpl> getListeners(){
+ protected List<PublishingEndpointListener> getListeners(){
return listeners;
}
Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Util.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Util.java?rev=1404989&r1=1404988&r2=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Util.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Util.java Fri Nov 2 15:02:01 2012
@@ -65,6 +65,7 @@ public class Util {
}
if (property instanceof Collection) {
+ @SuppressWarnings("rawtypes")
Collection col = (Collection)property;
// System.out.println("Collection: size "+col.size());
String[] ret = new String[col.size()];
@@ -83,8 +84,9 @@ public class Util {
String[] scopes = Util.getStringPlusProperty(sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE));
ArrayList<String> normalizedScopes = new ArrayList<String>(scopes.length);
for (String scope : scopes) {
- if(scope!=null && !"".equals(scope))
+ if (scope != null && !"".equals(scope)) {
normalizedScopes.add(scope);
+ }
}
return normalizedScopes.toArray(new String[normalizedScopes.size()]);
}
Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java?rev=1404989&r1=1404988&r2=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java Fri Nov 2 15:02:01 2012
@@ -20,111 +20,98 @@ package org.apache.cxf.dosgi.discovery.z
import java.io.IOException;
import java.util.Dictionary;
+import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.osgi.framework.BundleContext;
import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedService;
import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.osgi.util.tracker.ServiceTracker;
-public class ZooKeeperDiscovery implements Watcher {
+public class ZooKeeperDiscovery implements Watcher, ManagedService {
private static final Logger LOG = Logger.getLogger(ZooKeeperDiscovery.class.getName());
- private boolean started = false;
-
- private BundleContext bctx;
- private ZooKeeper zooKeeper;
- private Dictionary properties = null;
+ private final BundleContext bctx;
- private EndpointListenerFactory endpointListenerFactory;
+ private PublishingEndpointListenerFactory endpointListenerFactory;
private ServiceTracker endpointListenerTracker;
- private String zkHost;
- private String zkPort;
- private int zkTimeout;
+ private InterfaceMonitorManager imManager;
- public ZooKeeperDiscovery(BundleContext bctx, Dictionary initialProps) {
- this.bctx = bctx;
- endpointListenerFactory = new EndpointListenerFactory(this, bctx);
- properties = initialProps;
+ private ZooKeeper zooKeeper;
- endpointListenerTracker = new ServiceTracker(bctx, EndpointListener.class.getName(),
- new EndpointListenerTrackerCustomizer(this, bctx));
- }
+ @SuppressWarnings("rawtypes")
+ private Dictionary curConfiguration;
- public synchronized void start() throws IOException, ConfigurationException {
- if(started) return;
- started = true;
- createZooKeeper(properties);
-
- // Wait up to 10 seconds for the connection to be established and only register
- // the listeners once the connection is established
- int loops = 100;
-
- while(loops>0){
- if(zooKeeper.getState()==ZooKeeper.States.CONNECTED){
- break;
- }
- --loops;
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {}
- }
-
- if(zooKeeper.getState()!=ZooKeeper.States.CONNECTED){
- throw new IOException("Connection to ZookeeperServer failed !");
- }
-
- endpointListenerFactory.start();
- endpointListenerTracker.open();
- }
-
- public synchronized void stop() {
- if(!started) return;
- started = false;
- endpointListenerFactory.stop();
- endpointListenerTracker.close();
+ public ZooKeeperDiscovery(BundleContext bctx) {
+ this.bctx = bctx;
+ this.curConfiguration = null;
}
+ @SuppressWarnings("rawtypes")
+ public synchronized void updated(Dictionary configuration) throws ConfigurationException {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.fine("Received configuration update for Zookeeper Discovery: " + configuration);
+ }
- private void createZooKeeper(Dictionary props) throws IOException, ConfigurationException {
- zkHost = getProp(props, "zookeeper.host");
- zkPort = getProp(props, "zookeeper.port");
- zkTimeout = Integer.parseInt(getProp(props, "zookeeper.timeout", "3000"));
+ synchronized (this) {
+ stop();
+ }
- zooKeeper = createZooKeeper();
+ if (configuration == null) {
+ return;
+ }
+ curConfiguration = configuration;
+ try {
+ zooKeeper = createZooKeeper(configuration);
+ } catch (IOException e) {
+ LOG.log(Level.SEVERE, "Failed to start the Zookeeper Discovery component.", e);
+ }
}
- // separated for testing
- ZooKeeper createZooKeeper() throws IOException {
- return new ZooKeeper(zkHost + ":" + zkPort, zkTimeout, this);
+ private void startModules() {
+ endpointListenerFactory = new PublishingEndpointListenerFactory(zooKeeper, bctx);
+ endpointListenerFactory.start();
+ imManager = new InterfaceMonitorManager(bctx, zooKeeper);
+ EndpointListenerTrackerCustomizer customizer = new EndpointListenerTrackerCustomizer(bctx, imManager);
+ endpointListenerTracker = new ServiceTracker(bctx, EndpointListener.class.getName(), customizer);
+ endpointListenerTracker.open();
}
- private <T> boolean hasChanged(T orig, T nw) {
- if (orig == nw) {
- return false;
+ public synchronized void stop() {
+ if (endpointListenerFactory != null) {
+ endpointListenerFactory.stop();
}
-
- if (orig == null) {
- return true;
+ if (imManager != null) {
+ imManager.close();
+ }
+ if (endpointListenerTracker != null) {
+ endpointListenerTracker.close();
+ }
+ if (zooKeeper != null) {
+ try {
+ zooKeeper.close();
+ } catch (InterruptedException e) {
+ LOG.log(Level.SEVERE, "Error closing zookeeper", e);
+ }
}
-
- return !orig.equals(nw);
}
- private static String getProp(Dictionary props, String key) throws ConfigurationException {
- String val = getProp(props, key, null);
- if (val != null) {
- return val;
- } else {
- throw new ConfigurationException(key, "The property " + key + " requires a value");
- }
+ @SuppressWarnings("rawtypes")
+ private ZooKeeper createZooKeeper(Dictionary props) throws IOException {
+ String zkHost = getProp(props, "zookeeper.host", "localhost");
+ String zkPort = getProp(props, "zookeeper.port", "2181");
+ int zkTimeout = Integer.parseInt(getProp(props, "zookeeper.timeout", "3000"));
+ return new ZooKeeper(zkHost + ":" + zkPort, zkTimeout, this);
}
+ @SuppressWarnings("rawtypes")
private static String getProp(Dictionary props, String key, String def) {
Object val = props.get(key);
String rv;
@@ -139,13 +126,20 @@ public class ZooKeeperDiscovery implemen
}
/* Callback for ZooKeeper */
- public void process(WatchedEvent arg0) {
- // TODO Auto-generated method stub
-
- }
-
- protected ZooKeeper getZookeeper() {
- return zooKeeper;
+ public void process(WatchedEvent event) {
+ KeeperState state = event.getState();
+ if (state == KeeperState.SyncConnected) {
+ LOG.info("Connection to zookeeper established");
+ startModules();
+ }
+ if (state == KeeperState.Expired) {
+ LOG.info("Connection to zookeeper expired. Trying to create a new connection");
+ stop();
+ try {
+ zooKeeper = createZooKeeper(curConfiguration);
+ } catch (IOException e) {
+ LOG.log(Level.SEVERE, "Failed to start the Zookeeper Discovery component.", e);
+ }
+ }
}
-
}
Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactoryTest.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactoryTest.java?rev=1404989&r1=1404988&r2=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactoryTest.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerFactoryTest.java Fri Nov 2 15:02:01 2012
@@ -23,14 +23,12 @@ import java.util.Properties;
import junit.framework.TestCase;
-import org.easymock.IAnswer;
+import org.apache.zookeeper.ZooKeeper;
import org.easymock.classextension.EasyMock;
import org.easymock.classextension.IMocksControl;
import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.service.remoteserviceadmin.RemoteConstants;
public class EndpointListenerFactoryTest extends TestCase {
@@ -38,42 +36,21 @@ public class EndpointListenerFactoryTest
IMocksControl c = EasyMock.createNiceControl();
BundleContext ctx = c.createMock(BundleContext.class);
- ZooKeeperDiscovery zkd = c.createMock(ZooKeeperDiscovery.class);
+ ZooKeeper zk = c.createMock(ZooKeeper.class);
ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
- EndpointListenerFactory eplf = new EndpointListenerFactory(zkd, ctx);
+ PublishingEndpointListenerFactory eplf = new PublishingEndpointListenerFactory(zk, ctx);
EasyMock.expect(
ctx.registerService(EasyMock.eq(EndpointListener.class.getName()), EasyMock.eq(eplf),
(Properties)EasyMock.anyObject())).andReturn(sreg).once();
-
-
- sreg.setProperties((Properties)EasyMock.anyObject());
- EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
-
- public Object answer() throws Throwable {
- Properties p = (Properties)EasyMock.getCurrentArguments()[0];
- assertNotNull(p);
- String scope = (String)p.get(EndpointListener.ENDPOINT_LISTENER_SCOPE);
- assertNotNull(scope);
- assertEquals("(&(" + Constants.OBJECTCLASS + "=*)(" + RemoteConstants.ENDPOINT_FRAMEWORK_UUID
- + "=myUUID))", scope);
- return null;
- }
- }).once();
-
EasyMock.expect(ctx.getProperty(EasyMock.eq("org.osgi.framework.uuid"))).andReturn("myUUID")
.anyTimes();
-
-
-
c.replay();
eplf.start();
c.verify();
-
-
c.reset();
sreg.unregister();
@@ -89,37 +66,20 @@ public class EndpointListenerFactoryTest
IMocksControl c = EasyMock.createNiceControl();
BundleContext ctx = c.createMock(BundleContext.class);
- ZooKeeperDiscovery zkd = c.createMock(ZooKeeperDiscovery.class);
+ ZooKeeper zk = c.createMock(ZooKeeper.class);
ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
- EndpointListenerFactory eplf = new EndpointListenerFactory(zkd, ctx);
+ PublishingEndpointListenerFactory eplf = new PublishingEndpointListenerFactory(zk, ctx);
EasyMock.expect(
ctx.registerService(EasyMock.eq(EndpointListener.class.getName()), EasyMock.eq(eplf),
(Properties)EasyMock.anyObject())).andReturn(sreg).once();
- sreg.setProperties((Properties)EasyMock.anyObject());
- EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
-
- public Object answer() throws Throwable {
- Properties p = (Properties)EasyMock.getCurrentArguments()[0];
- assertNotNull(p);
- String scope = (String)p.get(EndpointListener.ENDPOINT_LISTENER_SCOPE);
- assertNotNull(scope);
- assertEquals("(&(" + Constants.OBJECTCLASS + "=*)(" + RemoteConstants.ENDPOINT_FRAMEWORK_UUID
- + "=myUUID))", scope);
- return null;
- }
- }).once();
-
-
EasyMock.expect(ctx.getProperty(EasyMock.eq("org.osgi.framework.uuid"))).andReturn("myUUID")
.anyTimes();
-
-
- EndpointListenerImpl eli = c.createMock(EndpointListenerImpl.class);
+ PublishingEndpointListener eli = c.createMock(PublishingEndpointListener.class);
eli.close();
EasyMock.expectLastCall().once();
@@ -131,7 +91,7 @@ public class EndpointListenerFactoryTest
assertNotNull(service);
assertTrue(service instanceof EndpointListener);
- List<EndpointListenerImpl> listeners = eplf.getListeners();
+ List<PublishingEndpointListener> listeners = eplf.getListeners();
assertEquals(1, listeners.size());
assertEquals(service, listeners.get(0));
Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImplTest.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImplTest.java?rev=1404989&r1=1404988&r2=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImplTest.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerImplTest.java Fri Nov 2 15:02:01 2012
@@ -48,13 +48,8 @@ public class EndpointListenerImplTest ex
IMocksControl c = EasyMock.createNiceControl();
BundleContext ctx = c.createMock(BundleContext.class);
- ZooKeeperDiscovery zkd = c.createMock(ZooKeeperDiscovery.class);
-
ZooKeeper zk = c.createMock(ZooKeeper.class);
- EasyMock.expect(zkd.getZookeeper()).andReturn(zk).anyTimes();
-
-
String path = "/osgi/service_registry/myClass/google.de#80##test";
EasyMock.expect(
zk.create(EasyMock.eq(path),
@@ -66,7 +61,7 @@ public class EndpointListenerImplTest ex
c.replay();
- EndpointListenerImpl eli = new EndpointListenerImpl(zkd, ctx);
+ PublishingEndpointListener eli = new PublishingEndpointListener(zk, ctx);
Map<String, Object> props = new HashMap<String, Object>();
props.put(Constants.OBJECTCLASS, new String[] {
@@ -134,18 +129,12 @@ public class EndpointListenerImplTest ex
String expectedFullPath = "/osgi/service_registry/org/foo/myClass/some.machine#9876##test";
EasyMock.expect(zk.create(
EasyMock.eq(expectedFullPath),
- EasyMock.aryEq(EndpointListenerImpl.getData(expectedProps)),
+ EasyMock.aryEq(PublishingEndpointListener.getData(expectedProps)),
EasyMock.eq(Ids.OPEN_ACL_UNSAFE),
EasyMock.eq(CreateMode.EPHEMERAL))).andReturn("");
EasyMock.replay(zk);
- ZooKeeperDiscovery zkd = new ZooKeeperDiscovery(ctx, null) {
- @Override
- protected ZooKeeper getZookeeper() {
- return zk;
- }
- };
- EndpointListenerImpl eli = new EndpointListenerImpl(zkd, ctx);
+ PublishingEndpointListener eli = new PublishingEndpointListener(zk, ctx);
List<EndpointDescription> endpointList = getEndpointsList(eli);
assertEquals("Precondition", 0, endpointList.size());
@@ -157,7 +146,7 @@ public class EndpointListenerImplTest ex
@SuppressWarnings("unchecked")
- private List<EndpointDescription> getEndpointsList(EndpointListenerImpl eli) throws Exception {
+ private List<EndpointDescription> getEndpointsList(PublishingEndpointListener eli) throws Exception {
Field field = eli.getClass().getDeclaredField("endpoints");
field.setAccessible(true);
return (List<EndpointDescription>) field.get(eli);
@@ -168,13 +157,8 @@ public class EndpointListenerImplTest ex
IMocksControl c = EasyMock.createNiceControl();
BundleContext ctx = c.createMock(BundleContext.class);
- ZooKeeperDiscovery zkd = c.createMock(ZooKeeperDiscovery.class);
-
ZooKeeper zk = c.createMock(ZooKeeper.class);
- EasyMock.expect(zkd.getZookeeper()).andReturn(zk).anyTimes();
-
-
String path = "/osgi/service_registry/myClass/google.de#80##test";
EasyMock.expect(
zk.create(EasyMock.eq(path),
@@ -186,7 +170,7 @@ public class EndpointListenerImplTest ex
c.replay();
- EndpointListenerImpl eli = new EndpointListenerImpl(zkd, ctx);
+ PublishingEndpointListener eli = new PublishingEndpointListener(zk, ctx);
Map<String, Object> props = new HashMap<String, Object>();
props.put(Constants.OBJECTCLASS, new String[] {
@@ -208,7 +192,7 @@ public class EndpointListenerImplTest ex
public void testGetKey() throws Exception {
assertEquals("somehost#9090##org#example#TestEndpoint",
- EndpointListenerImpl.getKey("http://somehost:9090/org/example/TestEndpoint"));
+ PublishingEndpointListener.getKey("http://somehost:9090/org/example/TestEndpoint"));
}
}
Copied: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManagerTest.java (from r1402482, cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizerTest.java)
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManagerTest.java?p2=cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManagerTest.java&p1=cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizerTest.java&r1=1402482&r2=1404989&rev=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizerTest.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManagerTest.java Fri Nov 2 15:02:01 2012
@@ -23,29 +23,27 @@ import java.util.Properties;
import junit.framework.TestCase;
-import org.apache.cxf.dosgi.discovery.zookeeper.EndpointListenerTrackerCustomizer.Interest;
+import org.apache.zookeeper.ZooKeeper;
import org.easymock.IAnswer;
import org.easymock.classextension.EasyMock;
import org.easymock.classextension.IMocksControl;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-public class EndpointListenerTrackerCustomizerTest extends TestCase{
+public class InterfaceMonitorManagerTest extends TestCase {
public void testEndpointListenerTrackerCustomizer(){
IMocksControl c = EasyMock.createNiceControl();
BundleContext ctx = c.createMock(BundleContext.class);
- ZooKeeperDiscovery zkd = c.createMock(ZooKeeperDiscovery.class);
+ ZooKeeper zk = c.createMock(ZooKeeper.class);
ServiceReference sref = c.createMock(ServiceReference.class);
ServiceReference sref2 = c.createMock(ServiceReference.class);
final Properties p = new Properties();
-
EasyMock.expect(sref.getPropertyKeys()).andAnswer(new IAnswer<String[]>() {
public String[] answer() throws Throwable {
return p.keySet().toArray(new String[p.keySet().size()]);
@@ -75,8 +73,8 @@ public class EndpointListenerTrackerCust
final ArrayList<IMocksControl> controls = new ArrayList<IMocksControl>();
- EndpointListenerTrackerCustomizer eltc = new EndpointListenerTrackerCustomizer(zkd,ctx){
- protected InterfaceMonitor createInterfaceMonitor(String scope, String objClass, Interest interest){
+ InterfaceMonitorManager eltc = new InterfaceMonitorManager(ctx, zk){
+ protected InterfaceMonitor createInterfaceMonitor(String scope, String objClass, Interest interest) {
IMocksControl lc = EasyMock.createNiceControl();
InterfaceMonitor im = lc.createMock(InterfaceMonitor.class);
im.start();
@@ -91,14 +89,14 @@ public class EndpointListenerTrackerCust
c.replay();
- eltc.addingService(sref); // sref has no scope -> nothing should happen
+ // sref has no scope -> nothing should happen
assertEquals(0, eltc.getHandledEndpointlisteners().size());
assertEquals(0, eltc.getInterestingScopes().size());
- p.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, "(objectClass=mine)");
+ //p.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, );
- eltc.addingService(sref);
+ eltc.addInterest(sref, "(objectClass=mine)", "mine");
assertEquals(1, eltc.getHandledEndpointlisteners().size());
assertEquals(1, eltc.getHandledEndpointlisteners().get(sref).size());
@@ -106,14 +104,14 @@ public class EndpointListenerTrackerCust
assertEquals(1, eltc.getInterestingScopes().size());
- eltc.addingService(sref);
+ eltc.addInterest(sref, "(objectClass=mine)", "mine");
assertEquals(1, eltc.getHandledEndpointlisteners().size());
assertEquals(1, eltc.getHandledEndpointlisteners().get(sref).size());
assertEquals("(objectClass=mine)", eltc.getHandledEndpointlisteners().get(sref).get(0));
assertEquals(1, eltc.getInterestingScopes().size());
- eltc.addingService(sref2);
+ eltc.addInterest(sref2, "(objectClass=mine)", "mine");
assertEquals(2, eltc.getHandledEndpointlisteners().size());
assertEquals(1, eltc.getHandledEndpointlisteners().get(sref).size());
@@ -123,22 +121,21 @@ public class EndpointListenerTrackerCust
assertEquals(1, eltc.getInterestingScopes().size());
- eltc.removedService(sref, null);
+ eltc.removeInterest(sref);
assertEquals(1, eltc.getHandledEndpointlisteners().size());
assertEquals(1, eltc.getHandledEndpointlisteners().get(sref2).size());
assertEquals("(objectClass=mine)", eltc.getHandledEndpointlisteners().get(sref2).get(0));
assertEquals(1, eltc.getInterestingScopes().size());
- eltc.removedService(sref, null);
+ eltc.removeInterest(sref);
assertEquals(1, eltc.getHandledEndpointlisteners().size());
assertEquals(1, eltc.getHandledEndpointlisteners().get(sref2).size());
assertEquals("(objectClass=mine)", eltc.getHandledEndpointlisteners().get(sref2).get(0));
assertEquals(1, eltc.getInterestingScopes().size());
-
- eltc.removedService(sref2, null);
+ eltc.removeInterest(sref2);
assertEquals(0, eltc.getHandledEndpointlisteners().size());
assertEquals(0, eltc.getInterestingScopes().size());
Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorTest.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorTest.java?rev=1404989&r1=1404988&r2=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorTest.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorTest.java Fri Nov 2 15:02:01 2012
@@ -18,74 +18,52 @@
*/
package org.apache.cxf.dosgi.discovery.zookeeper;
-import java.util.Collection;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+
import java.util.Collections;
import junit.framework.TestCase;
-import org.apache.cxf.dosgi.discovery.zookeeper.EndpointListenerTrackerCustomizer.Interest;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.easymock.classextension.EasyMock;
import org.easymock.classextension.IMocksControl;
import org.osgi.framework.BundleContext;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
public class InterfaceMonitorTest extends TestCase {
public void testInterfaceMonitor() throws KeeperException, InterruptedException {
- IMocksControl c = EasyMock.createNiceControl();
+ IMocksControl c = EasyMock.createControl();
BundleContext ctx = c.createMock(BundleContext.class);
- ZooKeeperDiscovery zkd = c.createMock(ZooKeeperDiscovery.class);
-
ZooKeeper zk = c.createMock(ZooKeeper.class);
- EasyMock.expect(zk.getState()).andReturn(ZooKeeper.States.CONNECTED).anyTimes();
-
- EasyMock.expect(zkd.getZookeeper()).andReturn(zk).anyTimes();
-
- EndpointListenerTrackerCustomizer.Interest interest = new EndpointListenerTrackerCustomizer.Interest();
+ expect(zk.getState()).andReturn(ZooKeeper.States.CONNECTED).anyTimes();
String scope = "(myProp=test)";
String interf = "es.schaaf.test";
String node = Util.getZooKeeperPath(interf);
- final InterfaceDataMonitorListenerImpl idmli = c.createMock(InterfaceDataMonitorListenerImpl.class);
-
- InterfaceMonitor im = new InterfaceMonitor(zk, interf, interest, scope, ctx) {
- @Override
- protected InterfaceDataMonitorListenerImpl createInterfaceDataMonitorListener(ZooKeeper zk,
- String intf,
- Interest zkd,
- String scope,
- BundleContext bctx) {
- return idmli;
- }
- };
-
- idmli.change();
- EasyMock.expectLastCall().once();
-
- zk.exists(EasyMock.eq(node), EasyMock.eq(im), EasyMock.eq(im), EasyMock.anyObject());
+ EndpointListener epListener = c.createMock(EndpointListener.class);
+ InterfaceMonitor im = new InterfaceMonitor(zk, interf, epListener, scope, ctx);
+ zk.exists(eq(node), eq(im), eq(im), EasyMock.anyObject());
EasyMock.expectLastCall().once();
- EasyMock.expect(zk.exists(EasyMock.eq(node), EasyMock.eq(false))).andReturn(new Stat()).anyTimes();
-
- EasyMock.expect(zk.getChildren(EasyMock.eq(node), EasyMock.eq(im))).andReturn(Collections.EMPTY_LIST)
- .once();
+ expect(zk.exists(eq(node), eq(false))).andReturn(new Stat()).anyTimes();
+ expect(zk.getChildren(eq(node), eq(false))).andReturn(Collections.<String> emptyList()).once();
+ expect(zk.getChildren(eq(node), eq(im))).andReturn(Collections.<String> emptyList()).once();
c.replay();
-
im.start();
-
// simulate a zk callback
WatchedEvent we = new WatchedEvent(EventType.NodeCreated, KeeperState.SyncConnected, node);
im.process(we);
-
c.verify();
}
}
Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/UtilTest.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/UtilTest.java?rev=1404989&r1=1404988&r2=1404989&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/UtilTest.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/UtilTest.java Fri Nov 2 15:02:01 2012
@@ -64,6 +64,7 @@ public class UtilTest extends TestCase {
}
+ @SuppressWarnings("unchecked")
public void testGetStringPlusProperty() {
Object in = "MyString";
String[] out = Util.getStringPlusProperty(in);