You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2018/02/08 16:41:01 UTC

[1/9] aries-rsa git commit: [ARIES-1758] First impl for serializing Version

Repository: aries-rsa
Updated Branches:
  refs/heads/master 44943bcef -> 19849747e


[ARIES-1758] First impl for serializing Version


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/19849747
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/19849747
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/19849747

Branch: refs/heads/master
Commit: 19849747e5d300e4613bd9759cacdfe0570edad9
Parents: 9a08b23
Author: Christian Schneider <cs...@adobe.com>
Authored: Thu Feb 8 17:33:36 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100

----------------------------------------------------------------------
 .../aries/rsa/provider/tcp/MethodInvoker.java   | 18 ++++-
 .../aries/rsa/provider/tcp/SerVersion.java      | 25 +++++++
 .../aries/rsa/provider/tcp/TCPServer.java       |  1 +
 .../rsa/provider/tcp/TcpInvocationHandler.java  | 38 +++++++++-
 .../rsa/provider/tcp/VersionDeserializer.java   | 67 +++++++++++++++++
 .../rsa/provider/tcp/VersionSerializer.java     | 79 ++++++++++++++++++++
 .../provider/tcp/TcpProviderPrimitiveTest.java  | 14 ++++
 .../tcp/myservice/PrimitiveService.java         |  6 ++
 .../tcp/myservice/PrimitiveServiceImpl.java     | 12 +++
 9 files changed, 256 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/19849747/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/MethodInvoker.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/MethodInvoker.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/MethodInvoker.java
index d5f2a03..dadf5f7 100644
--- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/MethodInvoker.java
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/MethodInvoker.java
@@ -24,6 +24,8 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
+import org.osgi.framework.Version;
+
 public class MethodInvoker {
 
     private HashMap<Object, Object> primTypes;
@@ -39,9 +41,11 @@ public class MethodInvoker {
         this.primTypes.put(Float.TYPE, Float.class);
         this.primTypes.put(Double.TYPE, Double.class);
         this.primTypes.put(Boolean.TYPE, Boolean.class);
+        this.primTypes.put(Character.TYPE, Character.class);
     }
     
     public Object invoke(String methodName, Object[] args) {
+        args = VersionDeserializer.replaceAr(args);
         Class<?>[] parameterTypesAr = getTypes(args);
         Method method = null;
         try {
@@ -52,6 +56,18 @@ public class MethodInvoker {
         }
     }
     
+    private void readReplaceVersion(Object[] args) {
+        if (args != null) {
+            for (int c=0; c<args.length; c++) {
+                Object current = args[c];
+                if (current instanceof SerVersion) {
+                    SerVersion serVersion = (SerVersion)current;
+                    args[c] = new Version(serVersion.getVersion());
+                }
+            }
+        }
+    }
+    
     private Method getMethod(String methodName, Class<?>[] parameterTypesAr) {
         try {
             return service.getClass().getMethod(methodName, parameterTypesAr);
@@ -87,7 +103,7 @@ public class MethodInvoker {
         }
         return type.isAssignableFrom(paramType);
     }
-
+    
     private Class<?>[] getTypes(Object[] args) {
         List<Class<?>> parameterTypes = new ArrayList<>();
         if (args != null) {

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/19849747/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/SerVersion.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/SerVersion.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/SerVersion.java
new file mode 100644
index 0000000..ad912df
--- /dev/null
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/SerVersion.java
@@ -0,0 +1,25 @@
+package org.apache.aries.rsa.provider.tcp;
+
+import java.io.Serializable;
+
+import org.osgi.framework.Version;
+
+public class SerVersion implements Serializable {
+    private static final long serialVersionUID = 4725855052866235835L;
+
+    private String version;
+    
+    public SerVersion() {
+    }
+    
+    public SerVersion(Version version) {
+        this.version = version.toString();
+    }
+    
+    public String getVersion() {
+        return version;
+    }
+    public void setVersion(String version) {
+        this.version = version;
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/19849747/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
index ce1b06a..fca6df3 100644
--- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
@@ -86,6 +86,7 @@ public class TCPServer implements Closeable, Runnable {
         Object[] args = (Object[])ois.readObject();
         Object result = invoker.invoke(methodName, args);
         result = resolveAsnyc(result);
+        result = VersionSerializer.replace(result);
         if (result instanceof InvocationTargetException) {
             result = ((InvocationTargetException) result).getCause();
         }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/19849747/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
index fbda0ec..0fd3526 100644
--- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
@@ -24,12 +24,17 @@ import java.io.ObjectOutputStream;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Future;
 import java.util.function.Supplier;
 
+import org.osgi.framework.ServiceException;
+import org.osgi.framework.Version;
 import org.osgi.util.promise.Deferred;
 import org.osgi.util.promise.Promise;
 
@@ -90,18 +95,23 @@ public class TcpInvocationHandler implements InvocationHandler {
     }
 
     private Object handleSyncCall(Method method, Object[] args) throws Throwable {
+        args = (Object[])VersionSerializer.replace(args);
         Object result;
         try (
-                Socket socket = new Socket(this.host, this.port);
+                Socket socket = openSocket();
                 ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())
             ) {
             socket.setSoTimeout(timeoutMillis);
             out.writeObject(method.getName());
+            
             out.writeObject(args);
             out.flush();
             result = parseResult(socket);
+            result = VersionDeserializer.replace(result);
+        } catch (SocketTimeoutException e) {
+            throw new ServiceException("Timeout calling " + host + ":" + port + " method: " + method.getName(), ServiceException.REMOTE, e);
         } catch (Throwable e) {
-            throw new RuntimeException("Error calling " + host + ":" + port + " method: " + method.getName(), e);
+            throw new ServiceException("Error calling " + host + ":" + port + " method: " + method.getName(), ServiceException.REMOTE, e);
         }
         if (result instanceof Throwable) {
             throw (Throwable)result;
@@ -109,9 +119,31 @@ public class TcpInvocationHandler implements InvocationHandler {
         return result;
     }
 
+    private Socket openSocket() throws UnknownHostException, IOException {
+        return AccessController.doPrivileged(new PrivilegedAction<Socket>() {
+
+            @Override
+            public Socket run() {
+                try {
+                    return new Socket(host, port);
+                } catch (Exception e) {
+                    throw new RuntimeException(e.getMessage(), e);
+                }
+            }
+        });
+    }
+
     private Object parseResult(Socket socket) throws Throwable {
         try (ObjectInputStream in = new LoaderObjectInputStream(socket.getInputStream(), cl)) {
-            return in.readObject();
+            return readReplaceVersion(in.readObject());
+        }
+    }
+
+    private Object readReplaceVersion(Object readObject) {
+        if (readObject instanceof SerVersion) {
+            return new Version(((SerVersion)readObject).getVersion());
+        } else {
+            return readObject;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/19849747/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/VersionDeserializer.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/VersionDeserializer.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/VersionDeserializer.java
new file mode 100644
index 0000000..84d965e
--- /dev/null
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/VersionDeserializer.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.tcp;
+
+import org.osgi.framework.Version;
+
+public class VersionDeserializer {
+
+    public static Object[] replaceAr(Object[] args) {
+        if (args == null) {
+            return null;
+        }
+        Object[] result = new Object[args.length];
+        for (int c=0; c<args.length; c++) {
+            result[c] = replace(args[c]);
+        }
+        return result;
+    }
+    
+    public static Object replace(Object obj) {
+        if (obj == null) {
+            return obj;
+        }
+        if (obj.getClass().isArray()) {
+            if (obj.getClass().getComponentType() == SerVersion.class) {
+                return replaceVersionAr((SerVersion[]) obj);
+            } else if (obj.getClass().getComponentType() == Object.class) {
+                return replaceAr((Object[]) obj);
+            } else {
+                return obj;
+            }
+        } else if (obj instanceof SerVersion) {
+            SerVersion serVersion = (SerVersion) obj;
+            return Version.parseVersion(serVersion.getVersion());
+        } else {
+            return obj;
+        }
+    }
+    
+    private static Version[] replaceVersionAr(SerVersion[] obj) {
+        if (obj == null) {
+            return null;
+        }
+        Version[] result = new Version[obj.length];
+        for (int c=0; c<obj.length; c++) {
+            result[c] = Version.parseVersion(obj[c].getVersion());
+        }
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/19849747/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/VersionSerializer.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/VersionSerializer.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/VersionSerializer.java
new file mode 100644
index 0000000..2de7114
--- /dev/null
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/VersionSerializer.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.tcp;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.osgi.framework.Version;
+
+public class VersionSerializer {
+
+    public static Object replace(Object obj) {
+        if (obj == null) {
+            return null;
+        }
+        if (obj.getClass().isArray()) {
+            if (obj.getClass().getComponentType() == Version.class) {
+                return replaceVersionAr((Version[]) obj);
+            } else if (obj.getClass().getComponentType() == Object.class) {
+                return replaceAr((Object[]) obj);
+            } else {
+                return obj;
+            }
+        } else if (obj instanceof List) {
+            return replaceList((List)obj);
+        } else if (obj instanceof Version) {
+            return new SerVersion((Version) obj);
+        } else {
+            return obj;
+        }
+    }
+    
+    private static Object replaceList(List<Version> obj) {
+        List<SerVersion> result = new ArrayList<>();
+        for (Version version : obj) {
+            result.add(new SerVersion(version));
+        }
+        return result;
+    }
+
+    private static SerVersion[] replaceVersionAr(Version[] obj) {
+        if (obj == null) {
+            return null;
+        }
+        SerVersion[] result = new SerVersion[obj.length];
+        for (int c=0; c<obj.length; c++) {
+            result[c] = new SerVersion((Version) obj[c]);
+        }
+        return result;
+    }
+
+    public static Object[] replaceAr(Object[] args) {
+        if (args == null) {
+            return null;
+        }
+        Object[] result = new Object[args.length];
+        for (int c=0; c<args.length; c++) {
+            result[c] = replace(args[c]);
+        }
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/19849747/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderPrimitiveTest.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderPrimitiveTest.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderPrimitiveTest.java
index 2c08104..8badef7 100644
--- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderPrimitiveTest.java
+++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderPrimitiveTest.java
@@ -18,7 +18,10 @@
  */
 package org.apache.aries.rsa.provider.tcp;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.core.StringStartsWith.startsWith;
+import static org.junit.Assert.assertThat;
+import static org.osgi.framework.Version.parseVersion;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -34,6 +37,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.osgi.framework.BundleContext;
+import org.osgi.framework.Version;
 
 public class TcpProviderPrimitiveTest {
 
@@ -98,6 +102,16 @@ public class TcpProviderPrimitiveTest {
     public void testByteAr() {
         Assert.assertArrayEquals(new byte[]{1}, myServiceProxy.callByteAr(new byte[]{1}));
     }
+    
+    @Test
+    public void testVersion() {
+        assertThat(myServiceProxy.callVersion(parseVersion("1.0.0")), equalTo(parseVersion("1.0.0")));
+    }
+    
+    @Test
+    public void testVersionAr() {
+        assertThat(myServiceProxy.callVersionAr(new Version[] {parseVersion("1.0.0")}), equalTo(new Version[] {parseVersion("1.0.0")}));
+    }
 
     @AfterClass
     public static void close() throws IOException {

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/19849747/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveService.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveService.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveService.java
index d7988a5..679c2fa 100644
--- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveService.java
+++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveService.java
@@ -18,6 +18,8 @@
  */
 package org.apache.aries.rsa.provider.tcp.myservice;
 
+import org.osgi.framework.Version;
+
 public interface PrimitiveService {
     
     byte callByte(byte num);
@@ -35,4 +37,8 @@ public interface PrimitiveService {
     boolean callBoolean(boolean bool);
     
     byte[] callByteAr(byte[] byteAr);
+
+    Version callVersion(Version version);
+
+    Version[] callVersionAr(Version[] version);
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/19849747/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveServiceImpl.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveServiceImpl.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveServiceImpl.java
index 1e6e5fd..9039dc2 100644
--- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveServiceImpl.java
+++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveServiceImpl.java
@@ -1,5 +1,7 @@
 package org.apache.aries.rsa.provider.tcp.myservice;
 
+import org.osgi.framework.Version;
+
 public class PrimitiveServiceImpl implements PrimitiveService {
 
     @Override
@@ -43,4 +45,14 @@ public class PrimitiveServiceImpl implements PrimitiveService {
         return byteAr;
     }
 
+    @Override
+    public Version callVersion(Version version) {
+        return version;
+    }
+    
+    @Override
+    public Version[] callVersionAr(Version[] version) {
+        return version;
+    }
+
 }


[2/9] aries-rsa git commit: Fix for NPE

Posted by cs...@apache.org.
Fix for NPE


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/362b68d6
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/362b68d6
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/362b68d6

Branch: refs/heads/master
Commit: 362b68d69225f78002e7db0118b76c9cec1b4b4f
Parents: 7544836
Author: Christian Schneider <cs...@adobe.com>
Authored: Thu Feb 8 17:27:05 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100

----------------------------------------------------------------------
 .../zookeeper/repository/ZookeeperEndpointRepository.java    | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/362b68d6/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
index c5c03a4..6e2641f 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
@@ -247,13 +247,17 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
     private void handleRemoved(String path) {
         EndpointDescription endpoint = nodes.remove(path);
         EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint);
-        listener.endpointChanged(event, null);
+        if (listener != null) {
+            listener.endpointChanged(event, null);
+        }
     }
 
     private void handleChanged(String path, EndpointDescription endpoint) {
         EndpointDescription old = nodes.put(path, endpoint);
         EndpointEvent event = new EndpointEvent(old == null ? EndpointEvent.ADDED : EndpointEvent.MODIFIED, endpoint);
-        listener.endpointChanged(event, null);
+        if (listener != null) {
+            listener.endpointChanged(event, null);
+        }
     }
 
 }


[6/9] aries-rsa git commit: [ARIES-1775] Remove DiscoveryPlugin

Posted by cs...@apache.org.
[ARIES-1775] Remove DiscoveryPlugin


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/fa57fb2b
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/fa57fb2b
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/fa57fb2b

Branch: refs/heads/master
Commit: fa57fb2b9374e1bbe00f563caaca4e22c08535eb
Parents: ed5adf8
Author: Christian Schneider <cs...@adobe.com>
Authored: Wed Feb 7 11:16:38 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100

----------------------------------------------------------------------
 .../zookeeper/publish/DiscoveryPlugin.java      |  54 ----------
 .../publish/PublishingEndpointListener.java     |  27 -----
 .../publish/PublishingEndpointListenerTest.java | 103 -------------------
 3 files changed, 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/fa57fb2b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java
deleted file mode 100644
index 41afbf2..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.publish;
-
-import java.util.Map;
-
-/**
- * This interface allows transformation of service registration information before it is pushed into the ZooKeeper
- * discovery system.
- * It can be useful for situations where a host name or port number needs to be changed in cases where the host running
- * the service is known differently from the outside to what the local Java process thinks it is.
- * Extra service properties can also be added to the registration which can be useful to refine the remote service
- * lookup process. <p>
- *
- * DiscoveryPlugins use the OSGi WhiteBoard pattern. To add one to the system, register an instance under this interface
- * with the OSGi Service Registry. All registered DiscoveryPlugin instances are visited and given a chance to
- * process the information before it is pushed into ZooKeeper. <p>
- *
- * Note that the changes made using this plugin do not modify the local service registration.
- *
- */
-public interface DiscoveryPlugin {
-
-    /**
-     * Process service registration information. Plugins can change this information before it is published into the
-     * ZooKeeper discovery system.
-     *
-     * @param mutableProperties A map of service registration properties. The map is mutable and any changes to the map
-     * will be reflected in the ZooKeeper registration.
-     * @param endpointKey The key under which the service is registered in ZooKeeper. This key typically has the
-     * following format: hostname#port##context. While the actual value of this key is not actually used by the
-     * system (people can use it as a hint to understand where the service is located), the value <i>must</i> be
-     * unique for all services of a given type.
-     * @return The <tt>endpointKey</tt> value to be used. If there is no need to change this simply return the value
-     * of the <tt>endpointKey</tt> parameter.
-     */
-    String process(Map<String, Object> mutableProperties, String endpointKey);
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/fa57fb2b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
index 6dc0a08..c3bf01f 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
@@ -66,7 +66,6 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
     private boolean closed;
     private final EndpointDescriptionParser endpointDescriptionParser;
 
-    private ServiceTracker<DiscoveryPlugin, DiscoveryPlugin> discoveryPluginTracker;
     private ServiceRegistration<?> listenerReg;
 
     public PublishingEndpointListener(ZooKeeper zk, BundleContext bctx) {
@@ -75,9 +74,6 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
     }
     
     public void start(BundleContext bctx) {
-        discoveryPluginTracker = new ServiceTracker<DiscoveryPlugin, DiscoveryPlugin>(bctx, 
-                DiscoveryPlugin.class, null);
-            discoveryPluginTracker.open();
         Dictionary<String, String> props = new Hashtable<String, String>();
         String uuid = bctx.getProperty(Constants.FRAMEWORK_UUID);
         props.put(EndpointEventListener.ENDPOINT_LISTENER_SCOPE, 
@@ -124,15 +120,6 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
         String endpointKey = getKey(endpoint);
         Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
 
-        // process plugins
-        Object[] plugins = discoveryPluginTracker.getServices();
-        if (plugins != null) {
-            for (Object plugin : plugins) {
-                if (plugin instanceof DiscoveryPlugin) {
-                    endpointKey = ((DiscoveryPlugin)plugin).process(props, endpointKey);
-                }
-            }
-        }
         LOG.info("Changing endpoint in zookeeper: {}", endpoint);
         for (String name : interfaces) {
             String path = Utils.getZooKeeperPath(name);
@@ -173,17 +160,6 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
                                                                   InterruptedException, IOException {
         Collection<String> interfaces = endpoint.getInterfaces();
         String endpointKey = getKey(endpoint);
-        Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
-
-        // process plugins
-        Object[] plugins = discoveryPluginTracker != null ? discoveryPluginTracker.getServices() : null;
-        if (plugins != null) {
-            for (Object plugin : plugins) {
-                if (plugin instanceof DiscoveryPlugin) {
-                    endpointKey = ((DiscoveryPlugin)plugin).process(props, endpointKey);
-                }
-            }
-        }
         LOG.info("Exporting endpoint to zookeeper: {}", endpoint);
         for (String name : interfaces) {
             String path = Utils.getZooKeeperPath(name);
@@ -283,8 +259,5 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
             }
             endpoints.clear();
         }
-        if (discoveryPluginTracker != null) {
-            discoveryPluginTracker.close();
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/fa57fb2b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
index fcdc9f7..b7debf6 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
@@ -20,27 +20,17 @@ package org.apache.aries.rsa.discovery.zookeeper.publish;
 
 import static org.easymock.EasyMock.expect;
 
-import java.io.ByteArrayOutputStream;
-import java.lang.reflect.Field;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.easymock.EasyMock;
-import org.easymock.IAnswer;
 import org.easymock.IMocksControl;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Constants;
-import org.osgi.framework.Filter;
-import org.osgi.framework.FrameworkUtil;
-import org.osgi.framework.InvalidSyntaxException;
-import org.osgi.framework.ServiceListener;
-import org.osgi.framework.ServiceReference;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.service.remoteserviceadmin.EndpointEvent;
 import org.osgi.service.remoteserviceadmin.RemoteConstants;
@@ -73,49 +63,6 @@ public class PublishingEndpointListenerTest extends TestCase {
         c.verify();
     }
 
-    public void testDiscoveryPlugin() throws Exception {
-        BundleContext ctx = EasyMock.createMock(BundleContext.class);
-        stubCreateFilter(ctx);
-        ctx.addServiceListener(EasyMock.isA(ServiceListener.class),
-                EasyMock.eq("(objectClass=" + DiscoveryPlugin.class.getName() + ")"));
-
-        ServiceReference<DiscoveryPlugin> sr1 = createAppendPlugin(ctx);
-        ServiceReference<DiscoveryPlugin> sr2 = createPropertyPlugin(ctx);
-
-        EasyMock.expect(ctx.getServiceReferences(DiscoveryPlugin.class.getName(), null))
-                .andReturn(new ServiceReference[]{sr1, sr2}).anyTimes();
-        EasyMock.replay(ctx);
-
-        EndpointDescription endpoint = createEndpoint();
-
-        Map<String, Object> expectedProps = new HashMap<String, Object>(endpoint.getProperties());
-        expectedProps.put("endpoint.id", "http://google.de:80/test/sub/appended");
-        expectedProps.put("foo", "bar");
-        expectedProps.put("service.imported", "true");
-
-        final ZooKeeper zk = EasyMock.createNiceMock(ZooKeeper.class);
-        String expectedFullPath = "/osgi/service_registry/org/foo/myClass/some.machine#9876##test";
-        
-        EndpointDescription epd = new EndpointDescription(expectedProps);
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        new EndpointDescriptionParser().writeEndpoint(epd, bos);
-        byte[] data = bos.toByteArray();
-        expectCreated(zk, expectedFullPath, EasyMock.aryEq(data));
-        EasyMock.replay(zk);
-
-        PublishingEndpointListener eli = new PublishingEndpointListener(zk, ctx);
-
-        List<EndpointDescription> endpoints = getEndpoints(eli);
-        assertEquals("Precondition", 0, endpoints.size());
-        eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null);
-        assertEquals(1, endpoints.size());
-
-        //TODO enable
-        //EasyMock.verify(zk);
-    }
-
-
-
     public void testClose() throws KeeperException, InterruptedException {
         IMocksControl c = EasyMock.createNiceControl();
         BundleContext ctx = c.createMock(BundleContext.class);
@@ -133,56 +80,6 @@ public class PublishingEndpointListenerTest extends TestCase {
         c.verify();
     }
 
-    @SuppressWarnings("unchecked")
-    private ServiceReference<DiscoveryPlugin> createAppendPlugin(BundleContext ctx) {
-        DiscoveryPlugin plugin1 = new DiscoveryPlugin() {
-            public String process(Map<String, Object> mutableProperties, String endpointKey) {
-                String eid = (String) mutableProperties.get("endpoint.id");
-                mutableProperties.put("endpoint.id", eid + "/appended");
-                return endpointKey;
-            }
-        };
-        ServiceReference<DiscoveryPlugin> sr1 = EasyMock.createMock(ServiceReference.class);
-        EasyMock.expect(ctx.getService(sr1)).andReturn(plugin1).anyTimes();
-        return sr1;
-    }
-
-    @SuppressWarnings("unchecked")
-    private ServiceReference<DiscoveryPlugin> createPropertyPlugin(BundleContext ctx) {
-        DiscoveryPlugin plugin2 = new DiscoveryPlugin() {
-            public String process(Map<String, Object> mutableProperties, String endpointKey) {
-                mutableProperties.put("foo", "bar");
-                return endpointKey.replaceAll("localhost", "some.machine");
-            }
-        };
-        ServiceReference<DiscoveryPlugin> sr2 = EasyMock.createMock(ServiceReference.class);
-        EasyMock.expect(ctx.getService(sr2)).andReturn(plugin2).anyTimes();
-        return sr2;
-    }
-
-    @SuppressWarnings("unchecked")
-    private List<EndpointDescription> getEndpoints(PublishingEndpointListener eli) throws Exception {
-        Field field = eli.getClass().getDeclaredField("endpoints");
-        field.setAccessible(true);
-        return (List<EndpointDescription>) field.get(eli);
-    }
-
-    private void stubCreateFilter(BundleContext ctx) throws InvalidSyntaxException {
-        EasyMock.expect(ctx.createFilter(EasyMock.isA(String.class))).andAnswer(new IAnswer<Filter>() {
-            public Filter answer() throws Throwable {
-                return FrameworkUtil.createFilter((String) EasyMock.getCurrentArguments()[0]);
-            }
-        }).anyTimes();
-    }
-
-    private void expectCreated(ZooKeeper zk, String path, byte[] dataMatcher) throws KeeperException, InterruptedException {
-        expect(zk.create(EasyMock.eq(path), 
-                         dataMatcher, 
-                         EasyMock.eq(Ids.OPEN_ACL_UNSAFE),
-                         EasyMock.eq(CreateMode.EPHEMERAL)))
-            .andReturn("");
-    }
-    
     private void expectCreated(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
         expect(zk.create(EasyMock.eq(path), 
                          (byte[])EasyMock.anyObject(), 


[7/9] aries-rsa git commit: Add Awaitility for tests

Posted by cs...@apache.org.
Add Awaitility for tests


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/20330373
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/20330373
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/20330373

Branch: refs/heads/master
Commit: 20330373407397e9bef73ebfcf4ca2f372a77e5b
Parents: ca922a4
Author: Christian Schneider <cs...@adobe.com>
Authored: Thu Feb 8 09:30:52 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100

----------------------------------------------------------------------
 parent/pom.xml | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/20330373/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 09e9cc7..af59c55 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -85,6 +85,11 @@
              <artifactId>shazamcrest</artifactId>
              <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <dependencyManagement>
@@ -154,6 +159,12 @@
                 <scope>test</scope>
             </dependency>
             <dependency>
+                <groupId>org.awaitility</groupId>
+                <artifactId>awaitility</artifactId>
+                <version>3.0.0</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
             <groupId>org.apache.zookeeper</groupId>
             <artifactId>zookeeper</artifactId>
             <version>${zookeeper.version}</version>


[5/9] aries-rsa git commit: [ARIES-1774] Switch to watching all endpoints in zookeeper to be able to pass tck

Posted by cs...@apache.org.
[ARIES-1774] Switch to watching all endpoints in zookeeper to be able to pass tck


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/ca922a42
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/ca922a42
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/ca922a42

Branch: refs/heads/master
Commit: ca922a42e0705150d39c6f6955d647f7b1c3ad39
Parents: f07ee8b
Author: Christian Schneider <cs...@adobe.com>
Authored: Thu Feb 8 09:30:22 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100

----------------------------------------------------------------------
 .../discovery/zookeeper/ZooKeeperDiscovery.java |   3 +-
 .../repository/ZookeeperEndpointRepository.java | 162 ++++++-----
 .../subscribe/EndpointListenerTracker.java      |  16 +-
 .../zookeeper/subscribe/InterfaceMonitor.java   | 266 -------------------
 .../subscribe/InterfaceMonitorManager.java      | 213 ++++-----------
 .../ZookeeperEndpointRepositoryTest.java        |  25 +-
 .../subscribe/InterfaceMonitorManagerTest.java  |  71 ++---
 .../subscribe/InterfaceMonitorTest.java         |  71 -----
 8 files changed, 198 insertions(+), 629 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
index d265a22..50d9598 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
@@ -98,7 +98,8 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
         repository = new ZookeeperEndpointRepository(zkClient);
         endpointListener = new PublishingEndpointListener(repository);
         endpointListener.start(bctx);
-        imManager = new InterfaceMonitorManager(bctx, zkClient);
+        imManager = new InterfaceMonitorManager(repository);
+        repository.addListener(imManager);
         endpointListenerTracker = new EndpointListenerTracker(bctx, imManager);
         endpointListenerTracker.open();
         started = true;

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
index 2349c45..c5c03a4 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
@@ -15,6 +15,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
+import org.apache.aries.rsa.discovery.zookeeper.subscribe.InterfaceMonitorManager;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -53,33 +54,11 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
         } catch (Exception e) {
             throw new IllegalStateException("Unable to create base path");
         }
-        // Not yet needed
-        //this.registerWatcher();
+        this.registerWatcher();
     }
 
-    private void registerWatcher() {
-        try {
-            List<String> children = zk.getChildren(ZookeeperEndpointRepository.PATH_PREFIX, this);
-            System.out.println(children);
-        } catch (KeeperException e) {
-            e.printStackTrace();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-    }
-    
-    protected void notifyListener(WatchedEvent wevent) {
-        EndpointDescription ep = read(wevent.getPath());
-        if (ep != null) {
-            int type = getEndpointEventType(wevent);
-            EndpointEvent event = new EndpointEvent(type, ep);
-            listener.endpointChanged(event, null);
-        }
-    }
-    
-    private int getEndpointEventType(WatchedEvent wevent) {
-        EventType type = wevent.getType();
-        return EndpointEvent.ADDED;
+    public void addListener(EndpointEventListener listener) {
+        this.listener = listener;
     }
 
     /**
@@ -89,23 +68,7 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
      * @return endpoint found in the node or null if no endpoint was found
      */
     public EndpointDescription read(String path) {
-        try {
-            Stat stat = zk.exists(path, false);
-            if (stat == null || stat.getDataLength() <= 0) {
-                return null;
-            }
-            byte[] data = zk.getData(path, false, null);
-            LOG.debug("Got data for node: {}", path);
-
-            EndpointDescription endpoint = parser.readEndpoint(new ByteArrayInputStream(data));
-            if (endpoint != null) {
-                return endpoint;
-            }
-            LOG.warn("No Discovery information found for node: {}", path);
-        } catch (Exception e) {
-            LOG.error("Problem getting EndpointDescription from node " + path, e);
-        }
-        return null;
+        return nodes.get(path);
     }
 
     public void add(EndpointDescription endpoint) throws URISyntaxException, KeeperException,
@@ -113,6 +76,8 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
         Collection<String> interfaces = endpoint.getInterfaces();
         String endpointKey = getKey(endpoint);
     
+        createEphemeralNode(ZookeeperEndpointRepository.getZooKeeperPath("") + endpointKey, getData(endpoint));
+        
         LOG.info("Exporting endpoint to zookeeper: {}", endpoint);
         for (String name : interfaces) {
             String path = ZookeeperEndpointRepository.getZooKeeperPath(name);
@@ -152,8 +117,42 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
         }
     }
     
-    public List<EndpointDescription> getAll() throws KeeperException, InterruptedException {
-        return null;
+    public Collection<EndpointDescription> getAll() {
+        return nodes.values();
+    }
+
+    /**
+     * Removes nulls and empty strings from the given list of strings.
+     *
+     * @param strings a list of strings (may be null)
+     * @return a new list containing the non-null and non-empty
+     *         elements of the original list in the same order
+     */
+    public static List<String> removeEmpty(List<String> strings) {
+        List<String> result = new ArrayList<String>();
+        if (strings == null) {
+            return result;
+        }
+        for (String s : strings) {
+            if (s != null && !s.isEmpty()) {
+                result.add(s);
+            }
+        }
+        return result;
+    }
+
+    public static String getZooKeeperPath(String name) {
+        return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/');
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+        LOG.info("Received event {}", event);
+        if (event.getType() == EventType.NodeDeleted) {
+            handleRemoved(event.getPath());
+            return;
+        }
+        watchRecursive(event.getPath());
     }
 
     @Override
@@ -161,6 +160,28 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
 
     }
 
+    private void registerWatcher() {
+        try {
+            watchRecursive(ZookeeperEndpointRepository.PATH_PREFIX);
+        } catch (Exception e) {
+            LOG.info(e.getMessage(), e);
+        }
+    }
+
+    private void watchRecursive(String path) {
+        LOG.info("Watching {}", path);
+        handleZNodeChanged(path);
+        try {
+            List<String> children = zk.getChildren(path, this);
+            for (String child : children) {
+                String childPath = (path.endsWith("/") ? path : path + "/") + child;
+                watchRecursive(childPath);
+            }
+        } catch (Exception e) {
+            LOG.info(e.getMessage(), e);
+        }
+    }
+
     private byte[] getData(EndpointDescription epd) {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         parser.writeEndpoint(epd, bos);
@@ -201,39 +222,38 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
         }
     }
 
-    /**
-     * Removes nulls and empty strings from the given string array.
-     *
-     * @param strings an array of strings
-     * @return a new array containing the non-null and non-empty
-     *         elements of the original array in the same order
-     */
-    public static List<String> removeEmpty(List<String> strings) {
-        List<String> result = new ArrayList<String>();
-        if (strings == null) {
-            return result;
-        }
-        for (String s : strings) {
-            if (s != null && !s.isEmpty()) {
-                result.add(s);
-            }
-        }
-        return result;
-    }
-
-    public static String getZooKeeperPath(String name) {
-        return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/');
-    }
-
     private static String getKey(EndpointDescription endpoint) throws URISyntaxException {
         URI uri = new URI(endpoint.getId());
         return new StringBuilder().append(uri.getHost()).append("#").append(uri.getPort())
             .append("#").append(uri.getPath().replace('/', '#')).toString();
     }
 
-    @Override
-    public void process(WatchedEvent event) {
-        
+    private void handleZNodeChanged(String path) {
+        try {
+            Stat stat = new Stat();
+            byte[] data = zk.getData(path, false, stat);
+            if (data == null || data.length == 0) {
+                return;
+            }
+            EndpointDescription endpoint = parser.readEndpoint(new ByteArrayInputStream(data));
+            if (endpoint != null) {
+               handleChanged(path, endpoint);
+            }
+        } catch (Exception e) {
+            LOG.info(e.getMessage(), e);
+        }
+    }
+
+    private void handleRemoved(String path) {
+        EndpointDescription endpoint = nodes.remove(path);
+        EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint);
+        listener.endpointChanged(event, null);
+    }
+
+    private void handleChanged(String path, EndpointDescription endpoint) {
+        EndpointDescription old = nodes.put(path, endpoint);
+        EndpointEvent event = new EndpointEvent(old == null ? EndpointEvent.ADDED : EndpointEvent.MODIFIED, endpoint);
+        listener.endpointChanged(event, null);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
index 0ed1097..d4805d0 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
@@ -51,21 +51,23 @@ public class EndpointListenerTracker extends ServiceTracker {
     }
 
     @Override
-    public Object addingService(ServiceReference endpointListener) {
-        imManager.addInterest(endpointListener);
-        return null;
+    public Object addingService(ServiceReference sref) {
+        Object epListener = super.addingService(sref);
+        imManager.addInterest(sref, epListener);
+        return epListener;
     }
 
     @Override
-    public void modifiedService(ServiceReference endpointListener, Object service) {
+    public void modifiedService(ServiceReference sref, Object epListener) {
         // called when an EndpointListener updates its service properties,
         // e.g. when its interest scope is expanded/reduced
-        imManager.addInterest(endpointListener);
+        imManager.addInterest(sref, epListener);
     }
 
     @Override
-    public void removedService(ServiceReference endpointListener, Object service) {
-        imManager.removeInterest(endpointListener);
+    public void removedService(ServiceReference sref, Object epListener) {
+        imManager.removeInterest(sref);
+        super.removedService(sref, epListener);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
deleted file mode 100644
index 2c90b3c..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.subscribe;
-
-import java.io.ByteArrayInputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
-import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
-import org.apache.zookeeper.AsyncCallback.StatCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointEvent;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Monitors ZooKeeper for changes in published endpoints.
- * <p>
- * Specifically, it monitors the node path associated with a given interface class,
- * whose data is a serialized version of an EndpointDescription, and notifies an
- * EndpointListener when changes are detected (which can then propagate the
- * notification to other EndpointListeners with a matching scope).
- * <p>
- * Note that the EndpointListener is used here as a decoupling interface for
- * convenience, and is not necessarily used according to its documented contract.
- */
-public class InterfaceMonitor implements Watcher, StatCallback {
-
-    private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitor.class);
-
-    private final String znode;
-    private final ZooKeeper zk;
-    private final EndpointEventListener endpointListener;
-    private final boolean recursive;
-    private volatile boolean closed;
-
-    // This map reference changes, so don't synchronize on it
-    private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>();
-
-    private EndpointDescriptionParser parser;
-
-    public InterfaceMonitor(ZooKeeper zk, String objClass, EndpointEventListener endpointListener, String scope) {
-        this.zk = zk;
-        this.znode = ZookeeperEndpointRepository.getZooKeeperPath(objClass);
-        this.recursive = objClass == null || objClass.isEmpty();
-        this.endpointListener = endpointListener;
-        this.parser = new EndpointDescriptionParser();
-        LOG.debug("Creating new InterfaceMonitor {} for scope [{}] and objectClass [{}]",
-                new Object[] {recursive ? "(recursive)" : "", scope, objClass});
-    }
-
-    /**
-     * Returns all endpoints that are currently known to this monitor.
-     *
-     * @return all endpoints that are currently known to this monitor
-     */
-    public synchronized List<EndpointDescription> getEndpoints() {
-        return new ArrayList<EndpointDescription>(nodes.values());
-    }
-
-    public void start() {
-        watch();
-    }
-
-    private void watch() {
-        LOG.debug("registering a ZooKeeper.exists({}) callback", znode);
-        zk.exists(znode, this, this, null);
-        zk.getData(znode, this, new DataCallback() {
-            
-            @Override
-            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-                processDelta();
-            }
-        }, null);
-    }
-
-    /**
-     * Zookeeper Watcher interface callback.
-     */
-    public void process(WatchedEvent event) {
-        LOG.debug("ZooKeeper watcher callback on node {} for event {}", znode, event);
-        processDelta();
-    }
-
-    /**
-     * Zookeeper StatCallback interface callback.
-     */
-    @SuppressWarnings("deprecation")
-    public void processResult(int rc, String path, Object ctx, Stat stat) {
-        LOG.debug("ZooKeeper callback on node: {} code: {}", znode, rc);
-
-        switch (rc) {
-        case Code.Ok:
-        case Code.NoNode:
-            processDelta();
-            return;
-
-        case Code.SessionExpired:
-        case Code.NoAuth:
-        case Code.ConnectionLoss:
-            return;
-
-        default:
-            watch();
-        }
-    }
-
-    private void processDelta() {
-        if (closed) {
-            return;
-        }
-
-        if (zk.getState() != ZooKeeper.States.CONNECTED) {
-            LOG.debug("ZooKeeper connection was already closed! Not processing changed event.");
-            return;
-        }
-
-        try {
-            if (zk.exists(znode, false) != null) {
-                zk.getChildren(znode, this);
-                refreshNodes();
-            } else {
-                LOG.debug("znode {} doesn't exist -> not processing any changes", znode);
-            }
-        } catch (Exception e) {
-            if (zk.getState() != ZooKeeper.States.CONNECTED) {
-                LOG.debug("Error getting Zookeeper data: " + e); // e.g. session expired, handled by ZooKeeperDiscovery
-            } else {
-                LOG.error("Error getting ZooKeeper data.", e);
-            }
-        }
-    }
-
-    public synchronized void close() {
-        closed = true;
-        for (EndpointDescription endpoint : nodes.values()) {
-            EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint);
-            endpointListener.endpointChanged(event, null);
-        }
-        nodes.clear();
-    }
-
-    private synchronized void refreshNodes() {
-        if (closed) {
-            return;
-        }
-        LOG.info("Processing change on node: {}", znode);
-
-        Map<String, EndpointDescription> newNodes = new HashMap<String, EndpointDescription>();
-        Map<String, EndpointDescription> prevNodes = new HashMap<String, EndpointDescription>(nodes);
-        processChildren(znode, newNodes, prevNodes);
-
-        // whatever is left in prevNodes now has been removed from Discovery
-        LOG.debug("processChildren done. Nodes that are missing now and need to be removed: {}", prevNodes.values());
-        for (EndpointDescription endpoint : prevNodes.values()) {
-            EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint);
-            endpointListener.endpointChanged(event, null);
-        }
-        nodes = newNodes;
-    }
-
-    /**
-     * Iterates through all child nodes of the given node and tries to find
-     * endpoints. If the recursive flag is set it also traverses into the child
-     * nodes.
-     *
-     * @return true if an endpoint was found and if the node therefore needs to
-     *         be monitored for changes
-     */
-    private boolean processChildren(String zn, Map<String, EndpointDescription> newNodes,
-            Map<String, EndpointDescription> prevNodes) {
-        List<String> children;
-        try {
-            LOG.debug("Processing the children of {}", zn);
-            children = zk.getChildren(zn, false);
-
-            boolean foundANode = false;
-            for (String child : children) {
-                String childZNode = zn + '/' + child;
-                EndpointDescription endpoint = getEndpointDescriptionFromNode(childZNode);
-                if (endpoint != null) {
-                    EndpointDescription prevEndpoint = prevNodes.get(child);
-                    
-                    newNodes.put(child, endpoint);
-                    prevNodes.remove(child);
-                    foundANode = true;
-                    LOG.debug("Properties: {}", endpoint.getProperties());
-                    if (prevEndpoint == null) {
-                        // This guy is new
-                        LOG.info("found new node " + zn + "/[" + child + "]   ( []->child )  props: "
-                                + endpoint.getProperties().values());
-                        EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint);
-                        endpointListener.endpointChanged(event, null);
-                    } else if (!prevEndpoint.getProperties().equals(endpoint.getProperties())) {
-                        LOG.info("Found changed node " + zn + "/[" + child + "]   ( []->child )  props: "
-                                + endpoint.getProperties().values());
-                        EndpointEvent event = new EndpointEvent(EndpointEvent.MODIFIED, endpoint);
-                        endpointListener.endpointChanged(event, null);
-                    }
-                }
-                if (recursive && processChildren(childZNode, newNodes, prevNodes)) {
-                    zk.getChildren(childZNode, this);
-                }
-            }
-
-            return foundANode;
-        } catch (KeeperException e) {
-            LOG.error("Problem processing ZooKeeper node", e);
-        } catch (InterruptedException e) {
-            LOG.error("Problem processing ZooKeeper node", e);
-        }
-        return false;
-    }
-
-    /**
-     * Retrieves data from the given node and parses it into an EndpointDescription.
-     *
-     * @param node a node path
-     * @return endpoint found in the node or null if no endpoint was found
-     */
-    private EndpointDescription getEndpointDescriptionFromNode(String node) {
-        try {
-            Stat stat = zk.exists(node, false);
-            if (stat == null || stat.getDataLength() <= 0) {
-                return null;
-            }
-            byte[] data = zk.getData(node, false, null);
-            LOG.debug("Got data for node: {}", node);
-
-            EndpointDescription endpoint = parser.readEndpoint(new ByteArrayInputStream(data));
-            if (endpoint != null) {
-                return endpoint;
-            }
-            LOG.warn("No Discovery information found for node: {}", node);
-        } catch (Exception e) {
-            LOG.error("Problem getting EndpointDescription from node " + node, e);
-        }
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
index 0aa98b3..7b5c7d2 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
@@ -18,23 +18,13 @@
  */
 package org.apache.aries.rsa.discovery.zookeeper.subscribe;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Dictionary;
 import java.util.HashMap;
-import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
 import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
 import org.apache.aries.rsa.util.StringPlus;
-import org.apache.zookeeper.ZooKeeper;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Filter;
 import org.osgi.framework.ServiceReference;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.service.remoteserviceadmin.EndpointEvent;
@@ -50,165 +40,95 @@ import org.slf4j.LoggerFactory;
  * These events are then forwarded to all interested EndpointEventListeners.
  */
 @SuppressWarnings({"deprecation", "rawtypes"})
-public class InterfaceMonitorManager {
+public class InterfaceMonitorManager implements EndpointEventListener {
     private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitorManager.class);
-    private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(".*\\(objectClass=([^)]+)\\).*");
 
-    private final BundleContext bctx;
-    private final ZooKeeper zk;
-    // map of EndpointEventListeners and the scopes they are interested in
-    private final Map<ServiceReference, List<String>> epListenerScopes =
-            new HashMap<ServiceReference, List<String>>();
-    // map of scopes and their interest data
-    private final Map<String, Interest> interests = new HashMap<String, Interest>();
+    private final ZookeeperEndpointRepository repository;
+    private final Map<ServiceReference, Interest> interests = new HashMap<ServiceReference, Interest>();
 
     protected static class Interest {
-        List<ServiceReference> epListeners = 
-            new CopyOnWriteArrayList<ServiceReference>();
-        InterfaceMonitor monitor;
+        List<String> scopes;
+        Object epListener;
     }
 
-    public InterfaceMonitorManager(BundleContext bctx, ZooKeeper zk) {
-        this.bctx = bctx;
-        this.zk = zk;
+    public InterfaceMonitorManager(ZookeeperEndpointRepository repository) {
+        this.repository = repository;
     }
 
-    public void addInterest(ServiceReference<?> eplistener) {
-        if (isOurOwnEndpointEventListener(eplistener)) {
+    public void addInterest(ServiceReference<?> sref, Object epListener) {
+        if (isOurOwnEndpointEventListener(sref)) {
             LOG.debug("Skipping our own EndpointEventListener");
             return;
         }
-        List<String> scopes = getScopes(eplistener);
+        List<String> scopes = getScopes(sref);
         LOG.debug("adding Interests: {}", scopes);
         
-        for (String scope : scopes) {
-            String objClass = getObjectClass(scope);
-            addInterest(eplistener, scope, objClass);
-        }
-    }
-
-    private static boolean isOurOwnEndpointEventListener(ServiceReference<?> EndpointEventListener) {
-        return Boolean.parseBoolean(String.valueOf(
-                EndpointEventListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)));
-    }
-
-    public synchronized void addInterest(ServiceReference epListener, 
-                                         String scope, String objClass) {
         // get or create interest for given scope and add listener to it
-        Interest interest = interests.get(scope);
+        Interest interest = interests.get(sref);
         if (interest == null) {
             // create interest, add listener and start monitor
             interest = new Interest();
-            interests.put(scope, interest);
-            interest.epListeners.add(epListener); // add it before monitor starts so we don't miss events
-            interest.monitor = createInterfaceMonitor(scope, objClass, interest);
-            interest.monitor.start();
-        } else {
-            // interest already exists, so just add listener to it
-            if (!interest.epListeners.contains(epListener)) {
-                interest.epListeners.add(epListener);
-            }
-            // notify listener of all known endpoints for given scope
-            // (as EndpointEventListener contract requires of all added/modified listeners)
-            for (EndpointDescription endpoint : interest.monitor.getEndpoints()) {
-                EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint);
-                notifyListeners(event, scope, Arrays.asList(epListener));
-            }
-        }
-
-        // add scope to listener's scopes list
-        List<String> scopes = epListenerScopes.get(epListener);
-        if (scopes == null) {
-            scopes = new ArrayList<String>(1);
-            epListenerScopes.put(epListener, scopes);
-        }
-        if (!scopes.contains(scope)) {
-            scopes.add(scope);
+            interest.epListener = epListener;
+            interest.scopes = scopes;
+            interests.put(sref, interest);
+            sendExistingEndpoints(scopes, epListener);
         }
     }
 
 
-    public synchronized void removeInterest(ServiceReference<EndpointEventListener> EndpointEventListener) {
-        LOG.info("removing EndpointEventListener interests: {}", EndpointEventListener);
-        List<String> scopes = epListenerScopes.get(EndpointEventListener);
-        if (scopes == null) {
-            return;
+    private void sendExistingEndpoints(List<String> scopes, Object epListener) {
+        for (EndpointDescription endpoint : repository.getAll()) {
+            EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint);
+            notifyListener(event, scopes, epListener);
         }
+    }
 
-        for (String scope : scopes) {
-            Interest interest = interests.get(scope);
-            if (interest != null) {
-                interest.epListeners.remove(EndpointEventListener);
-                if (interest.epListeners.isEmpty()) {
-                    interest.monitor.close();
-                    interests.remove(scope);
-                }
-            }
-        }
-        epListenerScopes.remove(EndpointEventListener);
+    private static boolean isOurOwnEndpointEventListener(ServiceReference<?> EndpointEventListener) {
+        return Boolean.parseBoolean(String.valueOf(
+                EndpointEventListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)));
     }
 
-    protected InterfaceMonitor createInterfaceMonitor(final String scope, String objClass, final Interest interest) {
-        // holding this object's lock in the callbacks can lead to a deadlock with InterfaceMonitor
-        EndpointEventListener listener = new EndpointEventListener() {
+    public synchronized void removeInterest(ServiceReference<EndpointEventListener> epListenerRef) {
+        LOG.info("removing EndpointEventListener interests: {}", epListenerRef);
+        interests.remove(epListenerRef);
+    }
 
-            @Override
-            public void endpointChanged(EndpointEvent event, String filter) {
-                notifyListeners(event, scope, interest.epListeners);
-            }
-        };
-        return new InterfaceMonitor(zk, objClass, listener, scope);
+    @Override
+    public void endpointChanged(EndpointEvent event, String filter) {
+        for (Interest interest : interests.values()) {
+            notifyListener(event, interest.scopes, interest.epListener);
+        }
     }
 
-    private void notifyListeners(EndpointEvent event, String currentScope,
-            List<ServiceReference> epListeners) {
+    private void notifyListener(EndpointEvent event, List<String> scopes, Object service) {
         EndpointDescription endpoint = event.getEndpoint();
-        for (ServiceReference<?> epListenerRef : epListeners) {
-            if (epListenerRef.getBundle() == null) {
-                LOG.info("listening service was unregistered, ignoring");
-            }
-            Object service = bctx.getService(epListenerRef);
-            LOG.trace("matching {} against {}", endpoint, currentScope);
-            if (matchFilter(bctx, currentScope, endpoint)) {
-                LOG.debug("Matched {} against {}", endpoint, currentScope);
-            try {
-                if (service instanceof EndpointEventListener) {
-                    EndpointEventListener epeListener = (EndpointEventListener)service;
-                    notifyListener(event, currentScope, epeListener);
-                } else if (service instanceof EndpointListener) {
-                    EndpointListener epListener = (EndpointListener)service;
-                    notifyListener(event, currentScope, epListener);
-                }
-            } finally {
-                if (service != null) {
-                    bctx.ungetService(epListenerRef);
-                }
-            }
-            }
+        String currentScope = getFirstMatch(scopes, endpoint);
+        if (currentScope == null) {
+            return;
         }
-    }
-    
-    private static boolean matchFilter(BundleContext bctx, String filter, EndpointDescription endpoint) {
-        if (filter == null) {
-            return false;
+        LOG.debug("Matched {} against {}", endpoint, currentScope);
+        if (service instanceof EndpointEventListener) {
+            notifyEEListener(event, currentScope, (EndpointEventListener)service);
+        } else if (service instanceof EndpointListener) {
+            notifyEListener(event, currentScope, (EndpointListener)service);
         }
+    }
     
-        try {
-            Filter f = bctx.createFilter(filter);
-            Dictionary<String, Object> dict = new Hashtable<String, Object>(endpoint.getProperties());
-            return f.match(dict);
-        } catch (Exception e) {
-            return false;
+    private String getFirstMatch(List<String> scopes, EndpointDescription endpoint) {
+        for (String scope : scopes) {
+            if (endpoint.matches(scope)) {
+                return scope;
+            }
         }
+        return null;
     }
 
-
-    private void notifyListener(EndpointEvent event, String currentScope, EndpointEventListener listener) {
+    private void notifyEEListener(EndpointEvent event, String currentScope, EndpointEventListener listener) {
         EndpointDescription endpoint = event.getEndpoint();
         LOG.info("Calling endpointchanged on class {} for filter {}, type {}, endpoint {} ", listener, currentScope, endpoint);
         listener.endpointChanged(event, currentScope);
     }
     
-    private void notifyListener(EndpointEvent event, String currentScope, EndpointListener listener) {
+    private void notifyEListener(EndpointEvent event, String currentScope, EndpointListener listener) {
         EndpointDescription endpoint = event.getEndpoint();
         LOG.info("Calling old listener on class {} for filter {}, type {}, endpoint {} ", listener, currentScope, endpoint);
         switch (event.getType()) {
@@ -228,49 +148,18 @@ public class InterfaceMonitorManager {
     }
 
     public synchronized void close() {
-        for (Interest interest : interests.values()) {
-            interest.monitor.close();
-        }
         interests.clear();
-        epListenerScopes.clear();
     }
 
     /**
      * Only for test case!
      */
-    protected synchronized Map<String, Interest> getInterests() {
+    protected synchronized Map<ServiceReference, Interest> getInterests() {
         return interests;
     }
 
-    /**
-     * Only for test case!
-     */
-    protected synchronized Map<ServiceReference, List<String>> getEndpointListenerScopes() {
-        return epListenerScopes;
-    }
-
     protected List<String> getScopes(ServiceReference<?> sref) {
         return StringPlus.normalize(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE));
     }
-    
-    public static String getObjectClass(String scope) {
-        Matcher m = OBJECTCLASS_PATTERN.matcher(scope);
-        return m.matches() ? m.group(1) : null;
-    }
 
-    /**
-     * Returns a service's properties as a map.
-     *
-     * @param serviceReference a service reference
-     * @return the service's properties as a map
-     */
-    public static Map<String, Object> getProperties(ServiceReference<?> serviceReference) {
-        String[] keys = serviceReference.getPropertyKeys();
-        Map<String, Object> props = new HashMap<String, Object>(keys.length);
-        for (String key : keys) {
-            Object val = serviceReference.getProperty(key);
-            props.put(key, val);
-        }
-        return props;
-    }
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
index 3a20f5a..d9f23e6 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
@@ -1,15 +1,22 @@
 package org.apache.aries.rsa.discovery.zookeeper.repository;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.samePropertyValuesAs;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -32,6 +39,7 @@ public class ZookeeperEndpointRepositoryTest {
     private ZooKeeperServer server;
     private ZooKeeper zk;
     private ServerCnxnFactory factory;
+    private List<EndpointEvent> events = new ArrayList<>();
 
     @Before
     public void before() throws IOException, InterruptedException, KeeperException {
@@ -62,25 +70,38 @@ public class ZookeeperEndpointRepositoryTest {
 
     @After
     public void after() throws InterruptedException {
-        zk.close();
+        //zk.close(); // Seems to cause SessionTimeout error 
         factory.shutdown();
     }
 
     @Test
     public void test() throws IOException, URISyntaxException, KeeperException, InterruptedException {
+        final Semaphore sem = new Semaphore(0);
         EndpointEventListener listener = new EndpointEventListener() {
             
             @Override
             public void endpointChanged(EndpointEvent event, String filter) {
-                System.out.println(event);
+                events.add(event);
+                sem.release();
             }
         };
         ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk, listener);
+        
         EndpointDescription endpoint = createEndpoint();
         repository.add(endpoint);
         
+        assertThat(sem.tryAcquire(10, TimeUnit.SECONDS), equalTo(true));
+
         String path = "/osgi/service_registry/java/lang/Runnable/test.de#-1##service1";
         EndpointDescription ep2 = repository.read(path);
+        assertNotNull(ep2);
+
+        repository.remove(endpoint);
+
+        assertThat(sem.tryAcquire(10, TimeUnit.SECONDS), equalTo(true));
+        assertThat(events.get(0), samePropertyValuesAs(new EndpointEvent(EndpointEvent.ADDED, endpoint)));
+        assertThat(events.get(1), samePropertyValuesAs(new EndpointEvent(EndpointEvent.REMOVED, endpoint)));
+        
         repository.close();
     }
     

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java
index 84eca09..41b0795 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java
@@ -18,95 +18,68 @@
  */
 package org.apache.aries.rsa.discovery.zookeeper.subscribe;
 
-import static org.easymock.EasyMock.getCurrentArguments;
+import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 
-import java.util.Collections;
-import java.util.Dictionary;
-import java.util.Hashtable;
+import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
 import org.easymock.EasyMock;
-import org.easymock.IAnswer;
 import org.easymock.IMocksControl;
 import org.junit.Test;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.service.remoteserviceadmin.EndpointEventListener;
 
 public class InterfaceMonitorManagerTest {
 
     @Test
     public void testEndpointListenerTrackerCustomizer() {
-        IMocksControl c = EasyMock.createNiceControl();
-        BundleContext ctx = c.createMock(BundleContext.class);
-        ServiceReference<EndpointEventListener> sref = createService(c, "(objectClass=mine)", "mine");
-        ServiceReference<EndpointEventListener> sref2 = createService(c, "(objectClass=mine)", "mine");
-        ZooKeeper zk = c.createMock(ZooKeeper.class);
-        InterfaceMonitorManager eltc = new InterfaceMonitorManager(ctx, zk);
+        IMocksControl c = EasyMock.createControl();
+        ServiceReference<EndpointEventListener> sref = createService(c, "(objectClass=mine)");
+        ServiceReference<EndpointEventListener> sref2 = createService(c, "(objectClass=mine)");
+        ZookeeperEndpointRepository repository = c.createMock(ZookeeperEndpointRepository.class);
+        List<EndpointDescription> endpoints = new ArrayList<>();
+        expect(repository.getAll()).andReturn(endpoints).atLeastOnce();
+        EndpointEventListener epListener1 = c.createMock(EndpointEventListener.class); 
+        EndpointEventListener epListener2 = c.createMock(EndpointEventListener.class); 
 
         c.replay();
 
+        InterfaceMonitorManager eltc = new InterfaceMonitorManager(repository);
         // sref has no scope -> nothing should happen
-        assertEquals(0, eltc.getEndpointListenerScopes().size());
         assertEquals(0, eltc.getInterests().size());
 
-        eltc.addInterest(sref);
-        assertScopeIncludes(sref, eltc);
-        assertEquals(1, eltc.getEndpointListenerScopes().size());
-        assertEquals(1, eltc.getInterests().size());
 
-        eltc.addInterest(sref);
-        assertScopeIncludes(sref, eltc);
-        assertEquals(1, eltc.getEndpointListenerScopes().size());
+        eltc.addInterest(sref, epListener1);
         assertEquals(1, eltc.getInterests().size());
 
-        eltc.addInterest(sref2);
-        assertScopeIncludes(sref, eltc);
-        assertScopeIncludes(sref2, eltc);
-        assertEquals(2, eltc.getEndpointListenerScopes().size());
+        eltc.addInterest(sref, epListener1);
         assertEquals(1, eltc.getInterests().size());
 
+        eltc.addInterest(sref2, epListener2);
+        assertEquals(2, eltc.getInterests().size());
+
         eltc.removeInterest(sref);
-        assertScopeIncludes(sref2, eltc);
-        assertEquals(1, eltc.getEndpointListenerScopes().size());
         assertEquals(1, eltc.getInterests().size());
 
         eltc.removeInterest(sref);
-        assertScopeIncludes(sref2, eltc);
-        assertEquals(1, eltc.getEndpointListenerScopes().size());
         assertEquals(1, eltc.getInterests().size());
 
         eltc.removeInterest(sref2);
-        assertEquals(0, eltc.getEndpointListenerScopes().size());
         assertEquals(0, eltc.getInterests().size());
 
         c.verify();
     }
 
     @SuppressWarnings("unchecked")
-    private ServiceReference<EndpointEventListener> createService(IMocksControl c, String scope, String objectClass) {
+    private ServiceReference<EndpointEventListener> createService(IMocksControl c, String scope) {
         ServiceReference<EndpointEventListener> sref = c.createMock(ServiceReference.class);
-        final Dictionary<String, String> props = new Hashtable<>();
-        props.put(EndpointEventListener.ENDPOINT_LISTENER_SCOPE, scope);
-        props.put(Constants.OBJECTCLASS, objectClass);
-        String[] keys = Collections.list(props.keys()).toArray(new String[]{});
-        EasyMock.expect(sref.getPropertyKeys()).andReturn(keys).anyTimes();
-        EasyMock.expect(sref.getProperty((String)EasyMock.anyObject())).andAnswer(new IAnswer<Object>() {
-            public Object answer() throws Throwable {
-                return props.get(getCurrentArguments()[0]);
-            }
-        }).anyTimes();
+        expect(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)).andReturn(scope).atLeastOnce();
+        expect(sref.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)).andReturn(null).atLeastOnce();
         return sref;
     }
 
-    private void assertScopeIncludes(ServiceReference<EndpointEventListener> sref, InterfaceMonitorManager imm) {
-        List<String> srefScope = imm.getEndpointListenerScopes().get(sref);
-        assertEquals(1, srefScope.size());
-        assertEquals("(objectClass=mine)", srefScope.get(0));
-        
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
deleted file mode 100644
index e09cfbf..0000000
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.subscribe;
-
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-
-import java.util.Collections;
-
-import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.AsyncCallback.DataCallback;
-import org.apache.zookeeper.data.Stat;
-import org.easymock.EasyMock;
-import org.easymock.IMocksControl;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-
-import junit.framework.TestCase;
-
-public class InterfaceMonitorTest extends TestCase {
-
-    public void testInterfaceMonitor() throws KeeperException, InterruptedException {
-        IMocksControl c = EasyMock.createControl();
-
-        ZooKeeper zk = c.createMock(ZooKeeper.class);
-        expect(zk.getState()).andReturn(ZooKeeper.States.CONNECTED).anyTimes();
-
-        String scope = "(myProp=test)";
-        String interf = "es.schaaf.test";
-        String node = ZookeeperEndpointRepository.getZooKeeperPath(interf);
-
-        EndpointEventListener endpointListener = c.createMock(EndpointEventListener.class);
-        InterfaceMonitor im = new InterfaceMonitor(zk, interf, endpointListener, scope);
-        zk.exists(eq(node), eq(im), eq(im), EasyMock.anyObject());
-        EasyMock.expectLastCall().once();
-        zk.getData(eq(node), eq(im), EasyMock.anyObject(DataCallback.class), EasyMock.anyObject());
-        expectLastCall();
-
-        expect(zk.exists(eq(node), eq(false))).andReturn(new Stat()).anyTimes();
-        expect(zk.getChildren(eq(node), eq(false))).andReturn(Collections.<String> emptyList()).once();
-        expect(zk.getChildren(eq(node), eq(im))).andReturn(Collections.<String> emptyList()).once();
-
-        c.replay();
-        im.start();
-        // simulate a zk callback
-        WatchedEvent we = new WatchedEvent(EventType.NodeCreated, KeeperState.SyncConnected, node);
-        im.process(we);
-        c.verify();
-    }
-}


[4/9] aries-rsa git commit: [ARIES-1774] Remove PublishingEndpointListenerFactory

Posted by cs...@apache.org.
[ARIES-1774] Remove PublishingEndpointListenerFactory


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/ed5adf8f
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/ed5adf8f
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/ed5adf8f

Branch: refs/heads/master
Commit: ed5adf8fc57ea2f10d0d73d97a47bdaa013b1b2f
Parents: 44943bc
Author: Christian Schneider <cs...@adobe.com>
Authored: Wed Feb 7 10:45:30 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100

----------------------------------------------------------------------
 .../discovery/zookeeper/ZooKeeperDiscovery.java |  13 ++-
 .../publish/PublishingEndpointListener.java     |  42 +++++--
 .../PublishingEndpointListenerFactory.java      | 110 -------------------
 .../PublishingEndpointListenerFactoryTest.java  | 103 -----------------
 4 files changed, 41 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ed5adf8f/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
index 13dadad..c8b020d 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
@@ -25,7 +25,7 @@ import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.aries.rsa.discovery.zookeeper.publish.PublishingEndpointListenerFactory;
+import org.apache.aries.rsa.discovery.zookeeper.publish.PublishingEndpointListener;
 import org.apache.aries.rsa.discovery.zookeeper.subscribe.EndpointListenerTracker;
 import org.apache.aries.rsa.discovery.zookeeper.subscribe.InterfaceMonitorManager;
 import org.apache.zookeeper.WatchedEvent;
@@ -46,7 +46,7 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
 
     private final BundleContext bctx;
 
-    private PublishingEndpointListenerFactory endpointListenerFactory;
+    private PublishingEndpointListener endpointListener;
     private ServiceTracker<?, ?> endpointListenerTracker;
     private InterfaceMonitorManager imManager;
     private ZooKeeper zkClient;
@@ -92,8 +92,8 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
             return;
         }
         LOG.debug("starting ZookeeperDiscovery");
-        endpointListenerFactory = new PublishingEndpointListenerFactory(zkClient, bctx);
-        endpointListenerFactory.start();
+        endpointListener = new PublishingEndpointListener(zkClient, bctx);
+        endpointListener.start(bctx);
         imManager = new InterfaceMonitorManager(bctx, zkClient);
         endpointListenerTracker = new EndpointListenerTracker(bctx, imManager);
         endpointListenerTracker.open();
@@ -106,8 +106,9 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
         }
         started = false;
         closed |= close;
-        if (endpointListenerFactory != null) {
-            endpointListenerFactory.stop();
+        if (endpointListener != null) {
+            endpointListener.stop();
+            endpointListener.close();
         }
         if (endpointListenerTracker != null) {
             endpointListenerTracker.close();

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ed5adf8f/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
index d5fe7a6..6dc0a08 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
@@ -26,11 +26,14 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Dictionary;
 import java.util.HashMap;
+import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
+import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
 import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -38,12 +41,14 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
 import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.service.remoteserviceadmin.EndpointEvent;
 import org.osgi.service.remoteserviceadmin.EndpointEventListener;
 import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
 import org.osgi.util.tracker.ServiceTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,19 +62,38 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
     private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class);
 
     private final ZooKeeper zk;
-    private final ServiceTracker<DiscoveryPlugin, DiscoveryPlugin> discoveryPluginTracker;
     private final List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>();
     private boolean closed;
-
     private final EndpointDescriptionParser endpointDescriptionParser;
 
+    private ServiceTracker<DiscoveryPlugin, DiscoveryPlugin> discoveryPluginTracker;
+    private ServiceRegistration<?> listenerReg;
+
     public PublishingEndpointListener(ZooKeeper zk, BundleContext bctx) {
         this.zk = zk;
-        discoveryPluginTracker = new ServiceTracker<DiscoveryPlugin, DiscoveryPlugin>(bctx, 
-            DiscoveryPlugin.class, null);
-        discoveryPluginTracker.open();
         endpointDescriptionParser = new EndpointDescriptionParser();
     }
+    
+    public void start(BundleContext bctx) {
+        discoveryPluginTracker = new ServiceTracker<DiscoveryPlugin, DiscoveryPlugin>(bctx, 
+                DiscoveryPlugin.class, null);
+            discoveryPluginTracker.open();
+        Dictionary<String, String> props = new Hashtable<String, String>();
+        String uuid = bctx.getProperty(Constants.FRAMEWORK_UUID);
+        props.put(EndpointEventListener.ENDPOINT_LISTENER_SCOPE, 
+                  String.format("(&(%s=*)(%s=%s))", Constants.OBJECTCLASS, 
+                                RemoteConstants.ENDPOINT_FRAMEWORK_UUID, uuid));
+        props.put(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID, "true");
+        String[] ifAr = {EndpointEventListener.class.getName(), EndpointListener.class.getName()};
+        listenerReg = bctx.registerService(ifAr, this, props);
+    }
+    
+    public void stop() {
+        if (listenerReg != null) {
+            listenerReg.unregister();
+            listenerReg = null;
+        }
+    }
 
     @Override
     public void endpointChanged(EndpointEvent event, String filter) {
@@ -152,7 +176,7 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
         Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
 
         // process plugins
-        Object[] plugins = discoveryPluginTracker.getServices();
+        Object[] plugins = discoveryPluginTracker != null ? discoveryPluginTracker.getServices() : null;
         if (plugins != null) {
             for (Object plugin : plugins) {
                 if (plugin instanceof DiscoveryPlugin) {
@@ -259,6 +283,8 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
             }
             endpoints.clear();
         }
-        discoveryPluginTracker.close();
+        if (discoveryPluginTracker != null) {
+            discoveryPluginTracker.close();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ed5adf8f/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
deleted file mode 100644
index 444f7bb..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.publish;
-
-import java.util.ArrayList;
-import java.util.Dictionary;
-import java.util.Hashtable;
-import java.util.List;
-
-import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.zookeeper.ZooKeeper;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceFactory;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.service.remoteserviceadmin.RemoteConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Creates local EndpointListeners that publish to ZooKeeper.
- */
-@SuppressWarnings("deprecation")
-public class PublishingEndpointListenerFactory implements ServiceFactory<PublishingEndpointListener> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListenerFactory.class);
-
-    private final BundleContext bctx;
-    private final ZooKeeper zk;
-    private final List<PublishingEndpointListener> listeners = new ArrayList<PublishingEndpointListener>();
-    private ServiceRegistration<?> serviceRegistration;
-
-    public PublishingEndpointListenerFactory(ZooKeeper zk, BundleContext bctx) {
-        this.bctx = bctx;
-        this.zk = zk;
-    }
-
-    public PublishingEndpointListener getService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr) {
-        LOG.debug("new EndpointListener from factory");
-        synchronized (listeners) {
-            PublishingEndpointListener pel = new PublishingEndpointListener(zk, bctx);
-            listeners.add(pel);
-            return pel;
-        }
-    }
-
-    public void ungetService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr, 
-                             PublishingEndpointListener pel) {
-        LOG.debug("remove EndpointListener");
-        synchronized (listeners) {
-            if (listeners.remove(pel)) {
-                pel.close();
-            }
-        }
-    }
-
-    public synchronized void start() {
-        Dictionary<String, String> props = new Hashtable<String, String>();
-        String uuid = bctx.getProperty(Constants.FRAMEWORK_UUID);
-        props.put(EndpointEventListener.ENDPOINT_LISTENER_SCOPE, 
-                  String.format("(&(%s=*)(%s=%s))", Constants.OBJECTCLASS, 
-                                RemoteConstants.ENDPOINT_FRAMEWORK_UUID, uuid));
-        props.put(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID, "true");
-        String[] ifAr = {EndpointEventListener.class.getName(), EndpointListener.class.getName()};
-        serviceRegistration = bctx.registerService(ifAr, this, props);
-    }
-    
-    public synchronized void stop() {
-        if (serviceRegistration != null) {
-            serviceRegistration.unregister();
-            serviceRegistration = null;
-        }
-        synchronized (listeners) {
-            for (PublishingEndpointListener pel : listeners) {
-                pel.close();
-            }
-            listeners.clear();
-        }
-    }
-
-    /**
-     * Only for the test case!
-     * 
-     * @return 
-     */
-    protected List<PublishingEndpointListener> getListeners() {
-        synchronized (listeners) {
-            return listeners;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ed5adf8f/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java
deleted file mode 100644
index 381ec9d..0000000
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.publish;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Dictionary;
-import java.util.List;
-
-import org.apache.zookeeper.ZooKeeper;
-import org.easymock.EasyMock;
-import org.easymock.IMocksControl;
-import org.junit.Before;
-import org.junit.Test;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-
-@SuppressWarnings("deprecation")
-public class PublishingEndpointListenerFactoryTest {
-
-    private IMocksControl c;
-    private BundleContext ctx;
-    private ZooKeeper zk;
-
-    @Before
-    public void before() {
-        c = EasyMock.createNiceControl();
-        zk = c.createMock(ZooKeeper.class);
-        ctx = createBundleContext();
-    }
-    
-    @Test
-    public void testScope() {
-        PublishingEndpointListenerFactory eplf = new PublishingEndpointListenerFactory(zk, ctx);
-
-        c.replay();
-        eplf.start();
-        c.verify();
-
-    }
-
-    @Test
-    public void testServiceFactory() {
-        PublishingEndpointListenerFactory eplf = new PublishingEndpointListenerFactory(zk, ctx);
-
-        PublishingEndpointListener eli = c.createMock(PublishingEndpointListener.class);
-        eli.close();
-        EasyMock.expectLastCall().once();
-
-        c.replay();
-        eplf.start();
-
-        PublishingEndpointListener service = eplf.getService(null, null);
-        assertTrue(service instanceof EndpointEventListener);
-
-        List<PublishingEndpointListener> listeners = eplf.getListeners();
-        assertEquals(1, listeners.size());
-        assertEquals(service, listeners.get(0));
-
-        eplf.ungetService(null, null, service);
-        listeners = eplf.getListeners();
-        assertEquals(0, listeners.size());
-
-        eplf.ungetService(null, null, eli); // no call to close
-        listeners.add(eli);
-        eplf.ungetService(null, null, eli); // call to close
-        listeners = eplf.getListeners();
-        assertEquals(0, listeners.size());
-
-        c.verify();
-    }
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private BundleContext createBundleContext() {
-        BundleContext ctx = c.createMock(BundleContext.class);
-        ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
-        String[] ifAr = {EndpointEventListener.class.getName(), EndpointListener.class.getName()};
-        EasyMock.expect(ctx.registerService(EasyMock.aryEq(ifAr), EasyMock.anyObject(),
-                                            (Dictionary<String, String>)EasyMock.anyObject())).andReturn(sreg).once();
-    
-        EasyMock.expect(ctx.getProperty(EasyMock.eq("org.osgi.framework.uuid"))).andReturn("myUUID").anyTimes();
-        return ctx;
-    }
-}


[3/9] aries-rsa git commit: [ARIES-1774] Centralize Zookeeper logic in ZookeeperEndpointRepository

Posted by cs...@apache.org.
[ARIES-1774] Centralize Zookeeper logic in ZookeeperEndpointRepository


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/f07ee8b5
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/f07ee8b5
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/f07ee8b5

Branch: refs/heads/master
Commit: f07ee8b59e226579a55c3463329abf912a2ae291
Parents: fa57fb2
Author: Christian Schneider <cs...@adobe.com>
Authored: Wed Feb 7 16:52:07 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100

----------------------------------------------------------------------
 .../discovery/zookeeper/ZooKeeperDiscovery.java |   7 +-
 .../publish/PublishingEndpointListener.java     | 178 ++------------
 .../repository/ZookeeperEndpointRepository.java | 239 +++++++++++++++++++
 .../zookeeper/subscribe/InterfaceMonitor.java   |   4 +-
 .../subscribe/InterfaceMonitorManager.java      |   4 +-
 .../rsa/discovery/zookeeper/util/Utils.java     |  57 -----
 .../publish/PublishingEndpointListenerTest.java |  23 +-
 .../ZookeeperEndpointRepositoryTest.java        | 116 +++++++++
 .../subscribe/InterfaceMonitorTest.java         |   4 +-
 .../rsa/discovery/zookeeper/util/UtilsTest.java |  37 ---
 10 files changed, 383 insertions(+), 286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
index c8b020d..d265a22 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.aries.rsa.discovery.zookeeper.publish.PublishingEndpointListener;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
 import org.apache.aries.rsa.discovery.zookeeper.subscribe.EndpointListenerTracker;
 import org.apache.aries.rsa.discovery.zookeeper.subscribe.InterfaceMonitorManager;
 import org.apache.zookeeper.WatchedEvent;
@@ -55,6 +56,8 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
 
     private Dictionary<String, ?> curConfiguration;
 
+    private ZookeeperEndpointRepository repository;
+
     public ZooKeeperDiscovery(BundleContext bctx) {
         this.bctx = bctx;
     }
@@ -92,7 +95,8 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
             return;
         }
         LOG.debug("starting ZookeeperDiscovery");
-        endpointListener = new PublishingEndpointListener(zkClient, bctx);
+        repository = new ZookeeperEndpointRepository(zkClient);
+        endpointListener = new PublishingEndpointListener(repository);
         endpointListener.start(bctx);
         imManager = new InterfaceMonitorManager(bctx, zkClient);
         endpointListenerTracker = new EndpointListenerTracker(bctx, imManager);
@@ -108,7 +112,6 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
         closed |= close;
         if (endpointListener != null) {
             endpointListener.stop();
-            endpointListener.close();
         }
         if (endpointListenerTracker != null) {
             endpointListenerTracker.close();

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
index c3bf01f..ceef6b0 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
@@ -18,29 +18,11 @@
  */
 package org.apache.aries.rsa.discovery.zookeeper.publish;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Dictionary;
-import java.util.HashMap;
 import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
 
-import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
 import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceRegistration;
@@ -49,28 +31,23 @@ import org.osgi.service.remoteserviceadmin.EndpointEvent;
 import org.osgi.service.remoteserviceadmin.EndpointEventListener;
 import org.osgi.service.remoteserviceadmin.EndpointListener;
 import org.osgi.service.remoteserviceadmin.RemoteConstants;
-import org.osgi.util.tracker.ServiceTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Listens for local Endpoints and publishes them to ZooKeeper.
+ * Listens for local EndpointEvents using old and new style listeners and publishes changes to
+ * the ZookeeperEndpointRepository.
  */
 @SuppressWarnings("deprecation")
 public class PublishingEndpointListener implements EndpointEventListener, EndpointListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class);
 
-    private final ZooKeeper zk;
-    private final List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>();
-    private boolean closed;
-    private final EndpointDescriptionParser endpointDescriptionParser;
-
     private ServiceRegistration<?> listenerReg;
+    private ZookeeperEndpointRepository repository;
 
-    public PublishingEndpointListener(ZooKeeper zk, BundleContext bctx) {
-        this.zk = zk;
-        endpointDescriptionParser = new EndpointDescriptionParser();
+    public PublishingEndpointListener(ZookeeperEndpointRepository repository) {
+        this.repository = repository;
     }
     
     public void start(BundleContext bctx) {
@@ -109,155 +86,28 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
     
     private void endpointModified(EndpointDescription endpoint, String filter) {
         try {
-            modifyEndpoint(endpoint);
+            repository.modify(endpoint);
         } catch (Exception e) {
             LOG.error("Error modifying endpoint data in zookeeper for endpoint {}", endpoint.getId(), e);
         }
     }
 
-    private void modifyEndpoint(EndpointDescription endpoint) throws URISyntaxException, KeeperException, InterruptedException {
-        Collection<String> interfaces = endpoint.getInterfaces();
-        String endpointKey = getKey(endpoint);
-        Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
-
-        LOG.info("Changing endpoint in zookeeper: {}", endpoint);
-        for (String name : interfaces) {
-            String path = Utils.getZooKeeperPath(name);
-            String fullPath = path + '/' + endpointKey;
-            LOG.info("Changing ZooKeeper node for service with path {}", fullPath);
-            createPath(path, zk);
-            zk.setData(fullPath, getData(endpoint), -1);
-        }
-    }
-
     @Override
     public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
-        synchronized (endpoints) {
-            if (closed) {
-                return;
-            }
-            if (endpoints.contains(endpoint)) {
-                // TODO -> Should the published endpoint be updated here?
-                return;
-            }
-
-            try {
-                addEndpoint(endpoint);
-                endpoints.add(endpoint);
-            } catch (Exception ex) {
-                LOG.error("Exception while processing the addition of an endpoint.", ex);
-            }
-        }
-    }
-    
-    private byte[] getData(EndpointDescription epd) {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        endpointDescriptionParser.writeEndpoint(epd, bos);
-        return bos.toByteArray();
-    }
-
-    private void addEndpoint(EndpointDescription endpoint) throws URISyntaxException, KeeperException,
-                                                                  InterruptedException, IOException {
-        Collection<String> interfaces = endpoint.getInterfaces();
-        String endpointKey = getKey(endpoint);
-        LOG.info("Exporting endpoint to zookeeper: {}", endpoint);
-        for (String name : interfaces) {
-            String path = Utils.getZooKeeperPath(name);
-            String fullPath = path + '/' + endpointKey;
-            LOG.info("Creating ZooKeeper node for service with path {}", fullPath);
-            createPath(path, zk);
-            createEphemeralNode(fullPath, getData(endpoint));
-        }
-    }
-
-    private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException {
         try {
-            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-        } catch (NodeExistsException nee) {
-            // this sometimes happens after a ZooKeeper node dies and the ephemeral node
-            // that belonged to the old session was not yet deleted. We need to make our
-            // session the owner of the node so it won't get deleted automatically -
-            // we do this by deleting and recreating it ourselves.
-            LOG.info("node for endpoint already exists, recreating: {}", fullPath);
-            try {
-                zk.delete(fullPath, -1);
-            } catch (NoNodeException nne) {
-                // it's a race condition, but as long as it got deleted - it's ok
-            }
-            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+            repository.add(endpoint);
+        } catch (Exception ex) {
+            LOG.error("Exception while processing the addition of an endpoint.", ex);
         }
     }
 
     @Override
     public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
-        LOG.info("Local EndpointDescription removed: {}", endpoint);
-
-        synchronized (endpoints) {
-            if (closed) {
-                return;
-            }
-            if (!endpoints.contains(endpoint)) {
-                return;
-            }
-
-            try {
-                removeEndpoint(endpoint);
-                endpoints.remove(endpoint);
-            } catch (Exception ex) {
-                LOG.error("Exception while processing the removal of an endpoint", ex);
-            }
-        }
-    }
-
-    private void removeEndpoint(EndpointDescription endpoint) throws UnknownHostException, URISyntaxException {
-        Collection<String> interfaces = endpoint.getInterfaces();
-        String endpointKey = getKey(endpoint);
-        for (String name : interfaces) {
-            String path = Utils.getZooKeeperPath(name);
-            String fullPath = path + '/' + endpointKey;
-            LOG.debug("Removing ZooKeeper node: {}", fullPath);
-            try {
-                zk.delete(fullPath, -1);
-            } catch (Exception ex) {
-                LOG.debug("Error while removing endpoint: {}", ex); // e.g. session expired
-            }
-        }
-    }
-
-    private static void createPath(String path, ZooKeeper zk) throws KeeperException, InterruptedException {
-        StringBuilder current = new StringBuilder();
-        List<String> parts = Utils.removeEmpty(Arrays.asList(path.split("/")));
-        for (String part : parts) {
-            current.append('/');
-            current.append(part);
-            try {
-                if (zk.exists(current.toString(), false) == null) {
-                    zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-                }
-            } catch (NodeExistsException nee) {
-                // it's not the first node with this path to ever exist - that's normal
-            }
+        try {
+            repository.remove(endpoint);
+        } catch (Exception ex) {
+            LOG.error("Exception while processing the removal of an endpoint", ex);
         }
     }
 
-    private static String getKey(EndpointDescription endpoint) throws URISyntaxException {
-        URI uri = new URI(endpoint.getId());
-        return new StringBuilder().append(uri.getHost()).append("#").append(uri.getPort())
-            .append("#").append(uri.getPath().replace('/', '#')).toString();
-    }
-
-    public void close() {
-        LOG.debug("closing - removing all endpoints");
-        synchronized (endpoints) {
-            closed = true;
-            for (EndpointDescription endpoint : endpoints) {
-                try {
-                    removeEndpoint(endpoint);
-                } catch (Exception ex) {
-                    LOG.error("Exception while removing endpoint during close", ex);
-                }
-            }
-            endpoints.clear();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
new file mode 100644
index 0000000..2349c45
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
@@ -0,0 +1,239 @@
+package org.apache.aries.rsa.discovery.zookeeper.repository;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZookeeperEndpointRepository implements Closeable, Watcher {
+    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperEndpointRepository.class);
+    private final ZooKeeper zk;
+    private final EndpointDescriptionParser parser;
+    private EndpointEventListener listener;
+    public static final String PATH_PREFIX = "/osgi/service_registry";
+    
+    private Map<String, EndpointDescription> nodes = new ConcurrentHashMap<String, EndpointDescription>();
+    
+    /**
+     * Creates a repository without an endpoint listener.
+     * Delegates to the two-argument constructor with a null listener.
+     *
+     * @param zk ZooKeeper client used for all node operations
+     */
+    public ZookeeperEndpointRepository(ZooKeeper zk) {
+        this(zk, null);
+    }
+    
+    /**
+     * Creates a repository on top of the given ZooKeeper client and makes sure
+     * that the base path {@link #PATH_PREFIX} exists.
+     *
+     * @param zk ZooKeeper client used for all node operations
+     * @param listener listener to inform about endpoint changes; may be null
+     * @throws IllegalStateException if the base path cannot be created
+     */
+    public ZookeeperEndpointRepository(ZooKeeper zk, EndpointEventListener listener) {
+        this.zk = zk;
+        this.listener = listener;
+        this.parser = new EndpointDescriptionParser();
+        try {
+            createPath(PATH_PREFIX);
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // keep the interrupt visible to the caller's thread
+                Thread.currentThread().interrupt();
+            }
+            // pass the original failure on as cause so the reason is not lost
+            throw new IllegalStateException("Unable to create base path", e);
+        }
+        // Not yet needed
+        //this.registerWatcher();
+    }
+
+    /**
+     * Lists the children of {@link #PATH_PREFIX} and registers this instance
+     * as watcher on that node. Failures are logged and not propagated.
+     */
+    private void registerWatcher() {
+        try {
+            List<String> children = zk.getChildren(ZookeeperEndpointRepository.PATH_PREFIX, this);
+            LOG.debug("Registered watcher on {}, current children: {}",
+                ZookeeperEndpointRepository.PATH_PREFIX, children);
+        } catch (KeeperException e) {
+            LOG.error("Unable to register watcher on " + ZookeeperEndpointRepository.PATH_PREFIX, e);
+        } catch (InterruptedException e) {
+            // restore the flag so code further up the stack can react to the interrupt
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted while registering watcher on " + ZookeeperEndpointRepository.PATH_PREFIX, e);
+        }
+    }
+    
+    /**
+     * Reads the endpoint stored at the path of the given event and forwards it
+     * to the configured listener. Nothing happens if no listener is set, if the
+     * event carries no path or if no endpoint can be read from the node.
+     *
+     * @param wevent the ZooKeeper event to translate into an endpoint event
+     */
+    protected void notifyListener(WatchedEvent wevent) {
+        // the single-argument constructor leaves the listener unset
+        if (listener == null || wevent.getPath() == null) {
+            return;
+        }
+        EndpointDescription ep = read(wevent.getPath());
+        if (ep != null) {
+            int type = getEndpointEventType(wevent);
+            EndpointEvent event = new EndpointEvent(type, ep);
+            listener.endpointChanged(event, null);
+        }
+    }
+    
+    /**
+     * Maps the type of a ZooKeeper event to the matching EndpointEvent type.
+     *
+     * @param wevent the ZooKeeper event
+     * @return {@link EndpointEvent#REMOVED} for deleted nodes,
+     *         {@link EndpointEvent#MODIFIED} for changed node data and
+     *         {@link EndpointEvent#ADDED} for everything else
+     */
+    private int getEndpointEventType(WatchedEvent wevent) {
+        EventType type = wevent.getType();
+        if (type == EventType.NodeDeleted) {
+            return EndpointEvent.REMOVED;
+        }
+        if (type == EventType.NodeDataChanged) {
+            return EndpointEvent.MODIFIED;
+        }
+        // previous behaviour is kept as the fallback for all other event types
+        return EndpointEvent.ADDED;
+    }
+
+    /**
+     * Loads the content of a ZooKeeper node and converts it into an EndpointDescription.
+     * Any exception raised while reading or parsing is logged and leads to a null result.
+     *
+     * @param path path of the node to read
+     * @return the endpoint stored in the node, or null if the node does not exist,
+     *         holds no data or does not contain a readable endpoint
+     */
+    public EndpointDescription read(String path) {
+        EndpointDescription result = null;
+        try {
+            Stat nodeStat = zk.exists(path, false);
+            boolean hasData = nodeStat != null && nodeStat.getDataLength() > 0;
+            if (hasData) {
+                byte[] content = zk.getData(path, false, null);
+                LOG.debug("Got data for node: {}", path);
+
+                result = parser.readEndpoint(new ByteArrayInputStream(content));
+                if (result == null) {
+                    LOG.warn("No Discovery information found for node: {}", path);
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Problem getting EndpointDescription from node " + path, e);
+        }
+        return result;
+    }
+
+    public void add(EndpointDescription endpoint) throws URISyntaxException, KeeperException,
+    InterruptedException, IOException {
+        Collection<String> interfaces = endpoint.getInterfaces();
+        String endpointKey = getKey(endpoint);
+    
+        LOG.info("Exporting endpoint to zookeeper: {}", endpoint);
+        for (String name : interfaces) {
+            String path = ZookeeperEndpointRepository.getZooKeeperPath(name);
+            String fullPath = path + '/' + endpointKey;
+            LOG.info("Creating ZooKeeper node for service with path {}", fullPath);
+            createPath(path);
+            createEphemeralNode(fullPath, getData(endpoint));
+        }
+    }
+
+    public void modify(EndpointDescription endpoint) throws URISyntaxException, KeeperException, InterruptedException {
+        Collection<String> interfaces = endpoint.getInterfaces();
+        String endpointKey = getKey(endpoint);
+
+        LOG.info("Changing endpoint in zookeeper: {}", endpoint);
+        for (String name : interfaces) {
+            String path = ZookeeperEndpointRepository.getZooKeeperPath(name);
+            String fullPath = path + '/' + endpointKey;
+            LOG.info("Changing ZooKeeper node for service with path {}", fullPath);
+            createPath(path);
+            zk.setData(fullPath, getData(endpoint), -1);
+        }
+    }
+    
+    /**
+     * Deletes the nodes that belong to the endpoint, one node per interface.
+     * Removal is best effort: a failing delete is logged and the remaining
+     * nodes are still processed.
+     *
+     * @param endpoint the endpoint to remove
+     * @throws UnknownHostException declared for callers; not thrown by the code in this method directly
+     * @throws URISyntaxException if the endpoint id is not a valid URI
+     */
+    public void remove(EndpointDescription endpoint) throws UnknownHostException, URISyntaxException {
+        Collection<String> interfaces = endpoint.getInterfaces();
+        String endpointKey = getKey(endpoint);
+        for (String name : interfaces) {
+            String path = ZookeeperEndpointRepository.getZooKeeperPath(name);
+            String fullPath = path + '/' + endpointKey;
+            LOG.debug("Removing ZooKeeper node: {}", fullPath);
+            try {
+                zk.delete(fullPath, -1);
+            } catch (Exception ex) {
+                if (ex instanceof InterruptedException) {
+                    // do not hide the interrupt from the calling thread
+                    Thread.currentThread().interrupt();
+                }
+                // e.g. session expired; the path fills the placeholder, the exception is logged as throwable
+                LOG.debug("Error while removing endpoint: {}", fullPath, ex);
+            }
+        }
+    }
+    
+    /**
+     * Returns the endpoints known to this repository.
+     * Reading the endpoints from ZooKeeper is not implemented yet, so the result is always empty.
+     *
+     * @return an empty, mutable list; never null
+     * @throws KeeperException declared for the future implementation
+     * @throws InterruptedException declared for the future implementation
+     */
+    public List<EndpointDescription> getAll() throws KeeperException, InterruptedException {
+        // an empty list instead of null spares callers a null check
+        return new ArrayList<EndpointDescription>();
+    }
+
+    /**
+     * Currently does nothing; in particular the ZooKeeper client handed to the
+     * constructor is not closed here.
+     */
+    @Override
+    public void close() throws IOException {
+        // nothing to release yet
+    }
+
+    /**
+     * Serializes the endpoint with the parser into a byte array.
+     *
+     * @param epd the endpoint to serialize
+     * @return the bytes written by the parser
+     */
+    private byte[] getData(EndpointDescription epd) {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        parser.writeEndpoint(epd, buffer);
+        byte[] serialized = buffer.toByteArray();
+        return serialized;
+    }
+    
+    /**
+     * Creates an ephemeral node with the given data. If the node already exists
+     * it is deleted and created again.
+     *
+     * @param fullPath path of the node to create
+     * @param data content to store in the node
+     * @throws KeeperException if ZooKeeper rejects an operation, including the second create attempt
+     * @throws InterruptedException if a ZooKeeper call is interrupted
+     */
+    private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException {
+        try {
+            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        } catch (NodeExistsException nee) {
+            // this sometimes happens after a ZooKeeper node dies and the ephemeral node
+            // that belonged to the old session was not yet deleted. We need to make our
+            // session the owner of the node so it won't get deleted automatically -
+            // we do this by deleting and recreating it ourselves.
+            LOG.info("node for endpoint already exists, recreating: {}", fullPath);
+            try {
+                zk.delete(fullPath, -1);
+            } catch (NoNodeException nne) {
+                // it's a race condition, but as long as it got deleted - it's ok
+            }
+            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        }
+    }
+    
+    /**
+     * Creates every missing node along the given path as a persistent node with
+     * empty data, starting at the root.
+     *
+     * @param path slash separated node path; empty segments are ignored
+     * @throws KeeperException if ZooKeeper rejects an operation other than "node exists"
+     * @throws InterruptedException if a ZooKeeper call is interrupted
+     */
+    private void createPath(String path) throws KeeperException, InterruptedException {
+        StringBuilder prefix = new StringBuilder();
+        for (String segment : removeEmpty(Arrays.asList(path.split("/")))) {
+            prefix.append('/').append(segment);
+            String node = prefix.toString();
+            try {
+                if (zk.exists(node, false) == null) {
+                    zk.create(node, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                }
+            } catch (NodeExistsException nee) {
+                // the node was created between the check and the create call - that's fine
+            }
+        }
+    }
+
+    /**
+     * Filters nulls and empty strings out of the given list.
+     *
+     * @param strings a list of strings, may be null
+     * @return a new list holding the non-null and non-empty elements
+     *         of the given list in their original order; empty if the
+     *         given list is null
+     */
+    public static List<String> removeEmpty(List<String> strings) {
+        List<String> kept = new ArrayList<String>();
+        if (strings != null) {
+            for (String candidate : strings) {
+                if (candidate == null || candidate.isEmpty()) {
+                    continue;
+                }
+                kept.add(candidate);
+            }
+        }
+        return kept;
+    }
+
+    /**
+     * Builds the node path for an interface name by replacing dots with slashes
+     * below {@link #PATH_PREFIX}.
+     *
+     * @param name fully qualified interface name, may be null or empty
+     * @return the node path, or {@link #PATH_PREFIX} itself for a null or empty name
+     */
+    public static String getZooKeeperPath(String name) {
+        if (name == null || name.isEmpty()) {
+            return PATH_PREFIX;
+        }
+        return PATH_PREFIX + '/' + name.replace('.', '/');
+    }
+
+    /**
+     * Derives the node name for an endpoint from its id, which is parsed as a URI:
+     * host, port and path joined by '#', with every '/' of the path turned into '#'.
+     *
+     * @param endpoint the endpoint to compute the key for
+     * @return the node name for the endpoint
+     * @throws URISyntaxException if the endpoint id is not a valid URI
+     */
+    private static String getKey(EndpointDescription endpoint) throws URISyntaxException {
+        URI id = new URI(endpoint.getId());
+        String flattenedPath = id.getPath().replace('/', '#');
+        return id.getHost() + "#" + id.getPort() + "#" + flattenedPath;
+    }
+
+    /**
+     * Watcher callback. The body is empty, so incoming events are ignored.
+     *
+     * @param event the event delivered by ZooKeeper
+     */
+    @Override
+    public void process(WatchedEvent event) {
+        // no-op
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
index 7078bb8..2c90b3c 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
@@ -67,7 +67,7 @@ public class InterfaceMonitor implements Watcher, StatCallback {
 
     public InterfaceMonitor(ZooKeeper zk, String objClass, EndpointEventListener endpointListener, String scope) {
         this.zk = zk;
-        this.znode = Utils.getZooKeeperPath(objClass);
+        this.znode = ZookeeperEndpointRepository.getZooKeeperPath(objClass);
         this.recursive = objClass == null || objClass.isEmpty();
         this.endpointListener = endpointListener;
         this.parser = new EndpointDescriptionParser();

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
index 7d6e4ae..0aa98b3 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
@@ -30,7 +30,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
 import org.apache.aries.rsa.util.StringPlus;
 import org.apache.zookeeper.ZooKeeper;
 import org.osgi.framework.BundleContext;
@@ -250,7 +250,7 @@ public class InterfaceMonitorManager {
     }
 
     protected List<String> getScopes(ServiceReference<?> sref) {
-        return Utils.removeEmpty(StringPlus.normalize(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)));
+        return StringPlus.normalize(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE));
     }
     
     public static String getObjectClass(String scope) {

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java
deleted file mode 100644
index 289ae32..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.util;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public final class Utils {
-
-    static final String PATH_PREFIX = "/osgi/service_registry";
-
-    private Utils() {
-        // never constructed
-    }
-
-    public static String getZooKeeperPath(String name) {
-        return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/');
-    }
-
-    /**
-     * Removes nulls and empty strings from the given string array.
-     *
-     * @param strings an array of strings
-     * @return a new array containing the non-null and non-empty
-     *         elements of the original array in the same order
-     */
-    public static List<String> removeEmpty(List<String> strings) {
-        List<String> result = new ArrayList<String>();
-        if (strings == null) {
-            return result;
-        }
-        for (String s : strings) {
-            if (s != null && !s.isEmpty()) {
-                result.add(s);
-            }
-        }
-        return result;
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
index b7debf6..a61cf76 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
@@ -23,13 +23,13 @@ import static org.easymock.EasyMock.expect;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.easymock.EasyMock;
 import org.easymock.IMocksControl;
-import org.osgi.framework.BundleContext;
 import org.osgi.framework.Constants;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.service.remoteserviceadmin.EndpointEvent;
@@ -44,7 +44,6 @@ public class PublishingEndpointListenerTest extends TestCase {
     public void testEndpointRemovalAdding() throws KeeperException, InterruptedException {
         IMocksControl c = EasyMock.createNiceControl();
 
-        BundleContext ctx = c.createMock(BundleContext.class);
         ZooKeeper zk = c.createMock(ZooKeeper.class);
 
         String path = ENDPOINT_PATH;
@@ -53,7 +52,8 @@ public class PublishingEndpointListenerTest extends TestCase {
 
         c.replay();
 
-        PublishingEndpointListener eli = new PublishingEndpointListener(zk, ctx);
+        ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk);
+        PublishingEndpointListener eli = new PublishingEndpointListener(repository);
         EndpointDescription endpoint = createEndpoint();
         eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null);
         eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null); // should do nothing
@@ -63,23 +63,6 @@ public class PublishingEndpointListenerTest extends TestCase {
         c.verify();
     }
 
-    public void testClose() throws KeeperException, InterruptedException {
-        IMocksControl c = EasyMock.createNiceControl();
-        BundleContext ctx = c.createMock(BundleContext.class);
-        ZooKeeper zk = c.createMock(ZooKeeper.class);
-        expectCreated(zk, ENDPOINT_PATH);
-        expectDeleted(zk, ENDPOINT_PATH);
-
-        c.replay();
-
-        PublishingEndpointListener eli = new PublishingEndpointListener(zk, ctx);
-        EndpointDescription endpoint = createEndpoint();
-        eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null);
-        eli.close(); // should result in zk.delete(...)
-
-        c.verify();
-    }
-
     private void expectCreated(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
         expect(zk.create(EasyMock.eq(path), 
                          (byte[])EasyMock.anyObject(), 

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
new file mode 100644
index 0000000..3a20f5a
--- /dev/null
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
@@ -0,0 +1,116 @@
+package org.apache.aries.rsa.discovery.zookeeper.repository;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.osgi.framework.Constants;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+
+public class ZookeeperEndpointRepositoryTest {
+    
+    private ZooKeeperServer server;
+    private ZooKeeper zk;
+    private ServerCnxnFactory factory;
+
+    /**
+     * Starts a ZooKeeper server inside the test JVM that keeps its files in
+     * target/zookeeper, connects a client to it and prints the existing nodes.
+     */
+    @Before
+    public void before() throws IOException, InterruptedException, KeeperException {
+        File target = new File("target");
+        File zookeeperDir = new File(target, "zookeeper");
+        server = new ZooKeeperServer(zookeeperDir, zookeeperDir, 2000);
+        factory = new NIOServerCnxnFactory();
+        // use a port that was free a moment ago
+        int clientPort = getClientPort();
+        factory.configure(new InetSocketAddress(clientPort), 10);
+        factory.startup(server);
+        // the watcher only prints the events it receives
+        Watcher watcher = new Watcher() {
+
+            @Override
+            public void process(WatchedEvent event) {
+                System.out.println(event);
+            }
+            
+        };
+        zk = new ZooKeeper("localhost:" + server.getClientPort(), 1000, watcher);
+        printNodes("/");
+    }
+    
+    /**
+     * Finds a free port by opening a server socket on port 0 and closing it again.
+     *
+     * @return the port the temporary socket was bound to
+     * @throws IOException if the socket cannot be opened
+     */
+    private int getClientPort() throws IOException {
+        try (ServerSocket probe = new ServerSocket(0)) {
+            int freePort = probe.getLocalPort();
+            return freePort;
+        }
+    }
+
+    /**
+     * Closes the client and shuts the server connection factory down.
+     * The factory is shut down even if closing the client fails, and fields
+     * that were never set because set-up failed are skipped.
+     */
+    @After
+    public void after() throws InterruptedException {
+        try {
+            if (zk != null) {
+                zk.close();
+            }
+        } finally {
+            if (factory != null) {
+                factory.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Adds an endpoint to the repository and checks that it can be read back
+     * from the node path derived from its interface and id.
+     */
+    @Test
+    public void test() throws IOException, URISyntaxException, KeeperException, InterruptedException {
+        EndpointEventListener listener = new EndpointEventListener() {
+            
+            @Override
+            public void endpointChanged(EndpointEvent event, String filter) {
+                System.out.println(event);
+            }
+        };
+        ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk, listener);
+        try {
+            EndpointDescription endpoint = createEndpoint();
+            repository.add(endpoint);
+
+            String path = "/osgi/service_registry/java/lang/Runnable/test.de#-1##service1";
+            EndpointDescription ep2 = repository.read(path);
+            // a null result shows up as a failed comparison instead of a NullPointerException
+            assertEquals(endpoint.getId(), ep2 == null ? null : ep2.getId());
+        } finally {
+            repository.close();
+        }
+    }
+    
+    @Test
+    public void testGetZooKeeperPath() {
+        assertEquals(ZookeeperEndpointRepository.PATH_PREFIX + '/' + "org/example/Test",
+            ZookeeperEndpointRepository.getZooKeeperPath("org.example.Test"));
+
+        // used for the recursive discovery
+        assertEquals(ZookeeperEndpointRepository.PATH_PREFIX, ZookeeperEndpointRepository.getZooKeeperPath(null));
+        assertEquals(ZookeeperEndpointRepository.PATH_PREFIX, ZookeeperEndpointRepository.getZooKeeperPath(""));
+    }
+
+    /**
+     * Builds the endpoint used by the tests: interface Runnable, id
+     * http://test.de/service1 and imported config "my".
+     */
+    private EndpointDescription createEndpoint() {
+        Map<String, Object> settings = new HashMap<>();
+        String[] objectClass = {Runnable.class.getName()};
+        settings.put(Constants.OBJECTCLASS, objectClass);
+        settings.put(RemoteConstants.ENDPOINT_ID, "http://test.de/service1");
+        settings.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "my");
+        return new EndpointDescription(settings);
+    }
+
+    /**
+     * Prints the full path of every node below the given path, depth first.
+     *
+     * @param path node path to start from
+     * @throws KeeperException if ZooKeeper rejects an operation
+     * @throws InterruptedException if a ZooKeeper call is interrupted
+     */
+    public void printNodes(String path) throws KeeperException, InterruptedException {
+        // the parent part is the same for all children, so build it once
+        String base = path.endsWith("/") ? path : path + "/";
+        for (String child : zk.getChildren(path, false)) {
+            String childPath = base + child;
+            System.out.println(childPath);
+            printNodes(childPath);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
index 53ddbc4..e09cfbf 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
@@ -24,7 +24,7 @@ import static org.easymock.EasyMock.expectLastCall;
 
 import java.util.Collections;
 
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher.Event.EventType;
@@ -48,7 +48,7 @@ public class InterfaceMonitorTest extends TestCase {
 
         String scope = "(myProp=test)";
         String interf = "es.schaaf.test";
-        String node = Utils.getZooKeeperPath(interf);
+        String node = ZookeeperEndpointRepository.getZooKeeperPath(interf);
 
         EndpointEventListener endpointListener = c.createMock(EndpointEventListener.class);
         InterfaceMonitor im = new InterfaceMonitor(zk, interf, endpointListener, scope);

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java
deleted file mode 100644
index 4d41fb0..0000000
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.util;
-
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
-
-import junit.framework.TestCase;
-
-public class UtilsTest extends TestCase {
-
-    public void testGetZooKeeperPath() {
-        assertEquals(Utils.PATH_PREFIX + '/' + "org/example/Test",
-            Utils.getZooKeeperPath("org.example.Test"));
-
-        // used for the recursive discovery
-        assertEquals(Utils.PATH_PREFIX, Utils.getZooKeeperPath(null));
-        assertEquals(Utils.PATH_PREFIX, Utils.getZooKeeperPath(""));
-    }
-
-
-}


[8/9] aries-rsa git commit: [ARIES-1769] Fixing wrong intent names and exception in timeout case

Posted by cs...@apache.org.
[ARIES-1769] Fixing wrong intent names and exception in timeout case


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/9a08b235
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/9a08b235
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/9a08b235

Branch: refs/heads/master
Commit: 9a08b23533b4b085b10129d8e06d07b677a6c8d9
Parents: 362b68d
Author: Christian Schneider <cs...@adobe.com>
Authored: Thu Feb 8 17:31:10 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100

----------------------------------------------------------------------
 .../org/apache/aries/rsa/provider/tcp/TCPProvider.java  |  2 +-
 .../org/apache/aries/rsa/provider/tcp/TcpEndpoint.java  |  2 +-
 .../aries/rsa/provider/tcp/TcpProviderIntentTest.java   | 12 ++++++++++++
 .../apache/aries/rsa/provider/tcp/TcpProviderTest.java  |  8 ++++++--
 4 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/9a08b235/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPProvider.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPProvider.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPProvider.java
index bceb063..6d3b6ad 100644
--- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPProvider.java
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPProvider.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
 @SuppressWarnings("rawtypes")
 public class TCPProvider implements DistributionProvider {
     static final String TCP_CONFIG_TYPE = "aries.tcp";
-    private static final String[] SUPPORTED_INTENTS = { "osgi.basic", "osgi.sync"};
+    private static final String[] SUPPORTED_INTENTS = { "osgi.basic", "osgi.async"};
     
     private Logger logger = LoggerFactory.getLogger(TCPProvider.class);
 

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/9a08b235/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpEndpoint.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpEndpoint.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpEndpoint.java
index fc207c3..00d06f2 100644
--- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpEndpoint.java
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpEndpoint.java
@@ -45,7 +45,7 @@ public class TcpEndpoint implements Endpoint {
         String endpointId = String.format("tcp://%s:%s",hostName, tcpServer.getPort());
         effectiveProperties.put(RemoteConstants.ENDPOINT_ID, endpointId);
         effectiveProperties.put(RemoteConstants.SERVICE_EXPORTED_CONFIGS, "");
-        effectiveProperties.put(RemoteConstants.SERVICE_INTENTS, Arrays.asList("osgi.basic, osgi.async"));
+        effectiveProperties.put(RemoteConstants.SERVICE_INTENTS, Arrays.asList("osgi.basic", "osgi.async"));
         
         // tck tests for one such property ... so we provide it
         effectiveProperties.put(TCPProvider.TCP_CONFIG_TYPE + ".id", endpointId);

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/9a08b235/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderIntentTest.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderIntentTest.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderIntentTest.java
index c339ca6..1d7928c 100644
--- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderIntentTest.java
+++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderIntentTest.java
@@ -18,8 +18,10 @@
  */
 package org.apache.aries.rsa.provider.tcp;
 
+import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -49,6 +51,16 @@ public class TcpProviderIntentTest {
     }
     
     @Test
+    public void basicAndAsnycIntents() {
+        Map<String, Object> props = new HashMap<String, Object>();
+        EndpointHelper.addObjectClass(props, exportedInterfaces);
+        String[] standardIntents = new String[] {"osgi.basic", "osgi.async"};
+        props.put(RemoteConstants.SERVICE_EXPORTED_INTENTS, standardIntents);
+        Endpoint ep = provider.exportService(myService, bc, props, exportedInterfaces);
+        Assert.assertThat("Service should be exported as the intents: " + Arrays.toString(standardIntents) + " must be supported", ep, notNullValue());
+    }
+    
+    @Test
     public void unknownIntent() {
         Map<String, Object> props = new HashMap<String, Object>();
         EndpointHelper.addObjectClass(props, exportedInterfaces);

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/9a08b235/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
index 538b52b..67678f3 100644
--- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
+++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
@@ -18,8 +18,10 @@
  */
 package org.apache.aries.rsa.provider.tcp;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.core.StringStartsWith.startsWith;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
@@ -47,6 +49,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceException;
 import org.osgi.util.promise.Promise;
 
 public class TcpProviderTest {
@@ -81,8 +84,9 @@ public class TcpProviderTest {
         try {
             myServiceProxy.callSlow(TIMEOUT + 100);
             Assert.fail("Expecting timeout");
-        } catch (RuntimeException e) {
-            Assert.assertEquals(SocketTimeoutException.class, e.getCause().getClass());
+        } catch (ServiceException e) {
+            assertThat(e.getCause().getClass().getName(), equalTo(SocketTimeoutException.class.getName()));
+            assertThat(e.getType(), equalTo(ServiceException.REMOTE));
         }
     }
 


[9/9] aries-rsa git commit: [ARIES-1776] Fixes for tck tests with SecurityManager

Posted by cs...@apache.org.
[ARIES-1776] Fixes for tck tests with SecurityManager


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/75448368
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/75448368
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/75448368

Branch: refs/heads/master
Commit: 75448368d0efecbef48464bbf10791986e20c4b0
Parents: 2033037
Author: Christian Schneider <cs...@adobe.com>
Authored: Thu Feb 8 17:25:52 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100

----------------------------------------------------------------------
 .../aries/rsa/core/RemoteServiceAdminCore.java  | 48 ++++++++++++++------
 .../rsa/core/RemoteServiceAdminInstance.java    |  7 +--
 .../aries/rsa/core/event/EventAdminSender.java  | 13 ++++--
 .../rsa/core/RemoteServiceAdminCoreTest.java    | 29 ++++--------
 4 files changed, 56 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/75448368/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminCore.java
----------------------------------------------------------------------
diff --git a/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminCore.java b/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminCore.java
index 654e61d..1586307 100644
--- a/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminCore.java
+++ b/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminCore.java
@@ -18,6 +18,8 @@
  */
 package org.apache.aries.rsa.core;
 
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -44,6 +46,7 @@ import org.osgi.framework.ServiceListener;
 import org.osgi.framework.ServiceReference;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointPermission;
 import org.osgi.service.remoteserviceadmin.ExportReference;
 import org.osgi.service.remoteserviceadmin.ExportRegistration;
 import org.osgi.service.remoteserviceadmin.ImportReference;
@@ -210,22 +213,29 @@ public class RemoteServiceAdminCore implements RemoteServiceAdmin {
         return null;
     }
 
-    private ExportRegistration exportService(List<String> interfaceNames,
-            ServiceReference<?> serviceReference, Map<String, Object> serviceProperties) {
+    private ExportRegistration exportService(
+            final List<String> interfaceNames,
+            final ServiceReference<?> serviceReference, 
+            final Map<String, Object> serviceProperties) {
         LOG.info("interfaces selected for export: " + interfaceNames);
 
         try {
-            Class<?>[] interfaces = getInterfaces(interfaceNames, serviceReference.getBundle());
-            Map<String, Object> eprops = createEndpointProps(serviceProperties, interfaces);
-            Bundle bundle = serviceReference.getBundle();
-            if (bundle == null) {
+            checkPermission(new EndpointPermission("*", EndpointPermission.EXPORT));
+            Bundle serviceBundle = serviceReference.getBundle();
+            if (serviceBundle == null) {
                 throw new IllegalStateException("Service is already unregistered");
             }
-            BundleContext serviceContext = bundle.getBundleContext();
+            final BundleContext serviceContext = serviceBundle.getBundleContext();
+            final Object serviceO = serviceContext.getService(serviceReference);
+            final Class<?>[] interfaces = getInterfaces(serviceO, interfaceNames);
+            final Map<String, Object> eprops = createEndpointProps(serviceProperties, interfaces);
             
             // TODO unget service when export is destroyed
-            Object serviceO = serviceContext.getService(serviceReference);
-            Endpoint endpoint = provider.exportService(serviceO, serviceContext, eprops, interfaces);
+            Endpoint endpoint = AccessController.doPrivileged(new PrivilegedAction<Endpoint>() {
+                public Endpoint run() {
+                    return provider.exportService(serviceO, serviceContext, eprops, interfaces);
+                }
+            });
             if (endpoint == null) {
                 return null;
             }
@@ -238,11 +248,16 @@ public class RemoteServiceAdminCore implements RemoteServiceAdmin {
         }
     }
     
-    private Class<?>[] getInterfaces(List<String> interfaceNames, 
-                                         Bundle serviceBundle) throws ClassNotFoundException {
+    private Class<?>[] getInterfaces(
+            Object serviceO, 
+            List<String> interfaceNames
+            ) throws ClassNotFoundException {
         List<Class<?>> interfaces = new ArrayList<>();
-        for (String interfaceName : interfaceNames) {
-            interfaces.add(serviceBundle.loadClass(interfaceName));
+        Class<?>[] allInterfaces = serviceO.getClass().getInterfaces();
+        for (Class<?> iface : allInterfaces) {
+            if (interfaceNames.contains(iface.getName())) {
+                interfaces.add(iface);
+            }
         }
         return interfaces.toArray(new Class[]{});
     }
@@ -649,4 +664,11 @@ public class RemoteServiceAdminCore implements RemoteServiceAdmin {
             }
         }
     }
+    
+    private void checkPermission(EndpointPermission permission) {
+        SecurityManager sm = System.getSecurityManager();
+        if (sm != null) {
+            sm.checkPermission(permission);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/75448368/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminInstance.java
----------------------------------------------------------------------
diff --git a/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminInstance.java b/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminInstance.java
index cd435ba..7158f52 100644
--- a/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminInstance.java
+++ b/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminInstance.java
@@ -52,12 +52,7 @@ public class RemoteServiceAdminInstance implements RemoteServiceAdmin {
     @Override
     @SuppressWarnings("rawtypes")
     public List<ExportRegistration> exportService(final ServiceReference ref, final Map properties) {
-        checkPermission(new EndpointPermission("*", EndpointPermission.EXPORT));
-        return AccessController.doPrivileged(new PrivilegedAction<List<ExportRegistration>>() {
-            public List<ExportRegistration> run() {
-                return closed ? Collections.<ExportRegistration>emptyList() : rsaCore.exportService(ref, properties);
-            }
-        });
+        return closed ? Collections.<ExportRegistration>emptyList() : rsaCore.exportService(ref, properties);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/75448368/rsa/src/main/java/org/apache/aries/rsa/core/event/EventAdminSender.java
----------------------------------------------------------------------
diff --git a/rsa/src/main/java/org/apache/aries/rsa/core/event/EventAdminSender.java b/rsa/src/main/java/org/apache/aries/rsa/core/event/EventAdminSender.java
index f42afc9..9f3e55c 100644
--- a/rsa/src/main/java/org/apache/aries/rsa/core/event/EventAdminSender.java
+++ b/rsa/src/main/java/org/apache/aries/rsa/core/event/EventAdminSender.java
@@ -1,5 +1,7 @@
 package org.apache.aries.rsa.core.event;
 
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -30,11 +32,16 @@ public class EventAdminSender {
     }
 
     public void send(RemoteServiceAdminEvent rsaEvent) {
-       Event event = toEvent(rsaEvent);
+       final Event event = toEvent(rsaEvent);
        ServiceReference<EventAdmin> sref = this.context.getServiceReference(EventAdmin.class);
        if (sref != null) {
-           EventAdmin eventAdmin = this.context.getService(sref);
-           eventAdmin.postEvent(event);
+           final EventAdmin eventAdmin = this.context.getService(sref);
+           AccessController.doPrivileged(new PrivilegedAction<Void>() {
+               public Void run() {
+                   eventAdmin.postEvent(event);
+                   return null;
+               }
+           });
            this.context.ungetService(sref);           
        }
     }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/75448368/rsa/src/test/java/org/apache/aries/rsa/core/RemoteServiceAdminCoreTest.java
----------------------------------------------------------------------
diff --git a/rsa/src/test/java/org/apache/aries/rsa/core/RemoteServiceAdminCoreTest.java b/rsa/src/test/java/org/apache/aries/rsa/core/RemoteServiceAdminCoreTest.java
index 02f6f18..c6f3378 100644
--- a/rsa/src/test/java/org/apache/aries/rsa/core/RemoteServiceAdminCoreTest.java
+++ b/rsa/src/test/java/org/apache/aries/rsa/core/RemoteServiceAdminCoreTest.java
@@ -212,6 +212,7 @@ public class RemoteServiceAdminCoreTest {
         ServiceReference sref = mockServiceReference(sProps);
 
         provider.endpoint = createEndpoint(sProps);
+        ServiceReference sref2 = mockServiceReference(sProps);
         c.replay();
 
         // Export the service for the first time
@@ -230,7 +231,6 @@ public class RemoteServiceAdminCoreTest {
 
         // Ask to export the same service again, this should not go through the whole process again but simply return
         // a copy of the first instance.
-        ServiceReference sref2 = mockServiceReference(sProps);
         List<ExportRegistration> eregs2 = rsaCore.exportService(sref2, null);
         assertEquals(1, eregs2.size());
         ExportRegistration ereg2 = eregs2.iterator().next();
@@ -296,6 +296,7 @@ public class RemoteServiceAdminCoreTest {
         sProps.put("service.exported.interfaces", "*");
         ServiceReference sref = mockServiceReference(sProps);
 
+        c.replay();
         provider.ex = new TestException();
 
         List<ExportRegistration> ereg = rsaCore.exportService(sref, sProps);
@@ -304,6 +305,7 @@ public class RemoteServiceAdminCoreTest {
 
         Collection<ExportReference> exportedServices = rsaCore.getExportedServices();
         assertEquals("No service was exported", 0, exportedServices.size());
+        c.verify();
     }
 
     @Test
@@ -345,34 +347,23 @@ public class RemoteServiceAdminCoreTest {
     }
 
     private ServiceReference mockServiceReference(final Map<String, Object> sProps) {
-        BundleContext bc = EasyMock.createNiceMock(BundleContext.class);
-    
-        Bundle sb = EasyMock.createNiceMock(Bundle.class);
+        BundleContext bc = c.createMock(BundleContext.class);
+        Bundle sb = c.createMock(Bundle.class);
         expect(sb.getBundleContext()).andReturn(bc).anyTimes();
-        try {
-            expect((Class)sb.loadClass(Runnable.class.getName())).andReturn(Runnable.class);
-        } catch (ClassNotFoundException e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-        EasyMock.replay(sb);
-    
         expect(bc.getBundle()).andReturn(sb).anyTimes();
-        EasyMock.replay(bc);
     
-        ServiceReference sref = EasyMock.createNiceMock(ServiceReference.class);
+        String[] propKeys = sProps.keySet().toArray(new String[] {});
+        ServiceReference sref = c.createMock(ServiceReference.class);
         expect(sref.getBundle()).andReturn(sb).anyTimes();
-        expect(sref.getPropertyKeys()).andReturn(sProps.keySet().toArray(new String[] {})).anyTimes();
+        expect(sref.getPropertyKeys()).andReturn(propKeys).anyTimes();
         expect(sref.getProperty((String) EasyMock.anyObject())).andAnswer(new IAnswer<Object>() {
             @Override
             public Object answer() throws Throwable {
                 return sProps.get(EasyMock.getCurrentArguments()[0]);
             }
         }).anyTimes();
-        EasyMock.replay(sref);
-        
-        Runnable svcObject = EasyMock.createNiceMock(Runnable.class);
-        EasyMock.replay(svcObject);
-
+        Runnable svcObject = c.createMock(Runnable.class);
+        expect(bc.getService(sref)).andReturn(svcObject).anyTimes();
         return sref;
     }