You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2018/02/08 16:41:01 UTC
[1/9] aries-rsa git commit: [ARIES-1758] First impl for serializing
Version
Repository: aries-rsa
Updated Branches:
refs/heads/master 44943bcef -> 19849747e
[ARIES-1758] First impl for serializing Version
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/19849747
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/19849747
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/19849747
Branch: refs/heads/master
Commit: 19849747e5d300e4613bd9759cacdfe0570edad9
Parents: 9a08b23
Author: Christian Schneider <cs...@adobe.com>
Authored: Thu Feb 8 17:33:36 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100
----------------------------------------------------------------------
.../aries/rsa/provider/tcp/MethodInvoker.java | 18 ++++-
.../aries/rsa/provider/tcp/SerVersion.java | 25 +++++++
.../aries/rsa/provider/tcp/TCPServer.java | 1 +
.../rsa/provider/tcp/TcpInvocationHandler.java | 38 +++++++++-
.../rsa/provider/tcp/VersionDeserializer.java | 67 +++++++++++++++++
.../rsa/provider/tcp/VersionSerializer.java | 79 ++++++++++++++++++++
.../provider/tcp/TcpProviderPrimitiveTest.java | 14 ++++
.../tcp/myservice/PrimitiveService.java | 6 ++
.../tcp/myservice/PrimitiveServiceImpl.java | 12 +++
9 files changed, 256 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/19849747/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/MethodInvoker.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/MethodInvoker.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/MethodInvoker.java
index d5f2a03..dadf5f7 100644
--- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/MethodInvoker.java
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/MethodInvoker.java
@@ -24,6 +24,8 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import org.osgi.framework.Version;
+
public class MethodInvoker {
private HashMap<Object, Object> primTypes;
@@ -39,9 +41,11 @@ public class MethodInvoker {
this.primTypes.put(Float.TYPE, Float.class);
this.primTypes.put(Double.TYPE, Double.class);
this.primTypes.put(Boolean.TYPE, Boolean.class);
+ this.primTypes.put(Character.TYPE, Character.class);
}
public Object invoke(String methodName, Object[] args) {
+ args = VersionDeserializer.replaceAr(args);
Class<?>[] parameterTypesAr = getTypes(args);
Method method = null;
try {
@@ -52,6 +56,18 @@ public class MethodInvoker {
}
}
+ private void readReplaceVersion(Object[] args) {
+ if (args != null) {
+ for (int c=0; c<args.length; c++) {
+ Object current = args[c];
+ if (current instanceof SerVersion) {
+ SerVersion serVersion = (SerVersion)current;
+ args[c] = new Version(serVersion.getVersion());
+ }
+ }
+ }
+ }
+
private Method getMethod(String methodName, Class<?>[] parameterTypesAr) {
try {
return service.getClass().getMethod(methodName, parameterTypesAr);
@@ -87,7 +103,7 @@ public class MethodInvoker {
}
return type.isAssignableFrom(paramType);
}
-
+
private Class<?>[] getTypes(Object[] args) {
List<Class<?>> parameterTypes = new ArrayList<>();
if (args != null) {
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/19849747/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/SerVersion.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/SerVersion.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/SerVersion.java
new file mode 100644
index 0000000..ad912df
--- /dev/null
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/SerVersion.java
@@ -0,0 +1,25 @@
+package org.apache.aries.rsa.provider.tcp;
+
+import java.io.Serializable;
+
+import org.osgi.framework.Version;
+
+public class SerVersion implements Serializable {
+ private static final long serialVersionUID = 4725855052866235835L;
+
+ private String version;
+
+ public SerVersion() {
+ }
+
+ public SerVersion(Version version) {
+ this.version = version.toString();
+ }
+
+ public String getVersion() {
+ return version;
+ }
+ public void setVersion(String version) {
+ this.version = version;
+ }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/19849747/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
index ce1b06a..fca6df3 100644
--- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
@@ -86,6 +86,7 @@ public class TCPServer implements Closeable, Runnable {
Object[] args = (Object[])ois.readObject();
Object result = invoker.invoke(methodName, args);
result = resolveAsnyc(result);
+ result = VersionSerializer.replace(result);
if (result instanceof InvocationTargetException) {
result = ((InvocationTargetException) result).getCause();
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/19849747/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
index fbda0ec..0fd3526 100644
--- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
@@ -24,12 +24,17 @@ import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.net.Socket;
+import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.function.Supplier;
+import org.osgi.framework.ServiceException;
+import org.osgi.framework.Version;
import org.osgi.util.promise.Deferred;
import org.osgi.util.promise.Promise;
@@ -90,18 +95,23 @@ public class TcpInvocationHandler implements InvocationHandler {
}
private Object handleSyncCall(Method method, Object[] args) throws Throwable {
+ args = (Object[])VersionSerializer.replace(args);
Object result;
try (
- Socket socket = new Socket(this.host, this.port);
+ Socket socket = openSocket();
ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())
) {
socket.setSoTimeout(timeoutMillis);
out.writeObject(method.getName());
+
out.writeObject(args);
out.flush();
result = parseResult(socket);
+ result = VersionDeserializer.replace(result);
+ } catch (SocketTimeoutException e) {
+ throw new ServiceException("Timeout calling " + host + ":" + port + " method: " + method.getName(), ServiceException.REMOTE, e);
} catch (Throwable e) {
- throw new RuntimeException("Error calling " + host + ":" + port + " method: " + method.getName(), e);
+ throw new ServiceException("Error calling " + host + ":" + port + " method: " + method.getName(), ServiceException.REMOTE, e);
}
if (result instanceof Throwable) {
throw (Throwable)result;
@@ -109,9 +119,31 @@ public class TcpInvocationHandler implements InvocationHandler {
return result;
}
+ private Socket openSocket() throws UnknownHostException, IOException {
+ return AccessController.doPrivileged(new PrivilegedAction<Socket>() {
+
+ @Override
+ public Socket run() {
+ try {
+ return new Socket(host, port);
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+ });
+ }
+
private Object parseResult(Socket socket) throws Throwable {
try (ObjectInputStream in = new LoaderObjectInputStream(socket.getInputStream(), cl)) {
- return in.readObject();
+ return readReplaceVersion(in.readObject());
+ }
+ }
+
+ private Object readReplaceVersion(Object readObject) {
+ if (readObject instanceof SerVersion) {
+ return new Version(((SerVersion)readObject).getVersion());
+ } else {
+ return readObject;
}
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/19849747/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/VersionDeserializer.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/VersionDeserializer.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/VersionDeserializer.java
new file mode 100644
index 0000000..84d965e
--- /dev/null
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/VersionDeserializer.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.tcp;
+
+import org.osgi.framework.Version;
+
+public class VersionDeserializer {
+
+ public static Object[] replaceAr(Object[] args) {
+ if (args == null) {
+ return null;
+ }
+ Object[] result = new Object[args.length];
+ for (int c=0; c<args.length; c++) {
+ result[c] = replace(args[c]);
+ }
+ return result;
+ }
+
+ public static Object replace(Object obj) {
+ if (obj == null) {
+ return obj;
+ }
+ if (obj.getClass().isArray()) {
+ if (obj.getClass().getComponentType() == SerVersion.class) {
+ return replaceVersionAr((SerVersion[]) obj);
+ } else if (obj.getClass().getComponentType() == Object.class) {
+ return replaceAr((Object[]) obj);
+ } else {
+ return obj;
+ }
+ } else if (obj instanceof SerVersion) {
+ SerVersion serVersion = (SerVersion) obj;
+ return Version.parseVersion(serVersion.getVersion());
+ } else {
+ return obj;
+ }
+ }
+
+ private static Version[] replaceVersionAr(SerVersion[] obj) {
+ if (obj == null) {
+ return null;
+ }
+ Version[] result = new Version[obj.length];
+ for (int c=0; c<obj.length; c++) {
+ result[c] = Version.parseVersion(obj[c].getVersion());
+ }
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/19849747/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/VersionSerializer.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/VersionSerializer.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/VersionSerializer.java
new file mode 100644
index 0000000..2de7114
--- /dev/null
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/VersionSerializer.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.tcp;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.osgi.framework.Version;
+
+public class VersionSerializer {
+
+ public static Object replace(Object obj) {
+ if (obj == null) {
+ return null;
+ }
+ if (obj.getClass().isArray()) {
+ if (obj.getClass().getComponentType() == Version.class) {
+ return replaceVersionAr((Version[]) obj);
+ } else if (obj.getClass().getComponentType() == Object.class) {
+ return replaceAr((Object[]) obj);
+ } else {
+ return obj;
+ }
+ } else if (obj instanceof List) {
+ return replaceList((List)obj);
+ } else if (obj instanceof Version) {
+ return new SerVersion((Version) obj);
+ } else {
+ return obj;
+ }
+ }
+
+ private static Object replaceList(List<Version> obj) {
+ List<SerVersion> result = new ArrayList<>();
+ for (Version version : obj) {
+ result.add(new SerVersion(version));
+ }
+ return result;
+ }
+
+ private static SerVersion[] replaceVersionAr(Version[] obj) {
+ if (obj == null) {
+ return null;
+ }
+ SerVersion[] result = new SerVersion[obj.length];
+ for (int c=0; c<obj.length; c++) {
+ result[c] = new SerVersion((Version) obj[c]);
+ }
+ return result;
+ }
+
+ public static Object[] replaceAr(Object[] args) {
+ if (args == null) {
+ return null;
+ }
+ Object[] result = new Object[args.length];
+ for (int c=0; c<args.length; c++) {
+ result[c] = replace(args[c]);
+ }
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/19849747/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderPrimitiveTest.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderPrimitiveTest.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderPrimitiveTest.java
index 2c08104..8badef7 100644
--- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderPrimitiveTest.java
+++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderPrimitiveTest.java
@@ -18,7 +18,10 @@
*/
package org.apache.aries.rsa.provider.tcp;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.StringStartsWith.startsWith;
+import static org.junit.Assert.assertThat;
+import static org.osgi.framework.Version.parseVersion;
import java.io.IOException;
import java.util.HashMap;
@@ -34,6 +37,7 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.osgi.framework.BundleContext;
+import org.osgi.framework.Version;
public class TcpProviderPrimitiveTest {
@@ -98,6 +102,16 @@ public class TcpProviderPrimitiveTest {
public void testByteAr() {
Assert.assertArrayEquals(new byte[]{1}, myServiceProxy.callByteAr(new byte[]{1}));
}
+
+ @Test
+ public void testVersion() {
+ assertThat(myServiceProxy.callVersion(parseVersion("1.0.0")), equalTo(parseVersion("1.0.0")));
+ }
+
+ @Test
+ public void testVersionAr() {
+ assertThat(myServiceProxy.callVersionAr(new Version[] {parseVersion("1.0.0")}), equalTo(new Version[] {parseVersion("1.0.0")}));
+ }
@AfterClass
public static void close() throws IOException {
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/19849747/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveService.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveService.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveService.java
index d7988a5..679c2fa 100644
--- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveService.java
+++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveService.java
@@ -18,6 +18,8 @@
*/
package org.apache.aries.rsa.provider.tcp.myservice;
+import org.osgi.framework.Version;
+
public interface PrimitiveService {
byte callByte(byte num);
@@ -35,4 +37,8 @@ public interface PrimitiveService {
boolean callBoolean(boolean bool);
byte[] callByteAr(byte[] byteAr);
+
+ Version callVersion(Version version);
+
+ Version[] callVersionAr(Version[] version);
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/19849747/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveServiceImpl.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveServiceImpl.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveServiceImpl.java
index 1e6e5fd..9039dc2 100644
--- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveServiceImpl.java
+++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/PrimitiveServiceImpl.java
@@ -1,5 +1,7 @@
package org.apache.aries.rsa.provider.tcp.myservice;
+import org.osgi.framework.Version;
+
public class PrimitiveServiceImpl implements PrimitiveService {
@Override
@@ -43,4 +45,14 @@ public class PrimitiveServiceImpl implements PrimitiveService {
return byteAr;
}
+ @Override
+ public Version callVersion(Version version) {
+ return version;
+ }
+
+ @Override
+ public Version[] callVersionAr(Version[] version) {
+ return version;
+ }
+
}
[2/9] aries-rsa git commit: Fix for NPE
Posted by cs...@apache.org.
Fix for NPE
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/362b68d6
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/362b68d6
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/362b68d6
Branch: refs/heads/master
Commit: 362b68d69225f78002e7db0118b76c9cec1b4b4f
Parents: 7544836
Author: Christian Schneider <cs...@adobe.com>
Authored: Thu Feb 8 17:27:05 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100
----------------------------------------------------------------------
.../zookeeper/repository/ZookeeperEndpointRepository.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/362b68d6/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
index c5c03a4..6e2641f 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
@@ -247,13 +247,17 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
private void handleRemoved(String path) {
EndpointDescription endpoint = nodes.remove(path);
EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint);
- listener.endpointChanged(event, null);
+ if (listener != null) {
+ listener.endpointChanged(event, null);
+ }
}
private void handleChanged(String path, EndpointDescription endpoint) {
EndpointDescription old = nodes.put(path, endpoint);
EndpointEvent event = new EndpointEvent(old == null ? EndpointEvent.ADDED : EndpointEvent.MODIFIED, endpoint);
- listener.endpointChanged(event, null);
+ if (listener != null) {
+ listener.endpointChanged(event, null);
+ }
}
}
[6/9] aries-rsa git commit: [ARIES-1775] Remove DiscoveryPlugin
Posted by cs...@apache.org.
[ARIES-1775] Remove DiscoveryPlugin
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/fa57fb2b
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/fa57fb2b
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/fa57fb2b
Branch: refs/heads/master
Commit: fa57fb2b9374e1bbe00f563caaca4e22c08535eb
Parents: ed5adf8
Author: Christian Schneider <cs...@adobe.com>
Authored: Wed Feb 7 11:16:38 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100
----------------------------------------------------------------------
.../zookeeper/publish/DiscoveryPlugin.java | 54 ----------
.../publish/PublishingEndpointListener.java | 27 -----
.../publish/PublishingEndpointListenerTest.java | 103 -------------------
3 files changed, 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/fa57fb2b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java
deleted file mode 100644
index 41afbf2..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/DiscoveryPlugin.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.publish;
-
-import java.util.Map;
-
-/**
- * This interface allows transformation of service registration information before it is pushed into the ZooKeeper
- * discovery system.
- * It can be useful for situations where a host name or port number needs to be changed in cases where the host running
- * the service is known differently from the outside to what the local Java process thinks it is.
- * Extra service properties can also be added to the registration which can be useful to refine the remote service
- * lookup process. <p>
- *
- * DiscoveryPlugins use the OSGi WhiteBoard pattern. To add one to the system, register an instance under this interface
- * with the OSGi Service Registry. All registered DiscoveryPlugin instances are visited and given a chance to
- * process the information before it is pushed into ZooKeeper. <p>
- *
- * Note that the changes made using this plugin do not modify the local service registration.
- *
- */
-public interface DiscoveryPlugin {
-
- /**
- * Process service registration information. Plugins can change this information before it is published into the
- * ZooKeeper discovery system.
- *
- * @param mutableProperties A map of service registration properties. The map is mutable and any changes to the map
- * will be reflected in the ZooKeeper registration.
- * @param endpointKey The key under which the service is registered in ZooKeeper. This key typically has the
- * following format: hostname#port##context. While the actual value of this key is not actually used by the
- * system (people can use it as a hint to understand where the service is located), the value <i>must</i> be
- * unique for all services of a given type.
- * @return The <tt>endpointKey</tt> value to be used. If there is no need to change this simply return the value
- * of the <tt>endpointKey</tt> parameter.
- */
- String process(Map<String, Object> mutableProperties, String endpointKey);
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/fa57fb2b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
index 6dc0a08..c3bf01f 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
@@ -66,7 +66,6 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
private boolean closed;
private final EndpointDescriptionParser endpointDescriptionParser;
- private ServiceTracker<DiscoveryPlugin, DiscoveryPlugin> discoveryPluginTracker;
private ServiceRegistration<?> listenerReg;
public PublishingEndpointListener(ZooKeeper zk, BundleContext bctx) {
@@ -75,9 +74,6 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
}
public void start(BundleContext bctx) {
- discoveryPluginTracker = new ServiceTracker<DiscoveryPlugin, DiscoveryPlugin>(bctx,
- DiscoveryPlugin.class, null);
- discoveryPluginTracker.open();
Dictionary<String, String> props = new Hashtable<String, String>();
String uuid = bctx.getProperty(Constants.FRAMEWORK_UUID);
props.put(EndpointEventListener.ENDPOINT_LISTENER_SCOPE,
@@ -124,15 +120,6 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
String endpointKey = getKey(endpoint);
Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
- // process plugins
- Object[] plugins = discoveryPluginTracker.getServices();
- if (plugins != null) {
- for (Object plugin : plugins) {
- if (plugin instanceof DiscoveryPlugin) {
- endpointKey = ((DiscoveryPlugin)plugin).process(props, endpointKey);
- }
- }
- }
LOG.info("Changing endpoint in zookeeper: {}", endpoint);
for (String name : interfaces) {
String path = Utils.getZooKeeperPath(name);
@@ -173,17 +160,6 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
InterruptedException, IOException {
Collection<String> interfaces = endpoint.getInterfaces();
String endpointKey = getKey(endpoint);
- Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
-
- // process plugins
- Object[] plugins = discoveryPluginTracker != null ? discoveryPluginTracker.getServices() : null;
- if (plugins != null) {
- for (Object plugin : plugins) {
- if (plugin instanceof DiscoveryPlugin) {
- endpointKey = ((DiscoveryPlugin)plugin).process(props, endpointKey);
- }
- }
- }
LOG.info("Exporting endpoint to zookeeper: {}", endpoint);
for (String name : interfaces) {
String path = Utils.getZooKeeperPath(name);
@@ -283,8 +259,5 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
}
endpoints.clear();
}
- if (discoveryPluginTracker != null) {
- discoveryPluginTracker.close();
- }
}
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/fa57fb2b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
index fcdc9f7..b7debf6 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
@@ -20,27 +20,17 @@ package org.apache.aries.rsa.discovery.zookeeper.publish;
import static org.easymock.EasyMock.expect;
-import java.io.ByteArrayOutputStream;
-import java.lang.reflect.Field;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.easymock.EasyMock;
-import org.easymock.IAnswer;
import org.easymock.IMocksControl;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
-import org.osgi.framework.Filter;
-import org.osgi.framework.FrameworkUtil;
-import org.osgi.framework.InvalidSyntaxException;
-import org.osgi.framework.ServiceListener;
-import org.osgi.framework.ServiceReference;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.service.remoteserviceadmin.EndpointEvent;
import org.osgi.service.remoteserviceadmin.RemoteConstants;
@@ -73,49 +63,6 @@ public class PublishingEndpointListenerTest extends TestCase {
c.verify();
}
- public void testDiscoveryPlugin() throws Exception {
- BundleContext ctx = EasyMock.createMock(BundleContext.class);
- stubCreateFilter(ctx);
- ctx.addServiceListener(EasyMock.isA(ServiceListener.class),
- EasyMock.eq("(objectClass=" + DiscoveryPlugin.class.getName() + ")"));
-
- ServiceReference<DiscoveryPlugin> sr1 = createAppendPlugin(ctx);
- ServiceReference<DiscoveryPlugin> sr2 = createPropertyPlugin(ctx);
-
- EasyMock.expect(ctx.getServiceReferences(DiscoveryPlugin.class.getName(), null))
- .andReturn(new ServiceReference[]{sr1, sr2}).anyTimes();
- EasyMock.replay(ctx);
-
- EndpointDescription endpoint = createEndpoint();
-
- Map<String, Object> expectedProps = new HashMap<String, Object>(endpoint.getProperties());
- expectedProps.put("endpoint.id", "http://google.de:80/test/sub/appended");
- expectedProps.put("foo", "bar");
- expectedProps.put("service.imported", "true");
-
- final ZooKeeper zk = EasyMock.createNiceMock(ZooKeeper.class);
- String expectedFullPath = "/osgi/service_registry/org/foo/myClass/some.machine#9876##test";
-
- EndpointDescription epd = new EndpointDescription(expectedProps);
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- new EndpointDescriptionParser().writeEndpoint(epd, bos);
- byte[] data = bos.toByteArray();
- expectCreated(zk, expectedFullPath, EasyMock.aryEq(data));
- EasyMock.replay(zk);
-
- PublishingEndpointListener eli = new PublishingEndpointListener(zk, ctx);
-
- List<EndpointDescription> endpoints = getEndpoints(eli);
- assertEquals("Precondition", 0, endpoints.size());
- eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null);
- assertEquals(1, endpoints.size());
-
- //TODO enable
- //EasyMock.verify(zk);
- }
-
-
-
public void testClose() throws KeeperException, InterruptedException {
IMocksControl c = EasyMock.createNiceControl();
BundleContext ctx = c.createMock(BundleContext.class);
@@ -133,56 +80,6 @@ public class PublishingEndpointListenerTest extends TestCase {
c.verify();
}
- @SuppressWarnings("unchecked")
- private ServiceReference<DiscoveryPlugin> createAppendPlugin(BundleContext ctx) {
- DiscoveryPlugin plugin1 = new DiscoveryPlugin() {
- public String process(Map<String, Object> mutableProperties, String endpointKey) {
- String eid = (String) mutableProperties.get("endpoint.id");
- mutableProperties.put("endpoint.id", eid + "/appended");
- return endpointKey;
- }
- };
- ServiceReference<DiscoveryPlugin> sr1 = EasyMock.createMock(ServiceReference.class);
- EasyMock.expect(ctx.getService(sr1)).andReturn(plugin1).anyTimes();
- return sr1;
- }
-
- @SuppressWarnings("unchecked")
- private ServiceReference<DiscoveryPlugin> createPropertyPlugin(BundleContext ctx) {
- DiscoveryPlugin plugin2 = new DiscoveryPlugin() {
- public String process(Map<String, Object> mutableProperties, String endpointKey) {
- mutableProperties.put("foo", "bar");
- return endpointKey.replaceAll("localhost", "some.machine");
- }
- };
- ServiceReference<DiscoveryPlugin> sr2 = EasyMock.createMock(ServiceReference.class);
- EasyMock.expect(ctx.getService(sr2)).andReturn(plugin2).anyTimes();
- return sr2;
- }
-
- @SuppressWarnings("unchecked")
- private List<EndpointDescription> getEndpoints(PublishingEndpointListener eli) throws Exception {
- Field field = eli.getClass().getDeclaredField("endpoints");
- field.setAccessible(true);
- return (List<EndpointDescription>) field.get(eli);
- }
-
- private void stubCreateFilter(BundleContext ctx) throws InvalidSyntaxException {
- EasyMock.expect(ctx.createFilter(EasyMock.isA(String.class))).andAnswer(new IAnswer<Filter>() {
- public Filter answer() throws Throwable {
- return FrameworkUtil.createFilter((String) EasyMock.getCurrentArguments()[0]);
- }
- }).anyTimes();
- }
-
- private void expectCreated(ZooKeeper zk, String path, byte[] dataMatcher) throws KeeperException, InterruptedException {
- expect(zk.create(EasyMock.eq(path),
- dataMatcher,
- EasyMock.eq(Ids.OPEN_ACL_UNSAFE),
- EasyMock.eq(CreateMode.EPHEMERAL)))
- .andReturn("");
- }
-
private void expectCreated(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
expect(zk.create(EasyMock.eq(path),
(byte[])EasyMock.anyObject(),
[7/9] aries-rsa git commit: Add Awaitility for tests
Posted by cs...@apache.org.
Add Awaitility for tests
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/20330373
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/20330373
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/20330373
Branch: refs/heads/master
Commit: 20330373407397e9bef73ebfcf4ca2f372a77e5b
Parents: ca922a4
Author: Christian Schneider <cs...@adobe.com>
Authored: Thu Feb 8 09:30:52 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100
----------------------------------------------------------------------
parent/pom.xml | 11 +++++++++++
1 file changed, 11 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/20330373/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 09e9cc7..af59c55 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -85,6 +85,11 @@
<artifactId>shazamcrest</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<dependencyManagement>
@@ -154,6 +159,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>3.0.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
[5/9] aries-rsa git commit: [ARIES-1774] Switch to watching all
endpoints in zookeeper to be able to pass tck
Posted by cs...@apache.org.
[ARIES-1774] Switch to watching all endpoints in zookeeper to be able to pass tck
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/ca922a42
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/ca922a42
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/ca922a42
Branch: refs/heads/master
Commit: ca922a42e0705150d39c6f6955d647f7b1c3ad39
Parents: f07ee8b
Author: Christian Schneider <cs...@adobe.com>
Authored: Thu Feb 8 09:30:22 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100
----------------------------------------------------------------------
.../discovery/zookeeper/ZooKeeperDiscovery.java | 3 +-
.../repository/ZookeeperEndpointRepository.java | 162 ++++++-----
.../subscribe/EndpointListenerTracker.java | 16 +-
.../zookeeper/subscribe/InterfaceMonitor.java | 266 -------------------
.../subscribe/InterfaceMonitorManager.java | 213 ++++-----------
.../ZookeeperEndpointRepositoryTest.java | 25 +-
.../subscribe/InterfaceMonitorManagerTest.java | 71 ++---
.../subscribe/InterfaceMonitorTest.java | 71 -----
8 files changed, 198 insertions(+), 629 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
index d265a22..50d9598 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
@@ -98,7 +98,8 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
repository = new ZookeeperEndpointRepository(zkClient);
endpointListener = new PublishingEndpointListener(repository);
endpointListener.start(bctx);
- imManager = new InterfaceMonitorManager(bctx, zkClient);
+ imManager = new InterfaceMonitorManager(repository);
+ repository.addListener(imManager);
endpointListenerTracker = new EndpointListenerTracker(bctx, imManager);
endpointListenerTracker.open();
started = true;
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
index 2349c45..c5c03a4 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
@@ -15,6 +15,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
+import org.apache.aries.rsa.discovery.zookeeper.subscribe.InterfaceMonitorManager;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -53,33 +54,11 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
} catch (Exception e) {
throw new IllegalStateException("Unable to create base path");
}
- // Not yet needed
- //this.registerWatcher();
+ this.registerWatcher();
}
- private void registerWatcher() {
- try {
- List<String> children = zk.getChildren(ZookeeperEndpointRepository.PATH_PREFIX, this);
- System.out.println(children);
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- protected void notifyListener(WatchedEvent wevent) {
- EndpointDescription ep = read(wevent.getPath());
- if (ep != null) {
- int type = getEndpointEventType(wevent);
- EndpointEvent event = new EndpointEvent(type, ep);
- listener.endpointChanged(event, null);
- }
- }
-
- private int getEndpointEventType(WatchedEvent wevent) {
- EventType type = wevent.getType();
- return EndpointEvent.ADDED;
+ public void addListener(EndpointEventListener listener) {
+ this.listener = listener;
}
/**
@@ -89,23 +68,7 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
* @return endpoint found in the node or null if no endpoint was found
*/
public EndpointDescription read(String path) {
- try {
- Stat stat = zk.exists(path, false);
- if (stat == null || stat.getDataLength() <= 0) {
- return null;
- }
- byte[] data = zk.getData(path, false, null);
- LOG.debug("Got data for node: {}", path);
-
- EndpointDescription endpoint = parser.readEndpoint(new ByteArrayInputStream(data));
- if (endpoint != null) {
- return endpoint;
- }
- LOG.warn("No Discovery information found for node: {}", path);
- } catch (Exception e) {
- LOG.error("Problem getting EndpointDescription from node " + path, e);
- }
- return null;
+ return nodes.get(path);
}
public void add(EndpointDescription endpoint) throws URISyntaxException, KeeperException,
@@ -113,6 +76,8 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
Collection<String> interfaces = endpoint.getInterfaces();
String endpointKey = getKey(endpoint);
+ createEphemeralNode(ZookeeperEndpointRepository.getZooKeeperPath("") + endpointKey, getData(endpoint));
+
LOG.info("Exporting endpoint to zookeeper: {}", endpoint);
for (String name : interfaces) {
String path = ZookeeperEndpointRepository.getZooKeeperPath(name);
@@ -152,8 +117,42 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
}
}
- public List<EndpointDescription> getAll() throws KeeperException, InterruptedException {
- return null;
+ public Collection<EndpointDescription> getAll() {
+ return nodes.values();
+ }
+
+ /**
+ * Removes nulls and empty strings from the given string array.
+ *
+ * @param strings an array of strings
+ * @return a new array containing the non-null and non-empty
+ * elements of the original array in the same order
+ */
+ public static List<String> removeEmpty(List<String> strings) {
+ List<String> result = new ArrayList<String>();
+ if (strings == null) {
+ return result;
+ }
+ for (String s : strings) {
+ if (s != null && !s.isEmpty()) {
+ result.add(s);
+ }
+ }
+ return result;
+ }
+
+ public static String getZooKeeperPath(String name) {
+ return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/');
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ LOG.info("Received event {}", event);
+ if (event.getType() == EventType.NodeDeleted) {
+ handleRemoved(event.getPath());
+ return;
+ }
+ watchRecursive(event.getPath());
}
@Override
@@ -161,6 +160,28 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
}
+ private void registerWatcher() {
+ try {
+ watchRecursive(ZookeeperEndpointRepository.PATH_PREFIX);
+ } catch (Exception e) {
+ LOG.info(e.getMessage(), e);
+ }
+ }
+
+ private void watchRecursive(String path) {
+ LOG.info("Watching {}", path);
+ handleZNodeChanged(path);
+ try {
+ List<String> children = zk.getChildren(path, this);
+ for (String child : children) {
+ String childPath = (path.endsWith("/") ? path : path + "/") + child;
+ watchRecursive(childPath);
+ }
+ } catch (Exception e) {
+ LOG.info(e.getMessage(), e);
+ }
+ }
+
private byte[] getData(EndpointDescription epd) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
parser.writeEndpoint(epd, bos);
@@ -201,39 +222,38 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
}
}
- /**
- * Removes nulls and empty strings from the given string array.
- *
- * @param strings an array of strings
- * @return a new array containing the non-null and non-empty
- * elements of the original array in the same order
- */
- public static List<String> removeEmpty(List<String> strings) {
- List<String> result = new ArrayList<String>();
- if (strings == null) {
- return result;
- }
- for (String s : strings) {
- if (s != null && !s.isEmpty()) {
- result.add(s);
- }
- }
- return result;
- }
-
- public static String getZooKeeperPath(String name) {
- return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/');
- }
-
private static String getKey(EndpointDescription endpoint) throws URISyntaxException {
URI uri = new URI(endpoint.getId());
return new StringBuilder().append(uri.getHost()).append("#").append(uri.getPort())
.append("#").append(uri.getPath().replace('/', '#')).toString();
}
- @Override
- public void process(WatchedEvent event) {
-
+ private void handleZNodeChanged(String path) {
+ try {
+ Stat stat = new Stat();
+ byte[] data = zk.getData(path, false, stat);
+ if (data == null || data.length == 0) {
+ return;
+ }
+ EndpointDescription endpoint = parser.readEndpoint(new ByteArrayInputStream(data));
+ if (endpoint != null) {
+ handleChanged(path, endpoint);
+ }
+ } catch (Exception e) {
+ LOG.info(e.getMessage(), e);
+ }
+ }
+
+ private void handleRemoved(String path) {
+ EndpointDescription endpoint = nodes.remove(path);
+ EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint);
+ listener.endpointChanged(event, null);
+ }
+
+ private void handleChanged(String path, EndpointDescription endpoint) {
+ EndpointDescription old = nodes.put(path, endpoint);
+ EndpointEvent event = new EndpointEvent(old == null ? EndpointEvent.ADDED : EndpointEvent.MODIFIED, endpoint);
+ listener.endpointChanged(event, null);
}
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
index 0ed1097..d4805d0 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
@@ -51,21 +51,23 @@ public class EndpointListenerTracker extends ServiceTracker {
}
@Override
- public Object addingService(ServiceReference endpointListener) {
- imManager.addInterest(endpointListener);
- return null;
+ public Object addingService(ServiceReference sref) {
+ Object epListener = super.addingService(sref);
+ imManager.addInterest(sref, epListener);
+ return epListener;
}
@Override
- public void modifiedService(ServiceReference endpointListener, Object service) {
+ public void modifiedService(ServiceReference sref, Object epListener) {
// called when an EndpointListener updates its service properties,
// e.g. when its interest scope is expanded/reduced
- imManager.addInterest(endpointListener);
+ imManager.addInterest(sref, epListener);
}
@Override
- public void removedService(ServiceReference endpointListener, Object service) {
- imManager.removeInterest(endpointListener);
+ public void removedService(ServiceReference sref, Object epListener) {
+ imManager.removeInterest(sref);
+ super.removedService(sref, epListener);
}
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
deleted file mode 100644
index 2c90b3c..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.subscribe;
-
-import java.io.ByteArrayInputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
-import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
-import org.apache.zookeeper.AsyncCallback.StatCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointEvent;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Monitors ZooKeeper for changes in published endpoints.
- * <p>
- * Specifically, it monitors the node path associated with a given interface class,
- * whose data is a serialized version of an EndpointDescription, and notifies an
- * EndpointListener when changes are detected (which can then propagate the
- * notification to other EndpointListeners with a matching scope).
- * <p>
- * Note that the EndpointListener is used here as a decoupling interface for
- * convenience, and is not necessarily used according to its documented contract.
- */
-public class InterfaceMonitor implements Watcher, StatCallback {
-
- private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitor.class);
-
- private final String znode;
- private final ZooKeeper zk;
- private final EndpointEventListener endpointListener;
- private final boolean recursive;
- private volatile boolean closed;
-
- // This map reference changes, so don't synchronize on it
- private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>();
-
- private EndpointDescriptionParser parser;
-
- public InterfaceMonitor(ZooKeeper zk, String objClass, EndpointEventListener endpointListener, String scope) {
- this.zk = zk;
- this.znode = ZookeeperEndpointRepository.getZooKeeperPath(objClass);
- this.recursive = objClass == null || objClass.isEmpty();
- this.endpointListener = endpointListener;
- this.parser = new EndpointDescriptionParser();
- LOG.debug("Creating new InterfaceMonitor {} for scope [{}] and objectClass [{}]",
- new Object[] {recursive ? "(recursive)" : "", scope, objClass});
- }
-
- /**
- * Returns all endpoints that are currently known to this monitor.
- *
- * @return all endpoints that are currently known to this monitor
- */
- public synchronized List<EndpointDescription> getEndpoints() {
- return new ArrayList<EndpointDescription>(nodes.values());
- }
-
- public void start() {
- watch();
- }
-
- private void watch() {
- LOG.debug("registering a ZooKeeper.exists({}) callback", znode);
- zk.exists(znode, this, this, null);
- zk.getData(znode, this, new DataCallback() {
-
- @Override
- public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
- processDelta();
- }
- }, null);
- }
-
- /**
- * Zookeeper Watcher interface callback.
- */
- public void process(WatchedEvent event) {
- LOG.debug("ZooKeeper watcher callback on node {} for event {}", znode, event);
- processDelta();
- }
-
- /**
- * Zookeeper StatCallback interface callback.
- */
- @SuppressWarnings("deprecation")
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- LOG.debug("ZooKeeper callback on node: {} code: {}", znode, rc);
-
- switch (rc) {
- case Code.Ok:
- case Code.NoNode:
- processDelta();
- return;
-
- case Code.SessionExpired:
- case Code.NoAuth:
- case Code.ConnectionLoss:
- return;
-
- default:
- watch();
- }
- }
-
- private void processDelta() {
- if (closed) {
- return;
- }
-
- if (zk.getState() != ZooKeeper.States.CONNECTED) {
- LOG.debug("ZooKeeper connection was already closed! Not processing changed event.");
- return;
- }
-
- try {
- if (zk.exists(znode, false) != null) {
- zk.getChildren(znode, this);
- refreshNodes();
- } else {
- LOG.debug("znode {} doesn't exist -> not processing any changes", znode);
- }
- } catch (Exception e) {
- if (zk.getState() != ZooKeeper.States.CONNECTED) {
- LOG.debug("Error getting Zookeeper data: " + e); // e.g. session expired, handled by ZooKeeperDiscovery
- } else {
- LOG.error("Error getting ZooKeeper data.", e);
- }
- }
- }
-
- public synchronized void close() {
- closed = true;
- for (EndpointDescription endpoint : nodes.values()) {
- EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint);
- endpointListener.endpointChanged(event, null);
- }
- nodes.clear();
- }
-
- private synchronized void refreshNodes() {
- if (closed) {
- return;
- }
- LOG.info("Processing change on node: {}", znode);
-
- Map<String, EndpointDescription> newNodes = new HashMap<String, EndpointDescription>();
- Map<String, EndpointDescription> prevNodes = new HashMap<String, EndpointDescription>(nodes);
- processChildren(znode, newNodes, prevNodes);
-
- // whatever is left in prevNodes now has been removed from Discovery
- LOG.debug("processChildren done. Nodes that are missing now and need to be removed: {}", prevNodes.values());
- for (EndpointDescription endpoint : prevNodes.values()) {
- EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint);
- endpointListener.endpointChanged(event, null);
- }
- nodes = newNodes;
- }
-
- /**
- * Iterates through all child nodes of the given node and tries to find
- * endpoints. If the recursive flag is set it also traverses into the child
- * nodes.
- *
- * @return true if an endpoint was found and if the node therefore needs to
- * be monitored for changes
- */
- private boolean processChildren(String zn, Map<String, EndpointDescription> newNodes,
- Map<String, EndpointDescription> prevNodes) {
- List<String> children;
- try {
- LOG.debug("Processing the children of {}", zn);
- children = zk.getChildren(zn, false);
-
- boolean foundANode = false;
- for (String child : children) {
- String childZNode = zn + '/' + child;
- EndpointDescription endpoint = getEndpointDescriptionFromNode(childZNode);
- if (endpoint != null) {
- EndpointDescription prevEndpoint = prevNodes.get(child);
-
- newNodes.put(child, endpoint);
- prevNodes.remove(child);
- foundANode = true;
- LOG.debug("Properties: {}", endpoint.getProperties());
- if (prevEndpoint == null) {
- // This guy is new
- LOG.info("found new node " + zn + "/[" + child + "] ( []->child ) props: "
- + endpoint.getProperties().values());
- EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint);
- endpointListener.endpointChanged(event, null);
- } else if (!prevEndpoint.getProperties().equals(endpoint.getProperties())) {
- LOG.info("Found changed node " + zn + "/[" + child + "] ( []->child ) props: "
- + endpoint.getProperties().values());
- EndpointEvent event = new EndpointEvent(EndpointEvent.MODIFIED, endpoint);
- endpointListener.endpointChanged(event, null);
- }
- }
- if (recursive && processChildren(childZNode, newNodes, prevNodes)) {
- zk.getChildren(childZNode, this);
- }
- }
-
- return foundANode;
- } catch (KeeperException e) {
- LOG.error("Problem processing ZooKeeper node", e);
- } catch (InterruptedException e) {
- LOG.error("Problem processing ZooKeeper node", e);
- }
- return false;
- }
-
- /**
- * Retrieves data from the given node and parses it into an EndpointDescription.
- *
- * @param node a node path
- * @return endpoint found in the node or null if no endpoint was found
- */
- private EndpointDescription getEndpointDescriptionFromNode(String node) {
- try {
- Stat stat = zk.exists(node, false);
- if (stat == null || stat.getDataLength() <= 0) {
- return null;
- }
- byte[] data = zk.getData(node, false, null);
- LOG.debug("Got data for node: {}", node);
-
- EndpointDescription endpoint = parser.readEndpoint(new ByteArrayInputStream(data));
- if (endpoint != null) {
- return endpoint;
- }
- LOG.warn("No Discovery information found for node: {}", node);
- } catch (Exception e) {
- LOG.error("Problem getting EndpointDescription from node " + node, e);
- }
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
index 0aa98b3..7b5c7d2 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
@@ -18,23 +18,13 @@
*/
package org.apache.aries.rsa.discovery.zookeeper.subscribe;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Dictionary;
import java.util.HashMap;
-import java.util.Hashtable;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
import org.apache.aries.rsa.util.StringPlus;
-import org.apache.zookeeper.ZooKeeper;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Filter;
import org.osgi.framework.ServiceReference;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.service.remoteserviceadmin.EndpointEvent;
@@ -50,165 +40,95 @@ import org.slf4j.LoggerFactory;
* These events are then forwarded to all interested EndpointEventListeners.
*/
@SuppressWarnings({"deprecation", "rawtypes"})
-public class InterfaceMonitorManager {
+public class InterfaceMonitorManager implements EndpointEventListener {
private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitorManager.class);
- private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(".*\\(objectClass=([^)]+)\\).*");
- private final BundleContext bctx;
- private final ZooKeeper zk;
- // map of EndpointEventListeners and the scopes they are interested in
- private final Map<ServiceReference, List<String>> epListenerScopes =
- new HashMap<ServiceReference, List<String>>();
- // map of scopes and their interest data
- private final Map<String, Interest> interests = new HashMap<String, Interest>();
+ private final ZookeeperEndpointRepository repository;
+ private final Map<ServiceReference, Interest> interests = new HashMap<ServiceReference, Interest>();
protected static class Interest {
- List<ServiceReference> epListeners =
- new CopyOnWriteArrayList<ServiceReference>();
- InterfaceMonitor monitor;
+ List<String> scopes;
+ Object epListener;
}
- public InterfaceMonitorManager(BundleContext bctx, ZooKeeper zk) {
- this.bctx = bctx;
- this.zk = zk;
+ public InterfaceMonitorManager(ZookeeperEndpointRepository repository) {
+ this.repository = repository;
}
- public void addInterest(ServiceReference<?> eplistener) {
- if (isOurOwnEndpointEventListener(eplistener)) {
+ public void addInterest(ServiceReference<?> sref, Object epListener) {
+ if (isOurOwnEndpointEventListener(sref)) {
LOG.debug("Skipping our own EndpointEventListener");
return;
}
- List<String> scopes = getScopes(eplistener);
+ List<String> scopes = getScopes(sref);
LOG.debug("adding Interests: {}", scopes);
- for (String scope : scopes) {
- String objClass = getObjectClass(scope);
- addInterest(eplistener, scope, objClass);
- }
- }
-
- private static boolean isOurOwnEndpointEventListener(ServiceReference<?> EndpointEventListener) {
- return Boolean.parseBoolean(String.valueOf(
- EndpointEventListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)));
- }
-
- public synchronized void addInterest(ServiceReference epListener,
- String scope, String objClass) {
// get or create interest for given scope and add listener to it
- Interest interest = interests.get(scope);
+ Interest interest = interests.get(epListener);
if (interest == null) {
// create interest, add listener and start monitor
interest = new Interest();
- interests.put(scope, interest);
- interest.epListeners.add(epListener); // add it before monitor starts so we don't miss events
- interest.monitor = createInterfaceMonitor(scope, objClass, interest);
- interest.monitor.start();
- } else {
- // interest already exists, so just add listener to it
- if (!interest.epListeners.contains(epListener)) {
- interest.epListeners.add(epListener);
- }
- // notify listener of all known endpoints for given scope
- // (as EndpointEventListener contract requires of all added/modified listeners)
- for (EndpointDescription endpoint : interest.monitor.getEndpoints()) {
- EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint);
- notifyListeners(event, scope, Arrays.asList(epListener));
- }
- }
-
- // add scope to listener's scopes list
- List<String> scopes = epListenerScopes.get(epListener);
- if (scopes == null) {
- scopes = new ArrayList<String>(1);
- epListenerScopes.put(epListener, scopes);
- }
- if (!scopes.contains(scope)) {
- scopes.add(scope);
+ interest.epListener = epListener;
+ interest.scopes = scopes;
+ interests.put(sref, interest);
+ sendExistingEndpoints(scopes, epListener);
}
}
- public synchronized void removeInterest(ServiceReference<EndpointEventListener> EndpointEventListener) {
- LOG.info("removing EndpointEventListener interests: {}", EndpointEventListener);
- List<String> scopes = epListenerScopes.get(EndpointEventListener);
- if (scopes == null) {
- return;
+ private void sendExistingEndpoints(List<String> scopes, Object epListener) {
+ for (EndpointDescription endpoint : repository.getAll()) {
+ EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint);
+ notifyListener(event, scopes, epListener);
}
+ }
- for (String scope : scopes) {
- Interest interest = interests.get(scope);
- if (interest != null) {
- interest.epListeners.remove(EndpointEventListener);
- if (interest.epListeners.isEmpty()) {
- interest.monitor.close();
- interests.remove(scope);
- }
- }
- }
- epListenerScopes.remove(EndpointEventListener);
+ private static boolean isOurOwnEndpointEventListener(ServiceReference<?> EndpointEventListener) {
+ return Boolean.parseBoolean(String.valueOf(
+ EndpointEventListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)));
}
- protected InterfaceMonitor createInterfaceMonitor(final String scope, String objClass, final Interest interest) {
- // holding this object's lock in the callbacks can lead to a deadlock with InterfaceMonitor
- EndpointEventListener listener = new EndpointEventListener() {
+ public synchronized void removeInterest(ServiceReference<EndpointEventListener> epListenerRef) {
+ LOG.info("removing EndpointEventListener interests: {}", epListenerRef);
+ interests.remove(epListenerRef);
+ }
- @Override
- public void endpointChanged(EndpointEvent event, String filter) {
- notifyListeners(event, scope, interest.epListeners);
- }
- };
- return new InterfaceMonitor(zk, objClass, listener, scope);
+ @Override
+ public void endpointChanged(EndpointEvent event, String filter) {
+ for (Interest interest : interests.values()) {
+ notifyListener(event, interest.scopes, interest.epListener);
+ }
}
- private void notifyListeners(EndpointEvent event, String currentScope,
- List<ServiceReference> epListeners) {
+ private void notifyListener(EndpointEvent event, List<String> scopes, Object service) {
EndpointDescription endpoint = event.getEndpoint();
- for (ServiceReference<?> epListenerRef : epListeners) {
- if (epListenerRef.getBundle() == null) {
- LOG.info("listening service was unregistered, ignoring");
- }
- Object service = bctx.getService(epListenerRef);
- LOG.trace("matching {} against {}", endpoint, currentScope);
- if (matchFilter(bctx, currentScope, endpoint)) {
- LOG.debug("Matched {} against {}", endpoint, currentScope);
- try {
- if (service instanceof EndpointEventListener) {
- EndpointEventListener epeListener = (EndpointEventListener)service;
- notifyListener(event, currentScope, epeListener);
- } else if (service instanceof EndpointListener) {
- EndpointListener epListener = (EndpointListener)service;
- notifyListener(event, currentScope, epListener);
- }
- } finally {
- if (service != null) {
- bctx.ungetService(epListenerRef);
- }
- }
- }
+ String currentScope = getFirstMatch(scopes, endpoint);
+ if (currentScope == null) {
+ return;
}
- }
-
- private static boolean matchFilter(BundleContext bctx, String filter, EndpointDescription endpoint) {
- if (filter == null) {
- return false;
+ LOG.debug("Matched {} against {}", endpoint, currentScope);
+ if (service instanceof EndpointEventListener) {
+ notifyEEListener(event, currentScope, (EndpointEventListener)service);
+ } else if (service instanceof EndpointListener) {
+ notifyEListener(event, currentScope, (EndpointListener)service);
}
+ }
- try {
- Filter f = bctx.createFilter(filter);
- Dictionary<String, Object> dict = new Hashtable<String, Object>(endpoint.getProperties());
- return f.match(dict);
- } catch (Exception e) {
- return false;
+ private String getFirstMatch(List<String> scopes, EndpointDescription endpoint) {
+ for (String scope : scopes) {
+ if (endpoint.matches(scope)) {
+ return scope;
+ }
}
+ return null;
}
-
- private void notifyListener(EndpointEvent event, String currentScope, EndpointEventListener listener) {
+ private void notifyEEListener(EndpointEvent event, String currentScope, EndpointEventListener listener) {
EndpointDescription endpoint = event.getEndpoint();
LOG.info("Calling endpointchanged on class {} for filter {}, type {}, endpoint {} ", listener, currentScope, endpoint);
listener.endpointChanged(event, currentScope);
}
- private void notifyListener(EndpointEvent event, String currentScope, EndpointListener listener) {
+ private void notifyEListener(EndpointEvent event, String currentScope, EndpointListener listener) {
EndpointDescription endpoint = event.getEndpoint();
LOG.info("Calling old listener on class {} for filter {}, type {}, endpoint {} ", listener, currentScope, endpoint);
switch (event.getType()) {
@@ -228,49 +148,18 @@ public class InterfaceMonitorManager {
}
public synchronized void close() {
- for (Interest interest : interests.values()) {
- interest.monitor.close();
- }
interests.clear();
- epListenerScopes.clear();
}
/**
* Only for test case!
*/
- protected synchronized Map<String, Interest> getInterests() {
+ protected synchronized Map<ServiceReference, Interest> getInterests() {
return interests;
}
- /**
- * Only for test case!
- */
- protected synchronized Map<ServiceReference, List<String>> getEndpointListenerScopes() {
- return epListenerScopes;
- }
-
protected List<String> getScopes(ServiceReference<?> sref) {
return StringPlus.normalize(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE));
}
-
- public static String getObjectClass(String scope) {
- Matcher m = OBJECTCLASS_PATTERN.matcher(scope);
- return m.matches() ? m.group(1) : null;
- }
- /**
- * Returns a service's properties as a map.
- *
- * @param serviceReference a service reference
- * @return the service's properties as a map
- */
- public static Map<String, Object> getProperties(ServiceReference<?> serviceReference) {
- String[] keys = serviceReference.getPropertyKeys();
- Map<String, Object> props = new HashMap<String, Object>(keys.length);
- for (String key : keys) {
- Object val = serviceReference.getProperty(key);
- props.put(key, val);
- }
- return props;
- }
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
index 3a20f5a..d9f23e6 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
@@ -1,15 +1,22 @@
package org.apache.aries.rsa.discovery.zookeeper.repository;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.samePropertyValuesAs;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
@@ -32,6 +39,7 @@ public class ZookeeperEndpointRepositoryTest {
private ZooKeeperServer server;
private ZooKeeper zk;
private ServerCnxnFactory factory;
+ private List<EndpointEvent> events = new ArrayList<>();
@Before
public void before() throws IOException, InterruptedException, KeeperException {
@@ -62,25 +70,38 @@ public class ZookeeperEndpointRepositoryTest {
@After
public void after() throws InterruptedException {
- zk.close();
+ //zk.close(); // Seems to cause SessionTimeout error
factory.shutdown();
}
@Test
public void test() throws IOException, URISyntaxException, KeeperException, InterruptedException {
+ final Semaphore sem = new Semaphore(0);
EndpointEventListener listener = new EndpointEventListener() {
@Override
public void endpointChanged(EndpointEvent event, String filter) {
- System.out.println(event);
+ events.add(event);
+ sem.release();
}
};
ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk, listener);
+
EndpointDescription endpoint = createEndpoint();
repository.add(endpoint);
+ assertThat(sem.tryAcquire(1000, TimeUnit.SECONDS), equalTo(true));
+
String path = "/osgi/service_registry/java/lang/Runnable/test.de#-1##service1";
EndpointDescription ep2 = repository.read(path);
+ assertNotNull(ep2);
+
+ repository.remove(endpoint);
+
+ assertThat(sem.tryAcquire(1000, TimeUnit.SECONDS), equalTo(true));
+ assertThat(events.get(0), samePropertyValuesAs(new EndpointEvent(EndpointEvent.ADDED, endpoint)));
+ assertThat(events.get(1), samePropertyValuesAs(new EndpointEvent(EndpointEvent.REMOVED, endpoint)));
+
repository.close();
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java
index 84eca09..41b0795 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java
@@ -18,95 +18,68 @@
*/
package org.apache.aries.rsa.discovery.zookeeper.subscribe;
-import static org.easymock.EasyMock.getCurrentArguments;
+import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
-import java.util.Collections;
-import java.util.Dictionary;
-import java.util.Hashtable;
+import java.util.ArrayList;
import java.util.List;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
import org.easymock.EasyMock;
-import org.easymock.IAnswer;
import org.easymock.IMocksControl;
import org.junit.Test;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.service.remoteserviceadmin.EndpointEventListener;
public class InterfaceMonitorManagerTest {
@Test
public void testEndpointListenerTrackerCustomizer() {
- IMocksControl c = EasyMock.createNiceControl();
- BundleContext ctx = c.createMock(BundleContext.class);
- ServiceReference<EndpointEventListener> sref = createService(c, "(objectClass=mine)", "mine");
- ServiceReference<EndpointEventListener> sref2 = createService(c, "(objectClass=mine)", "mine");
- ZooKeeper zk = c.createMock(ZooKeeper.class);
- InterfaceMonitorManager eltc = new InterfaceMonitorManager(ctx, zk);
+ IMocksControl c = EasyMock.createControl();
+ ServiceReference<EndpointEventListener> sref = createService(c, "(objectClass=mine)");
+ ServiceReference<EndpointEventListener> sref2 = createService(c, "(objectClass=mine)");
+ ZookeeperEndpointRepository repository = c.createMock(ZookeeperEndpointRepository.class);
+ List<EndpointDescription> endpoints = new ArrayList<>();
+ expect(repository.getAll()).andReturn(endpoints).atLeastOnce();
+ EndpointEventListener epListener1 = c.createMock(EndpointEventListener.class);
+ EndpointEventListener epListener2 = c.createMock(EndpointEventListener.class);
c.replay();
+ InterfaceMonitorManager eltc = new InterfaceMonitorManager(repository);
// sref has no scope -> nothing should happen
- assertEquals(0, eltc.getEndpointListenerScopes().size());
assertEquals(0, eltc.getInterests().size());
- eltc.addInterest(sref);
- assertScopeIncludes(sref, eltc);
- assertEquals(1, eltc.getEndpointListenerScopes().size());
- assertEquals(1, eltc.getInterests().size());
- eltc.addInterest(sref);
- assertScopeIncludes(sref, eltc);
- assertEquals(1, eltc.getEndpointListenerScopes().size());
+ eltc.addInterest(sref, epListener1);
assertEquals(1, eltc.getInterests().size());
- eltc.addInterest(sref2);
- assertScopeIncludes(sref, eltc);
- assertScopeIncludes(sref2, eltc);
- assertEquals(2, eltc.getEndpointListenerScopes().size());
+ eltc.addInterest(sref, epListener1);
assertEquals(1, eltc.getInterests().size());
+ eltc.addInterest(sref2, epListener2);
+ assertEquals(2, eltc.getInterests().size());
+
eltc.removeInterest(sref);
- assertScopeIncludes(sref2, eltc);
- assertEquals(1, eltc.getEndpointListenerScopes().size());
assertEquals(1, eltc.getInterests().size());
eltc.removeInterest(sref);
- assertScopeIncludes(sref2, eltc);
- assertEquals(1, eltc.getEndpointListenerScopes().size());
assertEquals(1, eltc.getInterests().size());
eltc.removeInterest(sref2);
- assertEquals(0, eltc.getEndpointListenerScopes().size());
assertEquals(0, eltc.getInterests().size());
c.verify();
}
@SuppressWarnings("unchecked")
- private ServiceReference<EndpointEventListener> createService(IMocksControl c, String scope, String objectClass) {
+ private ServiceReference<EndpointEventListener> createService(IMocksControl c, String scope) {
ServiceReference<EndpointEventListener> sref = c.createMock(ServiceReference.class);
- final Dictionary<String, String> props = new Hashtable<>();
- props.put(EndpointEventListener.ENDPOINT_LISTENER_SCOPE, scope);
- props.put(Constants.OBJECTCLASS, objectClass);
- String[] keys = Collections.list(props.keys()).toArray(new String[]{});
- EasyMock.expect(sref.getPropertyKeys()).andReturn(keys).anyTimes();
- EasyMock.expect(sref.getProperty((String)EasyMock.anyObject())).andAnswer(new IAnswer<Object>() {
- public Object answer() throws Throwable {
- return props.get(getCurrentArguments()[0]);
- }
- }).anyTimes();
+ expect(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)).andReturn(scope).atLeastOnce();
+ expect(sref.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)).andReturn(null).atLeastOnce();
return sref;
}
- private void assertScopeIncludes(ServiceReference<EndpointEventListener> sref, InterfaceMonitorManager imm) {
- List<String> srefScope = imm.getEndpointListenerScopes().get(sref);
- assertEquals(1, srefScope.size());
- assertEquals("(objectClass=mine)", srefScope.get(0));
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
deleted file mode 100644
index e09cfbf..0000000
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.subscribe;
-
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-
-import java.util.Collections;
-
-import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.AsyncCallback.DataCallback;
-import org.apache.zookeeper.data.Stat;
-import org.easymock.EasyMock;
-import org.easymock.IMocksControl;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-
-import junit.framework.TestCase;
-
-public class InterfaceMonitorTest extends TestCase {
-
- public void testInterfaceMonitor() throws KeeperException, InterruptedException {
- IMocksControl c = EasyMock.createControl();
-
- ZooKeeper zk = c.createMock(ZooKeeper.class);
- expect(zk.getState()).andReturn(ZooKeeper.States.CONNECTED).anyTimes();
-
- String scope = "(myProp=test)";
- String interf = "es.schaaf.test";
- String node = ZookeeperEndpointRepository.getZooKeeperPath(interf);
-
- EndpointEventListener endpointListener = c.createMock(EndpointEventListener.class);
- InterfaceMonitor im = new InterfaceMonitor(zk, interf, endpointListener, scope);
- zk.exists(eq(node), eq(im), eq(im), EasyMock.anyObject());
- EasyMock.expectLastCall().once();
- zk.getData(eq(node), eq(im), EasyMock.anyObject(DataCallback.class), EasyMock.anyObject());
- expectLastCall();
-
- expect(zk.exists(eq(node), eq(false))).andReturn(new Stat()).anyTimes();
- expect(zk.getChildren(eq(node), eq(false))).andReturn(Collections.<String> emptyList()).once();
- expect(zk.getChildren(eq(node), eq(im))).andReturn(Collections.<String> emptyList()).once();
-
- c.replay();
- im.start();
- // simulate a zk callback
- WatchedEvent we = new WatchedEvent(EventType.NodeCreated, KeeperState.SyncConnected, node);
- im.process(we);
- c.verify();
- }
-}
[4/9] aries-rsa git commit: [ARIES-1774] Remove
PublishingEndpointListenerFactory
Posted by cs...@apache.org.
[ARIES-1774] Remove PublishingEndpointListenerFactory
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/ed5adf8f
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/ed5adf8f
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/ed5adf8f
Branch: refs/heads/master
Commit: ed5adf8fc57ea2f10d0d73d97a47bdaa013b1b2f
Parents: 44943bc
Author: Christian Schneider <cs...@adobe.com>
Authored: Wed Feb 7 10:45:30 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100
----------------------------------------------------------------------
.../discovery/zookeeper/ZooKeeperDiscovery.java | 13 ++-
.../publish/PublishingEndpointListener.java | 42 +++++--
.../PublishingEndpointListenerFactory.java | 110 -------------------
.../PublishingEndpointListenerFactoryTest.java | 103 -----------------
4 files changed, 41 insertions(+), 227 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ed5adf8f/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
index 13dadad..c8b020d 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
@@ -25,7 +25,7 @@ import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
-import org.apache.aries.rsa.discovery.zookeeper.publish.PublishingEndpointListenerFactory;
+import org.apache.aries.rsa.discovery.zookeeper.publish.PublishingEndpointListener;
import org.apache.aries.rsa.discovery.zookeeper.subscribe.EndpointListenerTracker;
import org.apache.aries.rsa.discovery.zookeeper.subscribe.InterfaceMonitorManager;
import org.apache.zookeeper.WatchedEvent;
@@ -46,7 +46,7 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
private final BundleContext bctx;
- private PublishingEndpointListenerFactory endpointListenerFactory;
+ private PublishingEndpointListener endpointListener;
private ServiceTracker<?, ?> endpointListenerTracker;
private InterfaceMonitorManager imManager;
private ZooKeeper zkClient;
@@ -92,8 +92,8 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
return;
}
LOG.debug("starting ZookeeperDiscovery");
- endpointListenerFactory = new PublishingEndpointListenerFactory(zkClient, bctx);
- endpointListenerFactory.start();
+ endpointListener = new PublishingEndpointListener(zkClient, bctx);
+ endpointListener.start(bctx);
imManager = new InterfaceMonitorManager(bctx, zkClient);
endpointListenerTracker = new EndpointListenerTracker(bctx, imManager);
endpointListenerTracker.open();
@@ -106,8 +106,9 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
}
started = false;
closed |= close;
- if (endpointListenerFactory != null) {
- endpointListenerFactory.stop();
+ if (endpointListener != null) {
+ endpointListener.stop();
+ endpointListener.close();
}
if (endpointListenerTracker != null) {
endpointListenerTracker.close();
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ed5adf8f/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
index d5fe7a6..6dc0a08 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
@@ -26,11 +26,14 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Dictionary;
import java.util.HashMap;
+import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
+import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -38,12 +41,14 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceRegistration;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.service.remoteserviceadmin.EndpointEvent;
import org.osgi.service.remoteserviceadmin.EndpointEventListener;
import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
import org.osgi.util.tracker.ServiceTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,19 +62,38 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class);
private final ZooKeeper zk;
- private final ServiceTracker<DiscoveryPlugin, DiscoveryPlugin> discoveryPluginTracker;
private final List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>();
private boolean closed;
-
private final EndpointDescriptionParser endpointDescriptionParser;
+ private ServiceTracker<DiscoveryPlugin, DiscoveryPlugin> discoveryPluginTracker;
+ private ServiceRegistration<?> listenerReg;
+
public PublishingEndpointListener(ZooKeeper zk, BundleContext bctx) {
this.zk = zk;
- discoveryPluginTracker = new ServiceTracker<DiscoveryPlugin, DiscoveryPlugin>(bctx,
- DiscoveryPlugin.class, null);
- discoveryPluginTracker.open();
endpointDescriptionParser = new EndpointDescriptionParser();
}
+
+ public void start(BundleContext bctx) {
+ discoveryPluginTracker = new ServiceTracker<DiscoveryPlugin, DiscoveryPlugin>(bctx,
+ DiscoveryPlugin.class, null);
+ discoveryPluginTracker.open();
+ Dictionary<String, String> props = new Hashtable<String, String>();
+ String uuid = bctx.getProperty(Constants.FRAMEWORK_UUID);
+ props.put(EndpointEventListener.ENDPOINT_LISTENER_SCOPE,
+ String.format("(&(%s=*)(%s=%s))", Constants.OBJECTCLASS,
+ RemoteConstants.ENDPOINT_FRAMEWORK_UUID, uuid));
+ props.put(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID, "true");
+ String[] ifAr = {EndpointEventListener.class.getName(), EndpointListener.class.getName()};
+ listenerReg = bctx.registerService(ifAr, this, props);
+ }
+
+ public void stop() {
+ if (listenerReg != null) {
+ listenerReg.unregister();
+ listenerReg = null;
+ }
+ }
@Override
public void endpointChanged(EndpointEvent event, String filter) {
@@ -152,7 +176,7 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
// process plugins
- Object[] plugins = discoveryPluginTracker.getServices();
+ Object[] plugins = discoveryPluginTracker != null ? discoveryPluginTracker.getServices() : null;
if (plugins != null) {
for (Object plugin : plugins) {
if (plugin instanceof DiscoveryPlugin) {
@@ -259,6 +283,8 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
}
endpoints.clear();
}
- discoveryPluginTracker.close();
+ if (discoveryPluginTracker != null) {
+ discoveryPluginTracker.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ed5adf8f/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
deleted file mode 100644
index 444f7bb..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.publish;
-
-import java.util.ArrayList;
-import java.util.Dictionary;
-import java.util.Hashtable;
-import java.util.List;
-
-import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.zookeeper.ZooKeeper;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceFactory;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.service.remoteserviceadmin.RemoteConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Creates local EndpointListeners that publish to ZooKeeper.
- */
-@SuppressWarnings("deprecation")
-public class PublishingEndpointListenerFactory implements ServiceFactory<PublishingEndpointListener> {
-
- private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListenerFactory.class);
-
- private final BundleContext bctx;
- private final ZooKeeper zk;
- private final List<PublishingEndpointListener> listeners = new ArrayList<PublishingEndpointListener>();
- private ServiceRegistration<?> serviceRegistration;
-
- public PublishingEndpointListenerFactory(ZooKeeper zk, BundleContext bctx) {
- this.bctx = bctx;
- this.zk = zk;
- }
-
- public PublishingEndpointListener getService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr) {
- LOG.debug("new EndpointListener from factory");
- synchronized (listeners) {
- PublishingEndpointListener pel = new PublishingEndpointListener(zk, bctx);
- listeners.add(pel);
- return pel;
- }
- }
-
- public void ungetService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr,
- PublishingEndpointListener pel) {
- LOG.debug("remove EndpointListener");
- synchronized (listeners) {
- if (listeners.remove(pel)) {
- pel.close();
- }
- }
- }
-
- public synchronized void start() {
- Dictionary<String, String> props = new Hashtable<String, String>();
- String uuid = bctx.getProperty(Constants.FRAMEWORK_UUID);
- props.put(EndpointEventListener.ENDPOINT_LISTENER_SCOPE,
- String.format("(&(%s=*)(%s=%s))", Constants.OBJECTCLASS,
- RemoteConstants.ENDPOINT_FRAMEWORK_UUID, uuid));
- props.put(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID, "true");
- String[] ifAr = {EndpointEventListener.class.getName(), EndpointListener.class.getName()};
- serviceRegistration = bctx.registerService(ifAr, this, props);
- }
-
- public synchronized void stop() {
- if (serviceRegistration != null) {
- serviceRegistration.unregister();
- serviceRegistration = null;
- }
- synchronized (listeners) {
- for (PublishingEndpointListener pel : listeners) {
- pel.close();
- }
- listeners.clear();
- }
- }
-
- /**
- * Only for the test case!
- *
- * @return
- */
- protected List<PublishingEndpointListener> getListeners() {
- synchronized (listeners) {
- return listeners;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ed5adf8f/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java
deleted file mode 100644
index 381ec9d..0000000
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerFactoryTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.publish;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Dictionary;
-import java.util.List;
-
-import org.apache.zookeeper.ZooKeeper;
-import org.easymock.EasyMock;
-import org.easymock.IMocksControl;
-import org.junit.Before;
-import org.junit.Test;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-
-@SuppressWarnings("deprecation")
-public class PublishingEndpointListenerFactoryTest {
-
- private IMocksControl c;
- private BundleContext ctx;
- private ZooKeeper zk;
-
- @Before
- public void before() {
- c = EasyMock.createNiceControl();
- zk = c.createMock(ZooKeeper.class);
- ctx = createBundleContext();
- }
-
- @Test
- public void testScope() {
- PublishingEndpointListenerFactory eplf = new PublishingEndpointListenerFactory(zk, ctx);
-
- c.replay();
- eplf.start();
- c.verify();
-
- }
-
- @Test
- public void testServiceFactory() {
- PublishingEndpointListenerFactory eplf = new PublishingEndpointListenerFactory(zk, ctx);
-
- PublishingEndpointListener eli = c.createMock(PublishingEndpointListener.class);
- eli.close();
- EasyMock.expectLastCall().once();
-
- c.replay();
- eplf.start();
-
- PublishingEndpointListener service = eplf.getService(null, null);
- assertTrue(service instanceof EndpointEventListener);
-
- List<PublishingEndpointListener> listeners = eplf.getListeners();
- assertEquals(1, listeners.size());
- assertEquals(service, listeners.get(0));
-
- eplf.ungetService(null, null, service);
- listeners = eplf.getListeners();
- assertEquals(0, listeners.size());
-
- eplf.ungetService(null, null, eli); // no call to close
- listeners.add(eli);
- eplf.ungetService(null, null, eli); // call to close
- listeners = eplf.getListeners();
- assertEquals(0, listeners.size());
-
- c.verify();
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private BundleContext createBundleContext() {
- BundleContext ctx = c.createMock(BundleContext.class);
- ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
- String[] ifAr = {EndpointEventListener.class.getName(), EndpointListener.class.getName()};
- EasyMock.expect(ctx.registerService(EasyMock.aryEq(ifAr), EasyMock.anyObject(),
- (Dictionary<String, String>)EasyMock.anyObject())).andReturn(sreg).once();
-
- EasyMock.expect(ctx.getProperty(EasyMock.eq("org.osgi.framework.uuid"))).andReturn("myUUID").anyTimes();
- return ctx;
- }
-}
[3/9] aries-rsa git commit: [ARIES-1774] Centralize Zookeeper logic
in ZookeeperEndpointRepository
Posted by cs...@apache.org.
[ARIES-1774] Centralize Zookeeper logic in ZookeeperEndpointRepository
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/f07ee8b5
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/f07ee8b5
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/f07ee8b5
Branch: refs/heads/master
Commit: f07ee8b59e226579a55c3463329abf912a2ae291
Parents: fa57fb2
Author: Christian Schneider <cs...@adobe.com>
Authored: Wed Feb 7 16:52:07 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100
----------------------------------------------------------------------
.../discovery/zookeeper/ZooKeeperDiscovery.java | 7 +-
.../publish/PublishingEndpointListener.java | 178 ++------------
.../repository/ZookeeperEndpointRepository.java | 239 +++++++++++++++++++
.../zookeeper/subscribe/InterfaceMonitor.java | 4 +-
.../subscribe/InterfaceMonitorManager.java | 4 +-
.../rsa/discovery/zookeeper/util/Utils.java | 57 -----
.../publish/PublishingEndpointListenerTest.java | 23 +-
.../ZookeeperEndpointRepositoryTest.java | 116 +++++++++
.../subscribe/InterfaceMonitorTest.java | 4 +-
.../rsa/discovery/zookeeper/util/UtilsTest.java | 37 ---
10 files changed, 383 insertions(+), 286 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
index c8b020d..d265a22 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.aries.rsa.discovery.zookeeper.publish.PublishingEndpointListener;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
import org.apache.aries.rsa.discovery.zookeeper.subscribe.EndpointListenerTracker;
import org.apache.aries.rsa.discovery.zookeeper.subscribe.InterfaceMonitorManager;
import org.apache.zookeeper.WatchedEvent;
@@ -55,6 +56,8 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
private Dictionary<String, ?> curConfiguration;
+ private ZookeeperEndpointRepository repository;
+
public ZooKeeperDiscovery(BundleContext bctx) {
this.bctx = bctx;
}
@@ -92,7 +95,8 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
return;
}
LOG.debug("starting ZookeeperDiscovery");
- endpointListener = new PublishingEndpointListener(zkClient, bctx);
+ repository = new ZookeeperEndpointRepository(zkClient);
+ endpointListener = new PublishingEndpointListener(repository);
endpointListener.start(bctx);
imManager = new InterfaceMonitorManager(bctx, zkClient);
endpointListenerTracker = new EndpointListenerTracker(bctx, imManager);
@@ -108,7 +112,6 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
closed |= close;
if (endpointListener != null) {
endpointListener.stop();
- endpointListener.close();
}
if (endpointListenerTracker != null) {
endpointListenerTracker.close();
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
index c3bf01f..ceef6b0 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
@@ -18,29 +18,11 @@
*/
package org.apache.aries.rsa.discovery.zookeeper.publish;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Dictionary;
-import java.util.HashMap;
import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceRegistration;
@@ -49,28 +31,23 @@ import org.osgi.service.remoteserviceadmin.EndpointEvent;
import org.osgi.service.remoteserviceadmin.EndpointEventListener;
import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.osgi.service.remoteserviceadmin.RemoteConstants;
-import org.osgi.util.tracker.ServiceTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Listens for local Endpoints and publishes them to ZooKeeper.
+ * Listens for local EndpointEvents using old and new style listeners and publishes changes to
+ * the ZooKeeperEndpointRepository
*/
@SuppressWarnings("deprecation")
public class PublishingEndpointListener implements EndpointEventListener, EndpointListener {
private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class);
- private final ZooKeeper zk;
- private final List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>();
- private boolean closed;
- private final EndpointDescriptionParser endpointDescriptionParser;
-
private ServiceRegistration<?> listenerReg;
+ private ZookeeperEndpointRepository repository;
- public PublishingEndpointListener(ZooKeeper zk, BundleContext bctx) {
- this.zk = zk;
- endpointDescriptionParser = new EndpointDescriptionParser();
+ public PublishingEndpointListener(ZookeeperEndpointRepository repository) {
+ this.repository = repository;
}
public void start(BundleContext bctx) {
@@ -109,155 +86,28 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
private void endpointModified(EndpointDescription endpoint, String filter) {
try {
- modifyEndpoint(endpoint);
+ repository.modify(endpoint);
} catch (Exception e) {
LOG.error("Error modifying endpoint data in zookeeper for endpoint {}", endpoint.getId(), e);
}
}
- private void modifyEndpoint(EndpointDescription endpoint) throws URISyntaxException, KeeperException, InterruptedException {
- Collection<String> interfaces = endpoint.getInterfaces();
- String endpointKey = getKey(endpoint);
- Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
-
- LOG.info("Changing endpoint in zookeeper: {}", endpoint);
- for (String name : interfaces) {
- String path = Utils.getZooKeeperPath(name);
- String fullPath = path + '/' + endpointKey;
- LOG.info("Changing ZooKeeper node for service with path {}", fullPath);
- createPath(path, zk);
- zk.setData(fullPath, getData(endpoint), -1);
- }
- }
-
@Override
public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
- synchronized (endpoints) {
- if (closed) {
- return;
- }
- if (endpoints.contains(endpoint)) {
- // TODO -> Should the published endpoint be updated here?
- return;
- }
-
- try {
- addEndpoint(endpoint);
- endpoints.add(endpoint);
- } catch (Exception ex) {
- LOG.error("Exception while processing the addition of an endpoint.", ex);
- }
- }
- }
-
- private byte[] getData(EndpointDescription epd) {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- endpointDescriptionParser.writeEndpoint(epd, bos);
- return bos.toByteArray();
- }
-
- private void addEndpoint(EndpointDescription endpoint) throws URISyntaxException, KeeperException,
- InterruptedException, IOException {
- Collection<String> interfaces = endpoint.getInterfaces();
- String endpointKey = getKey(endpoint);
- LOG.info("Exporting endpoint to zookeeper: {}", endpoint);
- for (String name : interfaces) {
- String path = Utils.getZooKeeperPath(name);
- String fullPath = path + '/' + endpointKey;
- LOG.info("Creating ZooKeeper node for service with path {}", fullPath);
- createPath(path, zk);
- createEphemeralNode(fullPath, getData(endpoint));
- }
- }
-
- private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException {
try {
- zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- } catch (NodeExistsException nee) {
- // this sometimes happens after a ZooKeeper node dies and the ephemeral node
- // that belonged to the old session was not yet deleted. We need to make our
- // session the owner of the node so it won't get deleted automatically -
- // we do this by deleting and recreating it ourselves.
- LOG.info("node for endpoint already exists, recreating: {}", fullPath);
- try {
- zk.delete(fullPath, -1);
- } catch (NoNodeException nne) {
- // it's a race condition, but as long as it got deleted - it's ok
- }
- zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ repository.add(endpoint);
+ } catch (Exception ex) {
+ LOG.error("Exception while processing the addition of an endpoint.", ex);
}
}
@Override
public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
- LOG.info("Local EndpointDescription removed: {}", endpoint);
-
- synchronized (endpoints) {
- if (closed) {
- return;
- }
- if (!endpoints.contains(endpoint)) {
- return;
- }
-
- try {
- removeEndpoint(endpoint);
- endpoints.remove(endpoint);
- } catch (Exception ex) {
- LOG.error("Exception while processing the removal of an endpoint", ex);
- }
- }
- }
-
- private void removeEndpoint(EndpointDescription endpoint) throws UnknownHostException, URISyntaxException {
- Collection<String> interfaces = endpoint.getInterfaces();
- String endpointKey = getKey(endpoint);
- for (String name : interfaces) {
- String path = Utils.getZooKeeperPath(name);
- String fullPath = path + '/' + endpointKey;
- LOG.debug("Removing ZooKeeper node: {}", fullPath);
- try {
- zk.delete(fullPath, -1);
- } catch (Exception ex) {
- LOG.debug("Error while removing endpoint: {}", ex); // e.g. session expired
- }
- }
- }
-
- private static void createPath(String path, ZooKeeper zk) throws KeeperException, InterruptedException {
- StringBuilder current = new StringBuilder();
- List<String> parts = Utils.removeEmpty(Arrays.asList(path.split("/")));
- for (String part : parts) {
- current.append('/');
- current.append(part);
- try {
- if (zk.exists(current.toString(), false) == null) {
- zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- } catch (NodeExistsException nee) {
- // it's not the first node with this path to ever exist - that's normal
- }
+ try {
+ repository.remove(endpoint);
+ } catch (Exception ex) {
+ LOG.error("Exception while processing the removal of an endpoint", ex);
}
}
- private static String getKey(EndpointDescription endpoint) throws URISyntaxException {
- URI uri = new URI(endpoint.getId());
- return new StringBuilder().append(uri.getHost()).append("#").append(uri.getPort())
- .append("#").append(uri.getPath().replace('/', '#')).toString();
- }
-
- public void close() {
- LOG.debug("closing - removing all endpoints");
- synchronized (endpoints) {
- closed = true;
- for (EndpointDescription endpoint : endpoints) {
- try {
- removeEndpoint(endpoint);
- } catch (Exception ex) {
- LOG.error("Exception while removing endpoint during close", ex);
- }
- }
- endpoints.clear();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
new file mode 100644
index 0000000..2349c45
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
@@ -0,0 +1,239 @@
+package org.apache.aries.rsa.discovery.zookeeper.repository;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZookeeperEndpointRepository implements Closeable, Watcher {
+ private static final Logger LOG = LoggerFactory.getLogger(ZookeeperEndpointRepository.class);
+ private final ZooKeeper zk;
+ private final EndpointDescriptionParser parser;
+ private EndpointEventListener listener;
+ public static final String PATH_PREFIX = "/osgi/service_registry";
+
+ private Map<String, EndpointDescription> nodes = new ConcurrentHashMap<String, EndpointDescription>();
+
+ public ZookeeperEndpointRepository(ZooKeeper zk) {
+ this(zk, null);
+ }
+
+ public ZookeeperEndpointRepository(ZooKeeper zk, EndpointEventListener listener) {
+ this.zk = zk;
+ this.listener = listener;
+ this.parser = new EndpointDescriptionParser();
+ try {
+ createPath(PATH_PREFIX);
+ } catch (Exception e) {
+ throw new IllegalStateException("Unable to create base path");
+ }
+ // Not yet needed
+ //this.registerWatcher();
+ }
+
+ private void registerWatcher() {
+ try {
+ List<String> children = zk.getChildren(ZookeeperEndpointRepository.PATH_PREFIX, this);
+ System.out.println(children);
+ } catch (KeeperException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ protected void notifyListener(WatchedEvent wevent) {
+ EndpointDescription ep = read(wevent.getPath());
+ if (ep != null) {
+ int type = getEndpointEventType(wevent);
+ EndpointEvent event = new EndpointEvent(type, ep);
+ listener.endpointChanged(event, null);
+ }
+ }
+
+ private int getEndpointEventType(WatchedEvent wevent) {
+ EventType type = wevent.getType();
+ return EndpointEvent.ADDED;
+ }
+
+ /**
+ * Retrieves data from the given node and parses it into an EndpointDescription.
+ *
+ * @param path a node path
+ * @return endpoint found in the node or null if no endpoint was found
+ */
+ public EndpointDescription read(String path) {
+ try {
+ Stat stat = zk.exists(path, false);
+ if (stat == null || stat.getDataLength() <= 0) {
+ return null;
+ }
+ byte[] data = zk.getData(path, false, null);
+ LOG.debug("Got data for node: {}", path);
+
+ EndpointDescription endpoint = parser.readEndpoint(new ByteArrayInputStream(data));
+ if (endpoint != null) {
+ return endpoint;
+ }
+ LOG.warn("No Discovery information found for node: {}", path);
+ } catch (Exception e) {
+ LOG.error("Problem getting EndpointDescription from node " + path, e);
+ }
+ return null;
+ }
+
+ public void add(EndpointDescription endpoint) throws URISyntaxException, KeeperException,
+ InterruptedException, IOException {
+ Collection<String> interfaces = endpoint.getInterfaces();
+ String endpointKey = getKey(endpoint);
+
+ LOG.info("Exporting endpoint to zookeeper: {}", endpoint);
+ for (String name : interfaces) {
+ String path = ZookeeperEndpointRepository.getZooKeeperPath(name);
+ String fullPath = path + '/' + endpointKey;
+ LOG.info("Creating ZooKeeper node for service with path {}", fullPath);
+ createPath(path);
+ createEphemeralNode(fullPath, getData(endpoint));
+ }
+ }
+
+ public void modify(EndpointDescription endpoint) throws URISyntaxException, KeeperException, InterruptedException {
+ Collection<String> interfaces = endpoint.getInterfaces();
+ String endpointKey = getKey(endpoint);
+
+ LOG.info("Changing endpoint in zookeeper: {}", endpoint);
+ for (String name : interfaces) {
+ String path = ZookeeperEndpointRepository.getZooKeeperPath(name);
+ String fullPath = path + '/' + endpointKey;
+ LOG.info("Changing ZooKeeper node for service with path {}", fullPath);
+ createPath(path);
+ zk.setData(fullPath, getData(endpoint), -1);
+ }
+ }
+
+ public void remove(EndpointDescription endpoint) throws UnknownHostException, URISyntaxException {
+ Collection<String> interfaces = endpoint.getInterfaces();
+ String endpointKey = getKey(endpoint);
+ for (String name : interfaces) {
+ String path = ZookeeperEndpointRepository.getZooKeeperPath(name);
+ String fullPath = path + '/' + endpointKey;
+ LOG.debug("Removing ZooKeeper node: {}", fullPath);
+ try {
+ zk.delete(fullPath, -1);
+ } catch (Exception ex) {
+ LOG.debug("Error while removing endpoint: {}", ex); // e.g. session expired
+ }
+ }
+ }
+
+ public List<EndpointDescription> getAll() throws KeeperException, InterruptedException {
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ private byte[] getData(EndpointDescription epd) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ parser.writeEndpoint(epd, bos);
+ return bos.toByteArray();
+ }
+
+ private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException {
+ try {
+ zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ } catch (NodeExistsException nee) {
+ // this sometimes happens after a ZooKeeper node dies and the ephemeral node
+ // that belonged to the old session was not yet deleted. We need to make our
+ // session the owner of the node so it won't get deleted automatically -
+ // we do this by deleting and recreating it ourselves.
+ LOG.info("node for endpoint already exists, recreating: {}", fullPath);
+ try {
+ zk.delete(fullPath, -1);
+ } catch (NoNodeException nne) {
+ // it's a race condition, but as long as it got deleted - it's ok
+ }
+ zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ }
+ }
+
+ private void createPath(String path) throws KeeperException, InterruptedException {
+ StringBuilder current = new StringBuilder();
+ List<String> parts = ZookeeperEndpointRepository.removeEmpty(Arrays.asList(path.split("/")));
+ for (String part : parts) {
+ current.append('/');
+ current.append(part);
+ try {
+ if (zk.exists(current.toString(), false) == null) {
+ zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ } catch (NodeExistsException nee) {
+ // it's not the first node with this path to ever exist - that's normal
+ }
+ }
+ }
+
+ /**
+ * Removes nulls and empty strings from the given string array.
+ *
+ * @param strings an array of strings
+ * @return a new array containing the non-null and non-empty
+ * elements of the original array in the same order
+ */
+ public static List<String> removeEmpty(List<String> strings) {
+ List<String> result = new ArrayList<String>();
+ if (strings == null) {
+ return result;
+ }
+ for (String s : strings) {
+ if (s != null && !s.isEmpty()) {
+ result.add(s);
+ }
+ }
+ return result;
+ }
+
+ public static String getZooKeeperPath(String name) {
+ return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/');
+ }
+
+ private static String getKey(EndpointDescription endpoint) throws URISyntaxException {
+ URI uri = new URI(endpoint.getId());
+ return new StringBuilder().append(uri.getHost()).append("#").append(uri.getPort())
+ .append("#").append(uri.getPath().replace('/', '#')).toString();
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
index 7078bb8..2c90b3c 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Map;
import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
@@ -67,7 +67,7 @@ public class InterfaceMonitor implements Watcher, StatCallback {
public InterfaceMonitor(ZooKeeper zk, String objClass, EndpointEventListener endpointListener, String scope) {
this.zk = zk;
- this.znode = Utils.getZooKeeperPath(objClass);
+ this.znode = ZookeeperEndpointRepository.getZooKeeperPath(objClass);
this.recursive = objClass == null || objClass.isEmpty();
this.endpointListener = endpointListener;
this.parser = new EndpointDescriptionParser();
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
index 7d6e4ae..0aa98b3 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
@@ -30,7 +30,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
import org.apache.aries.rsa.util.StringPlus;
import org.apache.zookeeper.ZooKeeper;
import org.osgi.framework.BundleContext;
@@ -250,7 +250,7 @@ public class InterfaceMonitorManager {
}
protected List<String> getScopes(ServiceReference<?> sref) {
- return Utils.removeEmpty(StringPlus.normalize(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)));
+ return StringPlus.normalize(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE));
}
public static String getObjectClass(String scope) {
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java
deleted file mode 100644
index 289ae32..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.util;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public final class Utils {
-
- static final String PATH_PREFIX = "/osgi/service_registry";
-
- private Utils() {
- // never constructed
- }
-
- public static String getZooKeeperPath(String name) {
- return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/');
- }
-
- /**
- * Removes nulls and empty strings from the given string array.
- *
- * @param strings an array of strings
- * @return a new array containing the non-null and non-empty
- * elements of the original array in the same order
- */
- public static List<String> removeEmpty(List<String> strings) {
- List<String> result = new ArrayList<String>();
- if (strings == null) {
- return result;
- }
- for (String s : strings) {
- if (s != null && !s.isEmpty()) {
- result.add(s);
- }
- }
- return result;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
index b7debf6..a61cf76 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
@@ -23,13 +23,13 @@ import static org.easymock.EasyMock.expect;
import java.util.HashMap;
import java.util.Map;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.easymock.EasyMock;
import org.easymock.IMocksControl;
-import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.service.remoteserviceadmin.EndpointEvent;
@@ -44,7 +44,6 @@ public class PublishingEndpointListenerTest extends TestCase {
public void testEndpointRemovalAdding() throws KeeperException, InterruptedException {
IMocksControl c = EasyMock.createNiceControl();
- BundleContext ctx = c.createMock(BundleContext.class);
ZooKeeper zk = c.createMock(ZooKeeper.class);
String path = ENDPOINT_PATH;
@@ -53,7 +52,8 @@ public class PublishingEndpointListenerTest extends TestCase {
c.replay();
- PublishingEndpointListener eli = new PublishingEndpointListener(zk, ctx);
+ ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk);
+ PublishingEndpointListener eli = new PublishingEndpointListener(repository);
EndpointDescription endpoint = createEndpoint();
eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null);
eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null); // should do nothing
@@ -63,23 +63,6 @@ public class PublishingEndpointListenerTest extends TestCase {
c.verify();
}
- public void testClose() throws KeeperException, InterruptedException {
- IMocksControl c = EasyMock.createNiceControl();
- BundleContext ctx = c.createMock(BundleContext.class);
- ZooKeeper zk = c.createMock(ZooKeeper.class);
- expectCreated(zk, ENDPOINT_PATH);
- expectDeleted(zk, ENDPOINT_PATH);
-
- c.replay();
-
- PublishingEndpointListener eli = new PublishingEndpointListener(zk, ctx);
- EndpointDescription endpoint = createEndpoint();
- eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null);
- eli.close(); // should result in zk.delete(...)
-
- c.verify();
- }
-
private void expectCreated(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
expect(zk.create(EasyMock.eq(path),
(byte[])EasyMock.anyObject(),
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
new file mode 100644
index 0000000..3a20f5a
--- /dev/null
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
@@ -0,0 +1,116 @@
+package org.apache.aries.rsa.discovery.zookeeper.repository;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.osgi.framework.Constants;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+
+public class ZookeeperEndpointRepositoryTest {
+
+ private ZooKeeperServer server;
+ private ZooKeeper zk;
+ private ServerCnxnFactory factory;
+
+ @Before
+ public void before() throws IOException, InterruptedException, KeeperException {
+ File target = new File("target");
+ File zookeeperDir = new File(target, "zookeeper");
+ server = new ZooKeeperServer(zookeeperDir, zookeeperDir, 2000);
+ factory = new NIOServerCnxnFactory();
+ int clientPort = getClientPort();
+ factory.configure(new InetSocketAddress(clientPort), 10);
+ factory.startup(server);
+ Watcher watcher = new Watcher() {
+
+ @Override
+ public void process(WatchedEvent event) {
+ System.out.println(event);
+ }
+
+ };
+ zk = new ZooKeeper("localhost:" + server.getClientPort(), 1000, watcher);
+ printNodes("/");
+ }
+
+ private int getClientPort() throws IOException {
+ try (ServerSocket serverSocket = new ServerSocket(0)) {
+ return serverSocket.getLocalPort();
+ }
+ }
+
+ @After
+ public void after() throws InterruptedException {
+ zk.close();
+ factory.shutdown();
+ }
+
+ @Test
+ public void test() throws IOException, URISyntaxException, KeeperException, InterruptedException {
+ EndpointEventListener listener = new EndpointEventListener() {
+
+ @Override
+ public void endpointChanged(EndpointEvent event, String filter) {
+ System.out.println(event);
+ }
+ };
+ ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk, listener);
+ EndpointDescription endpoint = createEndpoint();
+ repository.add(endpoint);
+
+ String path = "/osgi/service_registry/java/lang/Runnable/test.de#-1##service1";
+ EndpointDescription ep2 = repository.read(path);
+ repository.close();
+ }
+
+ @Test
+ public void testGetZooKeeperPath() {
+ assertEquals(ZookeeperEndpointRepository.PATH_PREFIX + '/' + "org/example/Test",
+ ZookeeperEndpointRepository.getZooKeeperPath("org.example.Test"));
+
+ // used for the recursive discovery
+ assertEquals(ZookeeperEndpointRepository.PATH_PREFIX, ZookeeperEndpointRepository.getZooKeeperPath(null));
+ assertEquals(ZookeeperEndpointRepository.PATH_PREFIX, ZookeeperEndpointRepository.getZooKeeperPath(""));
+ }
+
+ private EndpointDescription createEndpoint() {
+ Map<String, Object> props = new HashMap<>();
+ props.put(Constants.OBJECTCLASS, new String[] {Runnable.class.getName()});
+ props.put(RemoteConstants.ENDPOINT_ID, "http://test.de/service1");
+ props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "my");
+
+ EndpointDescription endpoint = new EndpointDescription(props);
+ return endpoint;
+ }
+
+ public void printNodes(String path) throws KeeperException, InterruptedException {
+ List<String> children = zk.getChildren(path, false);
+ for (String child : children) {
+ String newPath = path.endsWith("/") ? path : path + "/";
+ String fullPath = newPath + child;
+ System.out.println(fullPath);
+ printNodes(fullPath);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
index 53ddbc4..e09cfbf 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
@@ -24,7 +24,7 @@ import static org.easymock.EasyMock.expectLastCall;
import java.util.Collections;
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
@@ -48,7 +48,7 @@ public class InterfaceMonitorTest extends TestCase {
String scope = "(myProp=test)";
String interf = "es.schaaf.test";
- String node = Utils.getZooKeeperPath(interf);
+ String node = ZookeeperEndpointRepository.getZooKeeperPath(interf);
EndpointEventListener endpointListener = c.createMock(EndpointEventListener.class);
InterfaceMonitor im = new InterfaceMonitor(zk, interf, endpointListener, scope);
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java
deleted file mode 100644
index 4d41fb0..0000000
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.util;
-
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
-
-import junit.framework.TestCase;
-
-public class UtilsTest extends TestCase {
-
- public void testGetZooKeeperPath() {
- assertEquals(Utils.PATH_PREFIX + '/' + "org/example/Test",
- Utils.getZooKeeperPath("org.example.Test"));
-
- // used for the recursive discovery
- assertEquals(Utils.PATH_PREFIX, Utils.getZooKeeperPath(null));
- assertEquals(Utils.PATH_PREFIX, Utils.getZooKeeperPath(""));
- }
-
-
-}
[8/9] aries-rsa git commit: [ARIES-1769] Fixing wrong intent names
and exception in timeout case
Posted by cs...@apache.org.
[ARIES-1769] Fixing wrong intent names and exception in timeout case
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/9a08b235
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/9a08b235
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/9a08b235
Branch: refs/heads/master
Commit: 9a08b23533b4b085b10129d8e06d07b677a6c8d9
Parents: 362b68d
Author: Christian Schneider <cs...@adobe.com>
Authored: Thu Feb 8 17:31:10 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100
----------------------------------------------------------------------
.../org/apache/aries/rsa/provider/tcp/TCPProvider.java | 2 +-
.../org/apache/aries/rsa/provider/tcp/TcpEndpoint.java | 2 +-
.../aries/rsa/provider/tcp/TcpProviderIntentTest.java | 12 ++++++++++++
.../apache/aries/rsa/provider/tcp/TcpProviderTest.java | 8 ++++++--
4 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/9a08b235/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPProvider.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPProvider.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPProvider.java
index bceb063..6d3b6ad 100644
--- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPProvider.java
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPProvider.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings("rawtypes")
public class TCPProvider implements DistributionProvider {
static final String TCP_CONFIG_TYPE = "aries.tcp";
- private static final String[] SUPPORTED_INTENTS = { "osgi.basic", "osgi.sync"};
+ private static final String[] SUPPORTED_INTENTS = { "osgi.basic", "osgi.async"};
private Logger logger = LoggerFactory.getLogger(TCPProvider.class);
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/9a08b235/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpEndpoint.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpEndpoint.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpEndpoint.java
index fc207c3..00d06f2 100644
--- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpEndpoint.java
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpEndpoint.java
@@ -45,7 +45,7 @@ public class TcpEndpoint implements Endpoint {
String endpointId = String.format("tcp://%s:%s",hostName, tcpServer.getPort());
effectiveProperties.put(RemoteConstants.ENDPOINT_ID, endpointId);
effectiveProperties.put(RemoteConstants.SERVICE_EXPORTED_CONFIGS, "");
- effectiveProperties.put(RemoteConstants.SERVICE_INTENTS, Arrays.asList("osgi.basic, osgi.async"));
+ effectiveProperties.put(RemoteConstants.SERVICE_INTENTS, Arrays.asList("osgi.basic", "osgi.async"));
// tck tests for one such property ... so we provide it
effectiveProperties.put(TCPProvider.TCP_CONFIG_TYPE + ".id", endpointId);
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/9a08b235/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderIntentTest.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderIntentTest.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderIntentTest.java
index c339ca6..1d7928c 100644
--- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderIntentTest.java
+++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderIntentTest.java
@@ -18,8 +18,10 @@
*/
package org.apache.aries.rsa.provider.tcp;
+import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -49,6 +51,16 @@ public class TcpProviderIntentTest {
}
@Test
+ public void basicAndAsnycIntents() {
+ Map<String, Object> props = new HashMap<String, Object>();
+ EndpointHelper.addObjectClass(props, exportedInterfaces);
+ String[] standardIntents = new String[] {"osgi.basic", "osgi.async"};
+ props.put(RemoteConstants.SERVICE_EXPORTED_INTENTS, standardIntents);
+ Endpoint ep = provider.exportService(myService, bc, props, exportedInterfaces);
+ Assert.assertThat("Service should be exported as the intents: " + Arrays.toString(standardIntents) + " must be supported", ep, notNullValue());
+ }
+
+ @Test
public void unknownIntent() {
Map<String, Object> props = new HashMap<String, Object>();
EndpointHelper.addObjectClass(props, exportedInterfaces);
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/9a08b235/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
----------------------------------------------------------------------
diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
index 538b52b..67678f3 100644
--- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
+++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
@@ -18,8 +18,10 @@
*/
package org.apache.aries.rsa.provider.tcp;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.StringStartsWith.startsWith;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
@@ -47,6 +49,7 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceException;
import org.osgi.util.promise.Promise;
public class TcpProviderTest {
@@ -81,8 +84,9 @@ public class TcpProviderTest {
try {
myServiceProxy.callSlow(TIMEOUT + 100);
Assert.fail("Expecting timeout");
- } catch (RuntimeException e) {
- Assert.assertEquals(SocketTimeoutException.class, e.getCause().getClass());
+ } catch (ServiceException e) {
+ assertThat(e.getCause().getClass().getName(), equalTo(SocketTimeoutException.class.getName()));
+ assertThat(e.getType(), equalTo(ServiceException.REMOTE));
}
}
[9/9] aries-rsa git commit: [ARIES-1776] Fixes for tck tests with
SecurityManager
Posted by cs...@apache.org.
[ARIES-1776] Fixes for tck tests with SecurityManager
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/75448368
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/75448368
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/75448368
Branch: refs/heads/master
Commit: 75448368d0efecbef48464bbf10791986e20c4b0
Parents: 2033037
Author: Christian Schneider <cs...@adobe.com>
Authored: Thu Feb 8 17:25:52 2018 +0100
Committer: Christian Schneider <cs...@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100
----------------------------------------------------------------------
.../aries/rsa/core/RemoteServiceAdminCore.java | 48 ++++++++++++++------
.../rsa/core/RemoteServiceAdminInstance.java | 7 +--
.../aries/rsa/core/event/EventAdminSender.java | 13 ++++--
.../rsa/core/RemoteServiceAdminCoreTest.java | 29 ++++--------
4 files changed, 56 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/75448368/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminCore.java
----------------------------------------------------------------------
diff --git a/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminCore.java b/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminCore.java
index 654e61d..1586307 100644
--- a/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminCore.java
+++ b/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminCore.java
@@ -18,6 +18,8 @@
*/
package org.apache.aries.rsa.core;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -44,6 +46,7 @@ import org.osgi.framework.ServiceListener;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointPermission;
import org.osgi.service.remoteserviceadmin.ExportReference;
import org.osgi.service.remoteserviceadmin.ExportRegistration;
import org.osgi.service.remoteserviceadmin.ImportReference;
@@ -210,22 +213,29 @@ public class RemoteServiceAdminCore implements RemoteServiceAdmin {
return null;
}
- private ExportRegistration exportService(List<String> interfaceNames,
- ServiceReference<?> serviceReference, Map<String, Object> serviceProperties) {
+ private ExportRegistration exportService(
+ final List<String> interfaceNames,
+ final ServiceReference<?> serviceReference,
+ final Map<String, Object> serviceProperties) {
LOG.info("interfaces selected for export: " + interfaceNames);
try {
- Class<?>[] interfaces = getInterfaces(interfaceNames, serviceReference.getBundle());
- Map<String, Object> eprops = createEndpointProps(serviceProperties, interfaces);
- Bundle bundle = serviceReference.getBundle();
- if (bundle == null) {
+ checkPermission(new EndpointPermission("*", EndpointPermission.EXPORT));
+ Bundle serviceBundle = serviceReference.getBundle();
+ if (serviceBundle == null) {
throw new IllegalStateException("Service is already unregistered");
}
- BundleContext serviceContext = bundle.getBundleContext();
+ final BundleContext serviceContext = serviceBundle.getBundleContext();
+ final Object serviceO = serviceContext.getService(serviceReference);
+ final Class<?>[] interfaces = getInterfaces(serviceO, interfaceNames);
+ final Map<String, Object> eprops = createEndpointProps(serviceProperties, interfaces);
// TODO unget service when export is destroyed
- Object serviceO = serviceContext.getService(serviceReference);
- Endpoint endpoint = provider.exportService(serviceO, serviceContext, eprops, interfaces);
+ Endpoint endpoint = AccessController.doPrivileged(new PrivilegedAction<Endpoint>() {
+ public Endpoint run() {
+ return provider.exportService(serviceO, serviceContext, eprops, interfaces);
+ }
+ });
if (endpoint == null) {
return null;
}
@@ -238,11 +248,16 @@ public class RemoteServiceAdminCore implements RemoteServiceAdmin {
}
}
- private Class<?>[] getInterfaces(List<String> interfaceNames,
- Bundle serviceBundle) throws ClassNotFoundException {
+ private Class<?>[] getInterfaces(
+ Object serviceO,
+ List<String> interfaceNames
+ ) throws ClassNotFoundException {
List<Class<?>> interfaces = new ArrayList<>();
- for (String interfaceName : interfaceNames) {
- interfaces.add(serviceBundle.loadClass(interfaceName));
+ Class<?>[] allInterfaces = serviceO.getClass().getInterfaces();
+ for (Class<?> iface : allInterfaces) {
+ if (interfaceNames.contains(iface.getName())) {
+ interfaces.add(iface);
+ }
}
return interfaces.toArray(new Class[]{});
}
@@ -649,4 +664,11 @@ public class RemoteServiceAdminCore implements RemoteServiceAdmin {
}
}
}
+
+ private void checkPermission(EndpointPermission permission) {
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null) {
+ sm.checkPermission(permission);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/75448368/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminInstance.java
----------------------------------------------------------------------
diff --git a/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminInstance.java b/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminInstance.java
index cd435ba..7158f52 100644
--- a/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminInstance.java
+++ b/rsa/src/main/java/org/apache/aries/rsa/core/RemoteServiceAdminInstance.java
@@ -52,12 +52,7 @@ public class RemoteServiceAdminInstance implements RemoteServiceAdmin {
@Override
@SuppressWarnings("rawtypes")
public List<ExportRegistration> exportService(final ServiceReference ref, final Map properties) {
- checkPermission(new EndpointPermission("*", EndpointPermission.EXPORT));
- return AccessController.doPrivileged(new PrivilegedAction<List<ExportRegistration>>() {
- public List<ExportRegistration> run() {
- return closed ? Collections.<ExportRegistration>emptyList() : rsaCore.exportService(ref, properties);
- }
- });
+ return closed ? Collections.<ExportRegistration>emptyList() : rsaCore.exportService(ref, properties);
}
@Override
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/75448368/rsa/src/main/java/org/apache/aries/rsa/core/event/EventAdminSender.java
----------------------------------------------------------------------
diff --git a/rsa/src/main/java/org/apache/aries/rsa/core/event/EventAdminSender.java b/rsa/src/main/java/org/apache/aries/rsa/core/event/EventAdminSender.java
index f42afc9..9f3e55c 100644
--- a/rsa/src/main/java/org/apache/aries/rsa/core/event/EventAdminSender.java
+++ b/rsa/src/main/java/org/apache/aries/rsa/core/event/EventAdminSender.java
@@ -1,5 +1,7 @@
package org.apache.aries.rsa.core.event;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Map;
@@ -30,11 +32,16 @@ public class EventAdminSender {
}
public void send(RemoteServiceAdminEvent rsaEvent) {
- Event event = toEvent(rsaEvent);
+ final Event event = toEvent(rsaEvent);
ServiceReference<EventAdmin> sref = this.context.getServiceReference(EventAdmin.class);
if (sref != null) {
- EventAdmin eventAdmin = this.context.getService(sref);
- eventAdmin.postEvent(event);
+ final EventAdmin eventAdmin = this.context.getService(sref);
+ AccessController.doPrivileged(new PrivilegedAction<Void>() {
+ public Void run() {
+ eventAdmin.postEvent(event);
+ return null;
+ }
+ });
this.context.ungetService(sref);
}
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/75448368/rsa/src/test/java/org/apache/aries/rsa/core/RemoteServiceAdminCoreTest.java
----------------------------------------------------------------------
diff --git a/rsa/src/test/java/org/apache/aries/rsa/core/RemoteServiceAdminCoreTest.java b/rsa/src/test/java/org/apache/aries/rsa/core/RemoteServiceAdminCoreTest.java
index 02f6f18..c6f3378 100644
--- a/rsa/src/test/java/org/apache/aries/rsa/core/RemoteServiceAdminCoreTest.java
+++ b/rsa/src/test/java/org/apache/aries/rsa/core/RemoteServiceAdminCoreTest.java
@@ -212,6 +212,7 @@ public class RemoteServiceAdminCoreTest {
ServiceReference sref = mockServiceReference(sProps);
provider.endpoint = createEndpoint(sProps);
+ ServiceReference sref2 = mockServiceReference(sProps);
c.replay();
// Export the service for the first time
@@ -230,7 +231,6 @@ public class RemoteServiceAdminCoreTest {
// Ask to export the same service again, this should not go through the whole process again but simply return
// a copy of the first instance.
- ServiceReference sref2 = mockServiceReference(sProps);
List<ExportRegistration> eregs2 = rsaCore.exportService(sref2, null);
assertEquals(1, eregs2.size());
ExportRegistration ereg2 = eregs2.iterator().next();
@@ -296,6 +296,7 @@ public class RemoteServiceAdminCoreTest {
sProps.put("service.exported.interfaces", "*");
ServiceReference sref = mockServiceReference(sProps);
+ c.replay();
provider.ex = new TestException();
List<ExportRegistration> ereg = rsaCore.exportService(sref, sProps);
@@ -304,6 +305,7 @@ public class RemoteServiceAdminCoreTest {
Collection<ExportReference> exportedServices = rsaCore.getExportedServices();
assertEquals("No service was exported", 0, exportedServices.size());
+ c.verify();
}
@Test
@@ -345,34 +347,23 @@ public class RemoteServiceAdminCoreTest {
}
private ServiceReference mockServiceReference(final Map<String, Object> sProps) {
- BundleContext bc = EasyMock.createNiceMock(BundleContext.class);
-
- Bundle sb = EasyMock.createNiceMock(Bundle.class);
+ BundleContext bc = c.createMock(BundleContext.class);
+ Bundle sb = c.createMock(Bundle.class);
expect(sb.getBundleContext()).andReturn(bc).anyTimes();
- try {
- expect((Class)sb.loadClass(Runnable.class.getName())).andReturn(Runnable.class);
- } catch (ClassNotFoundException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- EasyMock.replay(sb);
-
expect(bc.getBundle()).andReturn(sb).anyTimes();
- EasyMock.replay(bc);
- ServiceReference sref = EasyMock.createNiceMock(ServiceReference.class);
+ String[] propKeys = sProps.keySet().toArray(new String[] {});
+ ServiceReference sref = c.createMock(ServiceReference.class);
expect(sref.getBundle()).andReturn(sb).anyTimes();
- expect(sref.getPropertyKeys()).andReturn(sProps.keySet().toArray(new String[] {})).anyTimes();
+ expect(sref.getPropertyKeys()).andReturn(propKeys).anyTimes();
expect(sref.getProperty((String) EasyMock.anyObject())).andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
return sProps.get(EasyMock.getCurrentArguments()[0]);
}
}).anyTimes();
- EasyMock.replay(sref);
-
- Runnable svcObject = EasyMock.createNiceMock(Runnable.class);
- EasyMock.replay(svcObject);
-
+ Runnable svcObject = c.createMock(Runnable.class);
+ expect(bc.getService(sref)).andReturn(svcObject).anyTimes();
return sref;
}