You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2016/03/11 20:43:38 UTC
[39/50] [abbrv] aries-rsa git commit: Switch project setup to Aries
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/distributed/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java
----------------------------------------------------------------------
diff --git a/discovery/distributed/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java b/discovery/distributed/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java
deleted file mode 100644
index bd5618f..0000000
--- a/discovery/distributed/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.server;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Dictionary;
-import java.util.Map;
-
-import org.apache.zookeeper.server.ServerConfig;
-import org.apache.zookeeper.server.ZooKeeperServerMain;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
-import org.apache.zookeeper.server.quorum.QuorumPeerMain;
-import org.osgi.framework.BundleContext;
-import org.osgi.service.cm.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZookeeperStarter implements org.osgi.service.cm.ManagedService {
-
- private static final Logger LOG = LoggerFactory.getLogger(ZookeeperStarter.class); //NOPMD - using log4j here
-
- protected ZookeeperServer main;
- private final BundleContext bundleContext;
- private Thread zkMainThread;
- private Map<String, ?> curConfiguration;
-
- public ZookeeperStarter(BundleContext ctx) {
- bundleContext = ctx;
- }
-
- synchronized void shutdown() {
- if (main != null) {
- LOG.info("Shutting down ZooKeeper server");
- try {
- main.shutdown();
- if (zkMainThread != null) {
- zkMainThread.join();
- }
- } catch (Throwable e) {
- LOG.error(e.getMessage(), e);
- }
- main = null;
- zkMainThread = null;
- }
- }
-
- private void setDefaults(Dictionary<String, String> dict) throws IOException {
- Utils.removeEmptyValues(dict); // to avoid NumberFormatExceptions
- Utils.setDefault(dict, "tickTime", "2000");
- Utils.setDefault(dict, "initLimit", "10");
- Utils.setDefault(dict, "syncLimit", "5");
- Utils.setDefault(dict, "clientPort", "2181");
- Utils.setDefault(dict, "dataDir", new File(bundleContext.getDataFile(""), "zkdata").getCanonicalPath());
- }
-
- @SuppressWarnings("unchecked")
- public synchronized void updated(Dictionary<String, ?> dict) throws ConfigurationException {
- LOG.debug("Received configuration update for Zookeeper Server: " + dict);
- try {
- if (dict != null) {
- setDefaults((Dictionary<String, String>)dict);
- }
- Map<String, ?> configMap = Utils.toMap(dict);
- if (!configMap.equals(curConfiguration)) { // only if something actually changed
- shutdown();
- curConfiguration = configMap;
- // config is null if it doesn't exist, is being deleted or has not yet been loaded
- // in which case we just stop running
- if (dict != null) {
- startFromConfig(parseConfig(dict));
- LOG.info("Applied configuration update: " + dict);
- }
- }
- } catch (Exception th) {
- LOG.error("Problem applying configuration update: " + dict, th);
- }
- }
-
- private QuorumPeerConfig parseConfig(Dictionary<String, ?> dict) throws IOException, ConfigException {
- QuorumPeerConfig config = new QuorumPeerConfig();
- config.parseProperties(Utils.toProperties(dict));
- return config;
- }
-
- protected void startFromConfig(final QuorumPeerConfig config) {
- int numServers = config.getServers().size();
- main = numServers > 1 ? new MyQuorumPeerMain(config) : new MyZooKeeperServerMain(config);
- zkMainThread = new Thread(new Runnable() {
- public void run() {
- try {
- main.startup();
- } catch (Throwable e) {
- LOG.error("Problem running ZooKeeper server.", e);
- }
- }
- });
- zkMainThread.start();
- }
-
- interface ZookeeperServer {
- void startup() throws IOException;
- void shutdown();
- }
-
- static class MyQuorumPeerMain extends QuorumPeerMain implements ZookeeperServer {
-
- private QuorumPeerConfig config;
-
- MyQuorumPeerMain(QuorumPeerConfig config) {
- this.config = config;
- }
-
- public void startup() throws IOException {
- runFromConfig(config);
- }
-
- public void shutdown() {
- if (null != quorumPeer) {
- quorumPeer.shutdown();
- }
- }
- }
-
- static class MyZooKeeperServerMain extends ZooKeeperServerMain implements ZookeeperServer {
-
- private QuorumPeerConfig config;
-
- MyZooKeeperServerMain(QuorumPeerConfig config) {
- this.config = config;
- }
-
- public void startup() throws IOException {
- ServerConfig serverConfig = new ServerConfig();
- serverConfig.readFrom(config);
- runFromConfig(serverConfig);
- }
-
- public void shutdown() {
- try {
- super.shutdown();
- } catch (Exception e) {
- LOG.error("Error shutting down ZooKeeper", e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/distributed/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml
----------------------------------------------------------------------
diff --git a/discovery/distributed/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml b/discovery/distributed/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml
deleted file mode 100644
index efd9403..0000000
--- a/discovery/distributed/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml
+++ /dev/null
@@ -1,34 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-<MetaData xmlns="http://www.osgi.org/xmlns/metadata/v1.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="
- http://www.osgi.org/xmlns/metadata/v1.0.0 http://www.osgi.org/xmlns/metatype/v1.1.0/metatype.xsd
- ">
- <OCD description="" name="Zookeeper server config" id="org.apache.cxf.dosgi.discovery.zookeeper.server">
- <AD id="clientPort" required="false" type="String" default="2181" description=""/>
- <AD id="tickTime" required="false" type="String" default="2000" description=""/>
- <AD id="initLimit" required="false" type="String" default="10" description=""/>
- <AD id="syncLimit" required="false" type="String" default="5" description=""/>
- </OCD>
- <Designate pid="org.apache.cxf.dosgi.discovery.zookeeper.server">
- <Object ocdref="org.apache.cxf.dosgi.discovery.zookeeper.server"/>
- </Designate>
-</MetaData>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/distributed/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java
----------------------------------------------------------------------
diff --git a/discovery/distributed/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java b/discovery/distributed/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java
deleted file mode 100644
index 17ca117..0000000
--- a/discovery/distributed/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.discovery.zookeeper.server;
-
-import java.io.File;
-import java.util.Dictionary;
-import java.util.Hashtable;
-
-import junit.framework.TestCase;
-
-import org.apache.cxf.dosgi.discovery.zookeeper.server.ZookeeperStarter.MyZooKeeperServerMain;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.easymock.classextension.EasyMock;
-import org.easymock.classextension.IMocksControl;
-import org.osgi.framework.BundleContext;
-
-import static org.easymock.EasyMock.expect;
-import static org.easymock.classextension.EasyMock.replay;
-import static org.easymock.classextension.EasyMock.verify;
-
-public class ZookeeperStarterTest extends TestCase {
-
- public void testUpdateConfig() throws Exception {
- final File tempDir = new File("target");
- IMocksControl control = EasyMock.createControl();
- BundleContext bc = control.createMock(BundleContext.class);
- expect(bc.getDataFile("")).andReturn(tempDir);
- final MyZooKeeperServerMain mockServer = control.createMock(MyZooKeeperServerMain.class);
- control.replay();
-
- ZookeeperStarter starter = new ZookeeperStarter(bc) {
- @Override
- protected void startFromConfig(QuorumPeerConfig config) {
- assertEquals(1234, config.getClientPortAddress().getPort());
- assertTrue(config.getDataDir().contains(tempDir + File.separator + "zkdata"));
- assertEquals(2000, config.getTickTime());
- assertEquals(10, config.getInitLimit());
- assertEquals(5, config.getSyncLimit());
- this.main = mockServer;
- }
- };
- Dictionary<String, Object> props = new Hashtable<String, Object>();
- props.put("clientPort", "1234");
- starter.updated(props);
- assertNotNull(starter.main);
-
- control.verify();
- }
-
- public void testRemoveConfiguration() throws Exception {
- BundleContext bc = EasyMock.createMock(BundleContext.class);
- MyZooKeeperServerMain zkServer = EasyMock.createMock(MyZooKeeperServerMain.class);
- zkServer.shutdown();
- EasyMock.expectLastCall();
-
- replay(zkServer);
-
- ZookeeperStarter starter = new ZookeeperStarter(bc);
- starter.main = zkServer;
- starter.updated(null);
-
- verify(zkServer);
- assertNull("main should be null", starter.main);
- }
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/local/bnd.bnd
----------------------------------------------------------------------
diff --git a/discovery/local/bnd.bnd b/discovery/local/bnd.bnd
new file mode 100644
index 0000000..b1233ce
--- /dev/null
+++ b/discovery/local/bnd.bnd
@@ -0,0 +1 @@
+Bundle-Activator: org.apache.cxf.dosgi.discovery.local.internal.Activator
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/local/pom.xml
----------------------------------------------------------------------
diff --git a/discovery/local/pom.xml b/discovery/local/pom.xml
index 8fa118f..8b7435a 100644
--- a/discovery/local/pom.xml
+++ b/discovery/local/pom.xml
@@ -20,17 +20,18 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
- <artifactId>cxf-dosgi-ri-discovery-local</artifactId>
- <packaging>bundle</packaging>
- <name>CXF Local Discovery Service Bundle</name>
- <description>The CXF Local Discovery Service Implementation</description>
<parent>
- <groupId>org.apache.cxf.dosgi</groupId>
- <artifactId>cxf-dosgi-ri-parent</artifactId>
+ <groupId>org.apache.aries.rsa</groupId>
+ <artifactId>parent</artifactId>
<version>1.8-SNAPSHOT</version>
<relativePath>../../parent/pom.xml</relativePath>
</parent>
+
+ <groupId>org.apache.aries.rsa.discovery</groupId>
+ <artifactId>local</artifactId>
+ <packaging>bundle</packaging>
+ <name>Aries Remote Service Admin Discovery Local</name>
<properties>
<topDirectoryLocation>../..</topDirectoryLocation>
@@ -38,34 +39,9 @@
<dependencies>
<dependency>
- <groupId>org.osgi</groupId>
- <artifactId>org.osgi.core</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.osgi</groupId>
- <artifactId>org.osgi.compendium</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>xmlunit</groupId>
<artifactId>xmlunit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.easymock</groupId>
- <artifactId>easymockclassextension</artifactId>
+ <version>1.6</version>
<scope>test</scope>
</dependency>
</dependencies>
@@ -84,16 +60,6 @@
</execution>
</executions>
</plugin>
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- <configuration>
- <instructions>
- <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
- <Bundle-Activator>org.apache.cxf.dosgi.discovery.local.internal.Activator</Bundle-Activator>
- </instructions>
- </configuration>
- </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/pom.xml
----------------------------------------------------------------------
diff --git a/discovery/pom.xml b/discovery/pom.xml
index f176e51..40b5f80 100644
--- a/discovery/pom.xml
+++ b/discovery/pom.xml
@@ -18,24 +18,23 @@
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
<modelVersion>4.0.0</modelVersion>
- <groupId>org.apache.cxf.dosgi</groupId>
- <artifactId>cxf-dosgi-ri-discovery</artifactId>
- <packaging>pom</packaging>
- <name>Distributed OSGI Discovery Service Modules</name>
- <version>1.8-SNAPSHOT</version>
<parent>
- <groupId>org.apache.cxf.dosgi</groupId>
- <artifactId>cxf-dosgi-ri-parent</artifactId>
+ <groupId>org.apache.aries.rsa</groupId>
+ <artifactId>parent</artifactId>
<version>1.8-SNAPSHOT</version>
<relativePath>../parent/pom.xml</relativePath>
</parent>
+
+ <artifactId>discovery</artifactId>
+ <packaging>pom</packaging>
+ <name>Aries Remote Service Admin Discovery</name>
<modules>
<module>local</module>
- <module>distributed</module>
+ <module>zookeeper</module>
+ <module>zookeeper-server</module>
+ <module>zookeeper-server-config</module>
</modules>
-
</project>
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/zookeeper-server-config/bnd.bnd
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server-config/bnd.bnd b/discovery/zookeeper-server-config/bnd.bnd
new file mode 100644
index 0000000..769558e
--- /dev/null
+++ b/discovery/zookeeper-server-config/bnd.bnd
@@ -0,0 +1 @@
+Bundle-Activator: org.apache.cxf.dosgi.discovery.zookeeper.server.config.Activator
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/zookeeper-server-config/pom.xml
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server-config/pom.xml b/discovery/zookeeper-server-config/pom.xml
new file mode 100644
index 0000000..4f7ac7a
--- /dev/null
+++ b/discovery/zookeeper-server-config/pom.xml
@@ -0,0 +1,40 @@
+<?xml version='1.0' encoding='UTF-8' ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.aries.rsa</groupId>
+ <artifactId>parent</artifactId>
+ <version>1.8-SNAPSHOT</version>
+ <relativePath>../../parent/pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.aries.rsa.discovery</groupId>
+ <artifactId>zookeeper-server-config</artifactId>
+ <packaging>bundle</packaging>
+ <name>Aries Remote Service Admin Discovery Zookeeper Config</name>
+
+ <properties>
+ <topDirectoryLocation>../..</topDirectoryLocation>
+ </properties>
+
+</project>
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/zookeeper-server-config/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/config/Activator.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server-config/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/config/Activator.java b/discovery/zookeeper-server-config/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/config/Activator.java
new file mode 100644
index 0000000..a00c7b0
--- /dev/null
+++ b/discovery/zookeeper-server-config/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/config/Activator.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.discovery.zookeeper.server.config;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.service.cm.ManagedService;
+import org.osgi.util.tracker.ServiceTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bundle activator that picks a free local TCP port, publishes it as the ZooKeeper
+ * server's client port through ConfigurationAdmin and passes host and port to the
+ * ZooKeeper discovery ManagedService.
+ */
+public class Activator implements BundleActivator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Activator.class);
+    private static final String ZOOKEEPER_PORT = "org.apache.cxf.dosgi.discovery.zookeeper.port";
+    private static final String PID = "org.apache.cxf.dosgi.discovery.zookeeper.server";
+    private ServiceTracker<ConfigurationAdmin, ConfigurationAdmin> st;
+
+    /**
+     * Chooses the JVM-wide ZooKeeper port if none is set yet, configures the server PID
+     * with it whenever a ConfigurationAdmin appears, and synchronously passes host and
+     * port to the ZooKeeper discovery ManagedService.
+     *
+     * @param context the context of this bundle
+     * @throws IllegalStateException if no free port can be found or the discovery
+     *         ManagedService disappears before it can be used
+     * @throws RuntimeException if the discovery ManagedService is not registered yet
+     */
+    public void start(BundleContext context) throws Exception {
+        synchronized (Activator.class) {
+            // Only one thread gets to set the port number
+            if (System.getProperty(ZOOKEEPER_PORT) == null) {
+                String port = getFreePort();
+                System.setProperty(ZOOKEEPER_PORT, port);
+                LOG.info("Global ZooKeeper port: {}", port);
+            }
+        }
+
+        st = new ServiceTracker<ConfigurationAdmin, ConfigurationAdmin>(context, ConfigurationAdmin.class, null) {
+            @Override
+            public ConfigurationAdmin addingService(ServiceReference<ConfigurationAdmin> reference) {
+                ConfigurationAdmin service = super.addingService(reference);
+                try {
+                    Configuration cfg = service.getConfiguration(PID, null);
+                    Dictionary<String, Object> props = new Hashtable<String, Object>();
+                    String zp = System.getProperty(ZOOKEEPER_PORT);
+                    props.put("clientPort", zp);
+                    cfg.update(props);
+                    LOG.debug("Set ZooKeeper client port to {}", zp);
+                } catch (IOException e) {
+                    LOG.error("Failed to configure ZooKeeper server!", e);
+                }
+                return service;
+            }
+        };
+        st.open();
+
+        // The following section is done synchronously otherwise it doesn't happen in time for the CT
+        ServiceReference<?>[] refs = context.getServiceReferences(ManagedService.class.getName(),
+            "(service.pid=org.apache.cxf.dosgi.discovery.zookeeper)");
+        if (refs == null || refs.length == 0) {
+            throw new RuntimeException("This bundle must be started after the bundle with the ZooKeeper "
+                + "Discovery Managed Service was started.");
+        }
+
+        Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put("zookeeper.host", "127.0.0.1");
+        props.put("zookeeper.port", System.getProperty(ZOOKEEPER_PORT));
+
+        ManagedService ms = (ManagedService) context.getService(refs[0]);
+        if (ms == null) {
+            // getService() returns null if the service was unregistered in the meantime;
+            // fail with a clear message instead of a NullPointerException on ms.updated()
+            throw new IllegalStateException("ZooKeeper Discovery Managed Service is no longer available.");
+        }
+        try {
+            ms.updated(props);
+        } finally {
+            context.ungetService(refs[0]);
+        }
+        LOG.debug("Passed the zookeeper.host property to the ZooKeeper Client managed service.");
+    }
+
+    /**
+     * Finds a currently unused TCP port by briefly binding a server socket to port 0.
+     *
+     * @return the port number as a string, never null
+     * @throws IllegalStateException if the socket cannot be opened or closed
+     */
+    private String getFreePort() {
+        try {
+            ServerSocket ss = new ServerSocket(0);
+            try {
+                return String.valueOf(ss.getLocalPort());
+            } finally {
+                ss.close(); // always release the probe socket
+            }
+        } catch (IOException e) {
+            // A null port would only fail later with a NullPointerException in System.setProperty()
+            throw new IllegalStateException("Failed to find a free port!", e);
+        }
+    }
+
+    /**
+     * Closes the ConfigurationAdmin tracker if start() got far enough to create it.
+     *
+     * @param context the context of this bundle
+     */
+    public void stop(BundleContext context) throws Exception {
+        if (st != null) {
+            st.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/zookeeper-server/bnd.bnd
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/bnd.bnd b/discovery/zookeeper-server/bnd.bnd
new file mode 100644
index 0000000..cef642b
--- /dev/null
+++ b/discovery/zookeeper-server/bnd.bnd
@@ -0,0 +1 @@
+Bundle-Activator: org.apache.cxf.dosgi.discovery.zookeeper.server.Activator
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/zookeeper-server/pom.xml
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/pom.xml b/discovery/zookeeper-server/pom.xml
new file mode 100644
index 0000000..705d319
--- /dev/null
+++ b/discovery/zookeeper-server/pom.xml
@@ -0,0 +1,84 @@
+<?xml version='1.0' encoding='UTF-8' ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.aries.rsa</groupId>
+ <artifactId>parent</artifactId>
+ <version>1.8-SNAPSHOT</version>
+ <relativePath>../../parent/pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.aries.rsa.discovery</groupId>
+ <artifactId>zookeeper-server</artifactId>
+ <packaging>bundle</packaging>
+ <name>Aries Remote Service Admin Discovery Zookeeper Server</name>
+
+
+ <properties>
+ <topDirectoryLocation>../..</topDirectoryLocation>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>${zookeeper.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jline</artifactId>
+ <groupId>jline</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>netty</artifactId>
+ <groupId>io.netty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- We need the newer log4j as the one from ZooKeeper has some ugly dependencies -->
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>${log4j.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Activator.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Activator.java b/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Activator.java
new file mode 100644
index 0000000..6adf700
--- /dev/null
+++ b/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Activator.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.discovery.zookeeper.server;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+
+/**
+ * Bundle activator that registers a {@link ZookeeperStarter} as a ManagedService under
+ * the ZooKeeper server PID and shuts it down again when the bundle is stopped.
+ */
+public class Activator implements BundleActivator {
+
+    /** PID under which the ZookeeperStarter is registered as a ManagedService. */
+    private static final String SERVER_PID = "org.apache.cxf.dosgi.discovery.zookeeper.server";
+
+    ZookeeperStarter zkStarter;
+
+    /**
+     * Creates the starter and registers it as a ManagedService for {@link #SERVER_PID}.
+     *
+     * @param context the context of this bundle
+     */
+    public void start(BundleContext context) throws Exception {
+        zkStarter = new ZookeeperStarter(context);
+        Dictionary<String, Object> registrationProps = new Hashtable<String, Object>();
+        registrationProps.put(Constants.SERVICE_PID, SERVER_PID);
+        context.registerService(org.osgi.service.cm.ManagedService.class.getName(), zkStarter, registrationProps);
+    }
+
+    /**
+     * Shuts down the starter, if one was created by {@link #start(BundleContext)}.
+     *
+     * @param context the context of this bundle
+     */
+    public void stop(BundleContext context) throws Exception {
+        if (zkStarter == null) {
+            return;
+        }
+        zkStarter.shutdown();
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Utils.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Utils.java b/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Utils.java
new file mode 100644
index 0000000..fe3c663
--- /dev/null
+++ b/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/Utils.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.discovery.zookeeper.server;
+
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * General purpose utility methods.
+ */
+public final class Utils {
+
+    private Utils() {
+        // prevent instantiation
+    }
+
+    /**
+     * Remove entries whose values are empty from the given dictionary.
+     *
+     * <p>Only values equal to the empty string count as empty. The keys are collected
+     * first and removed afterwards, so the dictionary is never modified while its key
+     * enumeration is still in use.
+     *
+     * @param dict a dictionary
+     */
+    public static void removeEmptyValues(Dictionary<String, ?> dict) {
+        List<String> keysToRemove = new ArrayList<String>();
+        Enumeration<String> keys = dict.keys();
+        while (keys.hasMoreElements()) {
+            String key = keys.nextElement();
+            if ("".equals(dict.get(key))) {
+                keysToRemove.add(key);
+            }
+        }
+        for (String key : keysToRemove) {
+            dict.remove(key);
+        }
+    }
+
+    /**
+     * Puts the given key-value pair in the given dictionary if the key does not
+     * already exist in it or if its existing value is null.
+     *
+     * @param dict a dictionary
+     * @param key the key
+     * @param value the default value to set
+     */
+    public static void setDefault(Dictionary<String, String> dict, String key, String value) {
+        if (dict.get(key) == null) {
+            dict.put(key, value);
+        }
+    }
+
+    /**
+     * Converts the given Dictionary to a Map.
+     *
+     * @param dict a dictionary
+     * @param <K> the key type
+     * @param <V> the value type
+     * @return the converted map, or an empty map if the given dictionary is null
+     */
+    public static <K, V> Map<K, V> toMap(Dictionary<K, V> dict) {
+        Map<K, V> map = new HashMap<K, V>();
+        if (dict != null) {
+            Enumeration<K> keys = dict.keys();
+            while (keys.hasMoreElements()) {
+                K key = keys.nextElement();
+                map.put(key, dict.get(key));
+            }
+        }
+        return map;
+    }
+
+    /**
+     * Converts a Dictionary into a Properties instance.
+     *
+     * @param dict a dictionary, may be null
+     * @param <K> the key type
+     * @param <V> the value type
+     * @return the properties, or empty properties if the given dictionary is null
+     */
+    public static <K, V> Properties toProperties(Dictionary<K, V> dict) {
+        Properties props = new Properties();
+        // Accept null like toMap() does instead of failing with a NullPointerException
+        if (dict != null) {
+            for (Enumeration<K> e = dict.keys(); e.hasMoreElements();) {
+                K key = e.nextElement();
+                props.put(key, dict.get(key));
+            }
+        }
+        return props;
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java b/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java
new file mode 100644
index 0000000..bd5618f
--- /dev/null
+++ b/discovery/zookeeper-server/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarter.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.discovery.zookeeper.server;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Map;
+
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.cm.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Managed service that runs an embedded ZooKeeper server and restarts it whenever the
+ * configuration it receives actually changes.
+ * <p>
+ * The server runs on a dedicated thread created by {@link #startFromConfig(QuorumPeerConfig)}.
+ * A null configuration stops the server.
+ */
+public class ZookeeperStarter implements org.osgi.service.cm.ManagedService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperStarter.class); //NOPMD - using log4j here
+
+    /** The server that is currently running, or null if none is running. */
+    protected ZookeeperServer main;
+    private final BundleContext bundleContext;
+    /** The thread executing the server's startup call, or null if none was started. */
+    private Thread zkMainThread;
+    /** The configuration (defaults included) that was applied last. */
+    private Map<String, ?> curConfiguration;
+
+    public ZookeeperStarter(BundleContext ctx) {
+        bundleContext = ctx;
+    }
+
+    /**
+     * Stops the running server, if any, and waits for the server thread to finish.
+     * Failures are logged; the server references are cleared in any case.
+     */
+    synchronized void shutdown() {
+        if (main != null) {
+            LOG.info("Shutting down ZooKeeper server");
+            try {
+                main.shutdown();
+                if (zkMainThread != null) {
+                    zkMainThread.join();
+                }
+            } catch (InterruptedException e) {
+                // keep the interrupt visible to the caller instead of silently consuming it
+                Thread.currentThread().interrupt();
+                LOG.error(e.getMessage(), e);
+            } catch (Throwable e) {
+                LOG.error(e.getMessage(), e);
+            }
+            main = null;
+            zkMainThread = null;
+        }
+    }
+
+    /**
+     * Removes empty values from the given configuration and fills in defaults for the
+     * settings that are missing.
+     *
+     * @param dict the configuration to complete (modified in place)
+     * @throws IOException if the default data directory path cannot be resolved
+     */
+    private void setDefaults(Dictionary<String, String> dict) throws IOException {
+        Utils.removeEmptyValues(dict); // to avoid NumberFormatExceptions
+        Utils.setDefault(dict, "tickTime", "2000");
+        Utils.setDefault(dict, "initLimit", "10");
+        Utils.setDefault(dict, "syncLimit", "5");
+        Utils.setDefault(dict, "clientPort", "2181");
+        Utils.setDefault(dict, "dataDir", new File(bundleContext.getDataFile(""), "zkdata").getCanonicalPath());
+    }
+
+    /**
+     * Applies a configuration update. If the configuration (after defaults were added)
+     * differs from the one applied last, the running server is shut down and, unless the
+     * new configuration is null, a new server is started from it.
+     * Problems while applying the update are logged and not propagated.
+     *
+     * @param dict the new configuration, or null if there is none
+     */
+    @SuppressWarnings("unchecked")
+    public synchronized void updated(Dictionary<String, ?> dict) throws ConfigurationException {
+        LOG.debug("Received configuration update for Zookeeper Server: {}", dict);
+        try {
+            if (dict != null) {
+                setDefaults((Dictionary<String, String>)dict);
+            }
+            Map<String, ?> configMap = Utils.toMap(dict);
+            if (!configMap.equals(curConfiguration)) { // only if something actually changed
+                shutdown();
+                curConfiguration = configMap;
+                // config is null if it doesn't exist, is being deleted or has not yet been loaded
+                // in which case we just stop running
+                if (dict != null) {
+                    startFromConfig(parseConfig(dict));
+                    LOG.info("Applied configuration update: {}", dict);
+                }
+            }
+        } catch (Exception th) {
+            LOG.error("Problem applying configuration update: " + dict, th);
+        }
+    }
+
+    /**
+     * Builds a ZooKeeper configuration object from the given dictionary.
+     */
+    private QuorumPeerConfig parseConfig(Dictionary<String, ?> dict) throws IOException, ConfigException {
+        QuorumPeerConfig config = new QuorumPeerConfig();
+        config.parseProperties(Utils.toProperties(dict));
+        return config;
+    }
+
+    /**
+     * Creates the server for the given configuration and starts it on a new thread.
+     * A quorum peer is used if more than one server is configured, a standalone server otherwise.
+     *
+     * @param config the parsed server configuration
+     */
+    protected void startFromConfig(final QuorumPeerConfig config) {
+        int numServers = config.getServers().size();
+        // the thread works on this local reference so that it does not depend on the
+        // mutable field, which shutdown() may already have cleared when the thread runs
+        final ZookeeperServer server = numServers > 1
+            ? new MyQuorumPeerMain(config) : new MyZooKeeperServerMain(config);
+        main = server;
+        zkMainThread = new Thread(new Runnable() {
+            public void run() {
+                try {
+                    server.startup();
+                } catch (Throwable e) {
+                    LOG.error("Problem running ZooKeeper server.", e);
+                }
+            }
+        });
+        zkMainThread.start();
+    }
+
+    /** Common lifecycle view on the two kinds of ZooKeeper server this class can run. */
+    interface ZookeeperServer {
+        void startup() throws IOException;
+        void shutdown();
+    }
+
+    /** Quorum peer variant of the server. */
+    static class MyQuorumPeerMain extends QuorumPeerMain implements ZookeeperServer {
+
+        private final QuorumPeerConfig config;
+
+        MyQuorumPeerMain(QuorumPeerConfig config) {
+            this.config = config;
+        }
+
+        public void startup() throws IOException {
+            runFromConfig(config);
+        }
+
+        public void shutdown() {
+            // the peer only exists once startup has progressed far enough to create it
+            if (null != quorumPeer) {
+                quorumPeer.shutdown();
+            }
+        }
+    }
+
+    /** Standalone variant of the server. */
+    static class MyZooKeeperServerMain extends ZooKeeperServerMain implements ZookeeperServer {
+
+        private final QuorumPeerConfig config;
+
+        MyZooKeeperServerMain(QuorumPeerConfig config) {
+            this.config = config;
+        }
+
+        public void startup() throws IOException {
+            ServerConfig serverConfig = new ServerConfig();
+            serverConfig.readFrom(config);
+            runFromConfig(serverConfig);
+        }
+
+        public void shutdown() {
+            try {
+                super.shutdown();
+            } catch (Exception e) {
+                LOG.error("Error shutting down ZooKeeper", e);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml b/discovery/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml
new file mode 100644
index 0000000..efd9403
--- /dev/null
+++ b/discovery/zookeeper-server/src/main/resources/OSGI-INF/metatype/zookeeper.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<MetaData xmlns="http://www.osgi.org/xmlns/metadata/v1.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.osgi.org/xmlns/metadata/v1.0.0 http://www.osgi.org/xmlns/metatype/v1.1.0/metatype.xsd
+ ">
+ <OCD description="" name="Zookeeper server config" id="org.apache.cxf.dosgi.discovery.zookeeper.server">
+ <AD id="clientPort" required="false" type="String" default="2181" description=""/>
+ <AD id="tickTime" required="false" type="String" default="2000" description=""/>
+ <AD id="initLimit" required="false" type="String" default="10" description=""/>
+ <AD id="syncLimit" required="false" type="String" default="5" description=""/>
+ </OCD>
+ <Designate pid="org.apache.cxf.dosgi.discovery.zookeeper.server">
+ <Object ocdref="org.apache.cxf.dosgi.discovery.zookeeper.server"/>
+ </Designate>
+</MetaData>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java b/discovery/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java
new file mode 100644
index 0000000..17ca117
--- /dev/null
+++ b/discovery/zookeeper-server/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/server/ZookeeperStarterTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.discovery.zookeeper.server;
+
+import java.io.File;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import junit.framework.TestCase;
+
+import org.apache.cxf.dosgi.discovery.zookeeper.server.ZookeeperStarter.MyZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.easymock.classextension.EasyMock;
+import org.easymock.classextension.IMocksControl;
+import org.osgi.framework.BundleContext;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.classextension.EasyMock.replay;
+import static org.easymock.classextension.EasyMock.verify;
+
+public class ZookeeperStarterTest extends TestCase { // unit tests for ZookeeperStarter's reaction to configuration updates
+
+ public void testUpdateConfig() throws Exception { // a config with only clientPort set must be completed with defaults and started
+ final File tempDir = new File("target");
+ IMocksControl control = EasyMock.createControl();
+ BundleContext bc = control.createMock(BundleContext.class);
+ expect(bc.getDataFile("")).andReturn(tempDir); // the default dataDir is derived from the bundle's data file
+ final MyZooKeeperServerMain mockServer = control.createMock(MyZooKeeperServerMain.class);
+ control.replay();
+
+ ZookeeperStarter starter = new ZookeeperStarter(bc) {
+ @Override
+ protected void startFromConfig(QuorumPeerConfig config) { // replaces the real server start; checks the parsed config instead
+ assertEquals(1234, config.getClientPortAddress().getPort()); // explicitly configured value
+ assertTrue(config.getDataDir().contains(tempDir + File.separator + "zkdata"));
+ assertEquals(2000, config.getTickTime()); // default
+ assertEquals(10, config.getInitLimit()); // default
+ assertEquals(5, config.getSyncLimit()); // default
+ this.main = mockServer;
+ }
+ };
+ Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put("clientPort", "1234");
+ starter.updated(props);
+ assertNotNull(starter.main); // set by the overridden startFromConfig, so it proves that method was called
+
+ control.verify();
+ }
+
+ public void testRemoveConfiguration() throws Exception { // a null config must shut down the running server
+ BundleContext bc = EasyMock.createMock(BundleContext.class);
+ MyZooKeeperServerMain zkServer = EasyMock.createMock(MyZooKeeperServerMain.class);
+ zkServer.shutdown(); // recorded expectation: shutdown is called on the server
+ EasyMock.expectLastCall();
+
+ replay(zkServer);
+
+ ZookeeperStarter starter = new ZookeeperStarter(bc);
+ starter.main = zkServer; // pretend a server is already running
+ starter.updated(null);
+
+ verify(zkServer);
+ assertNull("main should be null", starter.main);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/zookeeper/bnd.bnd
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/bnd.bnd b/discovery/zookeeper/bnd.bnd
new file mode 100644
index 0000000..5c1f23d
--- /dev/null
+++ b/discovery/zookeeper/bnd.bnd
@@ -0,0 +1 @@
+Bundle-Activator: org.apache.cxf.dosgi.discovery.zookeeper.Activator
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/pom.xml b/discovery/zookeeper/pom.xml
new file mode 100644
index 0000000..576031a
--- /dev/null
+++ b/discovery/zookeeper/pom.xml
@@ -0,0 +1,77 @@
+<?xml version='1.0' encoding='UTF-8' ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.aries.rsa</groupId>
+ <artifactId>parent</artifactId>
+ <version>1.8-SNAPSHOT</version>
+ <relativePath>../../parent/pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.aries.rsa.discovery</groupId>
+ <artifactId>zookeeper</artifactId>
+ <packaging>bundle</packaging>
+ <name>Aries Remote Service Admin Discovery Zookeeper</name>
+
+ <properties>
+ <topDirectoryLocation>../..</topDirectoryLocation>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>${zookeeper.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- We need the newer log4j as the one from zookeeper has some ugly dependencies -->
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>${log4j.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.aries.rsa.discovery</groupId>
+ <artifactId>local</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java
new file mode 100644
index 0000000..cbbea58
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.discovery.zookeeper;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.service.cm.ManagedService;
+
+/**
+ * Bundle activator that creates the ZooKeeper discovery component and registers it as a
+ * {@link ManagedService} under the discovery PID.
+ */
+public class Activator implements BundleActivator {
+
+    private ZooKeeperDiscovery zkd;
+
+    /**
+     * Creates the discovery component and registers it as a ManagedService.
+     *
+     * @param bc the context of this bundle
+     */
+    public synchronized void start(BundleContext bc) throws Exception {
+        zkd = new ZooKeeperDiscovery(bc);
+        Dictionary<String, String> props = new Hashtable<String, String>();
+        // use the shared constant rather than repeating the PID literal
+        props.put(Constants.SERVICE_PID, ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID);
+        bc.registerService(ManagedService.class.getName(), zkd, props);
+    }
+
+    /**
+     * Stops the discovery component for good; it will not start again afterwards.
+     *
+     * @param bc the context of this bundle
+     */
+    public synchronized void stop(BundleContext bc) throws Exception {
+        if (zkd != null) {
+            zkd.stop(true);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java
new file mode 100644
index 0000000..33e2da4
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.discovery.zookeeper;
+
+import java.io.IOException;
+import java.util.Dictionary;
+
+import org.apache.cxf.dosgi.discovery.zookeeper.publish.PublishingEndpointListenerFactory;
+import org.apache.cxf.dosgi.discovery.zookeeper.subscribe.EndpointListenerTracker;
+import org.apache.cxf.dosgi.discovery.zookeeper.subscribe.InterfaceMonitorManager;
+import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedService;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.util.tracker.ServiceTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Connects to ZooKeeper according to the configuration it receives as a {@link ManagedService}
+ * and, once the connection is established, starts the components that publish local endpoints
+ * and monitor remote ones.
+ */
+public class ZooKeeperDiscovery implements Watcher, ManagedService {
+
+    public static final String DISCOVERY_ZOOKEEPER_ID = "org.apache.cxf.dosgi.discovery.zookeeper";
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperDiscovery.class);
+
+    private final BundleContext bctx;
+
+    private PublishingEndpointListenerFactory endpointListenerFactory;
+    private ServiceTracker<EndpointListener, EndpointListener> endpointListenerTracker;
+    private InterfaceMonitorManager imManager;
+    private ZooKeeper zk;
+    /** Set once {@code stop(true)} was called; prevents any further start or reconnect. */
+    private boolean closed;
+    private boolean started;
+
+    /** The configuration (defaults included) that was applied last, or null if there is none. */
+    private Dictionary<String, ?> curConfiguration;
+
+    public ZooKeeperDiscovery(BundleContext bctx) {
+        this.bctx = bctx;
+    }
+
+    /**
+     * Fills in the default host, port and timeout for settings missing from the configuration.
+     */
+    private void setDefaults(Dictionary<String, String> configuration) {
+        Utils.setDefault(configuration, "zookeeper.host", "localhost");
+        Utils.setDefault(configuration, "zookeeper.port", "2181");
+        Utils.setDefault(configuration, "zookeeper.timeout", "3000");
+    }
+
+    /**
+     * Applies a configuration update. If the configuration (after defaults were added) differs
+     * from the current one, the discovery is stopped and, unless the new configuration is null,
+     * a new ZooKeeper connection is created from it.
+     *
+     * @param configuration the new configuration, or null if there is none
+     */
+    @SuppressWarnings("unchecked")
+    public synchronized void updated(Dictionary<String, ?> configuration) throws ConfigurationException {
+        LOG.debug("Received configuration update for Zookeeper Discovery: {}", configuration);
+        if (configuration != null) {
+            setDefaults((Dictionary<String, String>)configuration);
+        }
+        // make changes only if config actually changed, to prevent unnecessary ZooKeeper reconnections
+        if (!Utils.toMap(configuration).equals(Utils.toMap(curConfiguration))) {
+            stop(false);
+            curConfiguration = configuration;
+            // config is null if it doesn't exist, is being deleted or has not yet been loaded
+            // in which case we just stop running
+            if (configuration != null) {
+                createZooKeeper(configuration);
+            }
+        }
+    }
+
+    /**
+     * Starts the publishing and monitoring components on the current ZooKeeper connection.
+     * Does nothing if the discovery was closed or is already started.
+     */
+    private synchronized void start() {
+        if (closed) {
+            return;
+        }
+        if (started) {
+            // we must be re-entrant, i.e. can be called when already started
+            LOG.debug("ZookeeperDiscovery already started");
+            return;
+        }
+        LOG.debug("starting ZookeeperDiscovery");
+        endpointListenerFactory = new PublishingEndpointListenerFactory(zk, bctx);
+        endpointListenerFactory.start();
+        imManager = new InterfaceMonitorManager(bctx, zk);
+        endpointListenerTracker = new EndpointListenerTracker(bctx, imManager);
+        endpointListenerTracker.open();
+        started = true;
+    }
+
+    /**
+     * Stops the publishing and monitoring components and closes the ZooKeeper connection.
+     *
+     * @param close true to additionally mark this discovery as closed, which prevents it
+     *        from being started or reconnected again
+     */
+    public synchronized void stop(boolean close) {
+        if (started) {
+            LOG.debug("stopping ZookeeperDiscovery");
+        }
+        started = false;
+        closed |= close;
+        if (endpointListenerFactory != null) {
+            endpointListenerFactory.stop();
+        }
+        if (endpointListenerTracker != null) {
+            endpointListenerTracker.close();
+        }
+        if (imManager != null) {
+            imManager.close();
+        }
+        if (zk != null) {
+            try {
+                zk.close();
+            } catch (InterruptedException e) {
+                // keep the interrupt visible to the caller instead of silently consuming it
+                Thread.currentThread().interrupt();
+                LOG.error("Error closing ZooKeeper", e);
+            }
+        }
+    }
+
+    /**
+     * Creates a new ZooKeeper connection from the given configuration with this object as
+     * watcher. Does nothing if the discovery was closed or no configuration is available.
+     *
+     * @param configuration the configuration with host, port and timeout; may be null
+     */
+    private synchronized void createZooKeeper(Dictionary<String, ?> configuration) {
+        if (closed) {
+            return;
+        }
+        if (configuration == null) {
+            // can happen when a session expiry is reported after the configuration was removed
+            LOG.debug("No configuration available, not connecting to ZooKeeper");
+            return;
+        }
+        String host = configuration.get("zookeeper.host").toString();
+        String port = configuration.get("zookeeper.port").toString();
+        int timeout = Integer.parseInt(configuration.get("zookeeper.timeout").toString());
+        LOG.debug("ZooKeeper configuration: connecting to {}:{} with timeout {}",
+                  new Object[]{host, port, timeout});
+        try {
+            zk = new ZooKeeper(host + ":" + port, timeout, this);
+        } catch (IOException e) {
+            LOG.error("Failed to start the ZooKeeper Discovery component.", e);
+        }
+    }
+
+    /* Callback for ZooKeeper */
+    public void process(WatchedEvent event) {
+        LOG.debug("got ZooKeeper event {}", event);
+        switch (event.getState()) {
+        case SyncConnected:
+            LOG.info("Connection to ZooKeeper established");
+            // this event can be triggered more than once in a row (e.g. after Disconnected event),
+            // so we must be re-entrant here
+            start();
+            break;
+
+        case Expired:
+            LOG.info("Connection to ZooKeeper expired. Trying to create a new connection");
+            stop(false);
+            createZooKeeper(curConfiguration);
+            break;
+
+        default:
+            // ignore other events
+            break;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java
new file mode 100644
index 0000000..5d46585
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.discovery.zookeeper.publish;
+
+import java.util.Map;
+
+/**
+ * This interface allows transformation of service registration information before it is pushed into the ZooKeeper
+ * discovery system.
+ * It can be useful for situations where a host name or port number needs to be changed in cases where the host running
+ * the service is known differently from the outside to what the local Java process thinks it is.
+ * Extra service properties can also be added to the registration which can be useful to refine the remote service
+ * lookup process.
+ * <p>
+ * DiscoveryPlugins use the OSGi WhiteBoard pattern. To add one to the system, register an instance under this interface
+ * with the OSGi Service Registry. All registered DiscoveryPlugin instances are visited and given a chance to
+ * process the information before it is pushed into ZooKeeper.
+ * <p>
+ * Note that the changes made using this plugin do not modify the local service registration.
+ *
+ */
+public interface DiscoveryPlugin {
+
+ /**
+ * Process service registration information. Plugins can change this information before it is published into the
+ * ZooKeeper discovery system.
+ *
+ * @param mutableProperties A map of service registration properties. The map is mutable and any changes to the map
+ * will be reflected in the ZooKeeper registration.
+ * @param endpointKey The key under which the service is registered in ZooKeeper. This key typically has the
+ * following format: hostname#port##context. While the value of this key is not actually used by the
+ * system (people can use it as a hint to understand where the service is located), the value <i>must</i> be
+ * unique for all services of a given type.
+ * @return The {@code endpointKey} value to be used. If there is no need to change this simply return the value
+ * of the {@code endpointKey} parameter.
+ */
+ String process(Map<String, Object> mutableProperties, String endpointKey);
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java
new file mode 100644
index 0000000..c703b9f
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.discovery.zookeeper.publish;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils;
+import org.apache.cxf.dosgi.endpointdesc.EndpointDescriptionParser;
+import org.apache.cxf.dosgi.endpointdesc.PropertiesMapper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType;
+import org.osgi.xmlns.rsa.v1_0.PropertyType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listens for local Endpoints and publishes them to ZooKeeper.
+ * <p>
+ * For every interface of an endpoint an ephemeral node is created. The node name (the endpoint
+ * key) is derived from the endpoint id and may be changed by registered {@link DiscoveryPlugin}s;
+ * the key that was actually used is remembered so that exactly those nodes are removed again.
+ */
+public class PublishingEndpointListener implements EndpointListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class);
+
+    private final ZooKeeper zk;
+    private final ServiceTracker<DiscoveryPlugin, DiscoveryPlugin> discoveryPluginTracker;
+    /**
+     * The published endpoints, each mapped to the key its nodes were created under.
+     * This map is also the lock guarding {@code closed}.
+     */
+    private final Map<EndpointDescription, String> endpoints = new HashMap<EndpointDescription, String>();
+    private boolean closed;
+
+    private final EndpointDescriptionParser endpointDescriptionParser;
+
+    public PublishingEndpointListener(ZooKeeper zk, BundleContext bctx) {
+        this.zk = zk;
+        discoveryPluginTracker = new ServiceTracker<DiscoveryPlugin, DiscoveryPlugin>(bctx,
+            DiscoveryPlugin.class, null);
+        discoveryPluginTracker.open();
+        endpointDescriptionParser = new EndpointDescriptionParser();
+    }
+
+    /**
+     * Publishes the given endpoint to ZooKeeper unless this listener is closed or the endpoint
+     * is already published. Failures are logged and leave the endpoint unregistered here.
+     */
+    public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
+        LOG.info("Local EndpointDescription added: {}", endpoint);
+
+        synchronized (endpoints) {
+            if (closed) {
+                return;
+            }
+            if (endpoints.containsKey(endpoint)) {
+                // TODO -> Should the published endpoint be updated here?
+                return;
+            }
+
+            try {
+                String publishedKey = addEndpoint(endpoint);
+                endpoints.put(endpoint, publishedKey);
+            } catch (Exception ex) {
+                LOG.error("Exception while processing the addition of an endpoint.", ex);
+            }
+        }
+    }
+
+    /**
+     * Creates one ephemeral node per interface of the endpoint.
+     *
+     * @param endpoint the endpoint to publish
+     * @return the endpoint key the nodes were created under, after all plugins were applied
+     */
+    private String addEndpoint(EndpointDescription endpoint) throws URISyntaxException, KeeperException,
+        InterruptedException, IOException {
+        Collection<String> interfaces = endpoint.getInterfaces();
+        String endpointKey = getKey(endpoint.getId());
+        Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
+
+        // process plugins; each one may alter the properties and replace the key
+        Object[] plugins = discoveryPluginTracker.getServices();
+        if (plugins != null) {
+            for (Object plugin : plugins) {
+                if (plugin instanceof DiscoveryPlugin) {
+                    endpointKey = ((DiscoveryPlugin)plugin).process(props, endpointKey);
+                }
+            }
+        }
+
+        for (String name : interfaces) {
+            String path = Utils.getZooKeeperPath(name);
+            String fullPath = path + '/' + endpointKey;
+            LOG.debug("Creating ZooKeeper node: {}", fullPath);
+            ensurePath(path, zk);
+            List<PropertyType> propsOut = new PropertiesMapper().fromProps(props);
+            EndpointDescriptionType epd = new EndpointDescriptionType();
+            epd.getProperty().addAll(propsOut);
+            byte[] epData = endpointDescriptionParser.getData(epd);
+            createEphemeralNode(fullPath, epData);
+        }
+        return endpointKey;
+    }
+
+    /**
+     * Creates an ephemeral node with the given data, replacing a node that already exists.
+     */
+    private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException {
+        try {
+            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        } catch (NodeExistsException nee) {
+            // this sometimes happens after a ZooKeeper node dies and the ephemeral node
+            // that belonged to the old session was not yet deleted. We need to make our
+            // session the owner of the node so it won't get deleted automatically -
+            // we do this by deleting and recreating it ourselves.
+            LOG.info("node for endpoint already exists, recreating: {}", fullPath);
+            try {
+                zk.delete(fullPath, -1);
+            } catch (NoNodeException nne) {
+                // it's a race condition, but as long as it got deleted - it's ok
+            }
+            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        }
+    }
+
+    /**
+     * Removes the nodes of the given endpoint from ZooKeeper unless this listener is closed
+     * or the endpoint was never published by it.
+     */
+    public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
+        LOG.info("Local EndpointDescription removed: {}", endpoint);
+
+        synchronized (endpoints) {
+            if (closed) {
+                return;
+            }
+            if (!endpoints.containsKey(endpoint)) {
+                return;
+            }
+
+            try {
+                removeEndpoint(endpoint, endpoints.get(endpoint));
+                endpoints.remove(endpoint);
+            } catch (Exception ex) {
+                LOG.error("Exception while processing the removal of an endpoint", ex);
+            }
+        }
+    }
+
+    /**
+     * Deletes the node of every interface of the endpoint. Failures to delete a node are
+     * logged at debug level and do not stop the removal of the remaining nodes.
+     *
+     * @param endpoint the endpoint to unpublish
+     * @param endpointKey the key the endpoint's nodes were created under
+     */
+    private void removeEndpoint(EndpointDescription endpoint, String endpointKey) {
+        Collection<String> interfaces = endpoint.getInterfaces();
+
+        for (String name : interfaces) {
+            String path = Utils.getZooKeeperPath(name);
+            String fullPath = path + '/' + endpointKey;
+            LOG.debug("Removing ZooKeeper node: {}", fullPath);
+            try {
+                zk.delete(fullPath, -1);
+            } catch (Exception ex) {
+                // e.g. session expired
+                LOG.debug("Error while removing endpoint: " + fullPath, ex);
+            }
+        }
+    }
+
+    /**
+     * Creates every node along the given path as a persistent node if it does not exist yet.
+     */
+    private static void ensurePath(String path, ZooKeeper zk) throws KeeperException, InterruptedException {
+        StringBuilder current = new StringBuilder();
+        String[] parts = Utils.removeEmpty(path.split("/"));
+        for (String part : parts) {
+            current.append('/');
+            current.append(part);
+            try {
+                zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            } catch (NodeExistsException nee) {
+                // it's not the first node with this path to ever exist - that's normal
+            }
+        }
+    }
+
+    /**
+     * Derives the default endpoint key from an endpoint id given as URI: host, port and path
+     * joined by '#', with every '/' of the path replaced by '#' as well.
+     *
+     * @param endpoint the endpoint id, which must be a valid URI
+     * @return the endpoint key
+     * @throws URISyntaxException if the endpoint id is not a valid URI
+     */
+    static String getKey(String endpoint) throws URISyntaxException {
+        URI uri = new URI(endpoint);
+
+        StringBuilder sb = new StringBuilder();
+        sb.append(uri.getHost());
+        sb.append("#");
+        sb.append(uri.getPort());
+        sb.append("#");
+        sb.append(uri.getPath().replace('/', '#'));
+        return sb.toString();
+    }
+
+    /**
+     * Closes this listener: removes the nodes of all published endpoints and ignores any
+     * endpoint additions or removals from now on.
+     */
+    public void close() {
+        LOG.debug("closing - removing all endpoints");
+        synchronized (endpoints) {
+            closed = true;
+            for (Map.Entry<EndpointDescription, String> published : endpoints.entrySet()) {
+                try {
+                    removeEndpoint(published.getKey(), published.getValue());
+                } catch (Exception ex) {
+                    LOG.error("Exception while removing endpoint during close", ex);
+                }
+            }
+            endpoints.clear();
+        }
+        discoveryPluginTracker.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
new file mode 100644
index 0000000..c505bb4
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.discovery.zookeeper.publish;
+
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+
+import org.apache.cxf.dosgi.discovery.zookeeper.ZooKeeperDiscovery;
+import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils;
+import org.apache.zookeeper.ZooKeeper;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceFactory;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates local EndpointListeners that publish to ZooKeeper.
+ * <p>
+ * The factory registers itself as a service factory under the {@link EndpointListener}
+ * interface. Every {@link PublishingEndpointListener} it hands out is remembered and is
+ * closed when it is released or when the factory is stopped.
+ */
+public class PublishingEndpointListenerFactory implements ServiceFactory<PublishingEndpointListener> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListenerFactory.class);
+
+    private final BundleContext bctx;
+    private final ZooKeeper zk;
+    /** Listeners currently handed out; all access is synchronized on the list itself. */
+    private final List<PublishingEndpointListener> listeners = new ArrayList<PublishingEndpointListener>();
+    /** Registration created by {@link #start()}; {@code null} while not registered. */
+    private ServiceRegistration<?> serviceRegistration;
+
+    /**
+     * @param zk the ZooKeeper client passed on to every created listener
+     * @param bctx the bundle context used for the service registration and passed on
+     *        to every created listener
+     */
+    public PublishingEndpointListenerFactory(ZooKeeper zk, BundleContext bctx) {
+        this.bctx = bctx;
+        this.zk = zk;
+    }
+
+    /**
+     * Creates a new listener and records it so that it can be closed later.
+     *
+     * @param b the requesting bundle (not used)
+     * @param sr the service registration (not used)
+     * @return a newly created listener
+     */
+    @Override
+    public PublishingEndpointListener getService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr) {
+        LOG.debug("new EndpointListener from factory");
+        synchronized (listeners) {
+            PublishingEndpointListener pel = new PublishingEndpointListener(zk, bctx);
+            listeners.add(pel);
+            return pel;
+        }
+    }
+
+    /**
+     * Closes a listener previously created by this factory. Listeners that are not (or no
+     * longer) recorded, e.g. because {@link #stop()} already closed them, are ignored.
+     *
+     * @param b the releasing bundle (not used)
+     * @param sr the service registration (not used)
+     * @param pel the listener being released
+     */
+    @Override
+    public void ungetService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr,
+                             PublishingEndpointListener pel) {
+        LOG.debug("remove EndpointListener");
+        synchronized (listeners) {
+            if (listeners.remove(pel)) {
+                pel.close();
+            }
+        }
+    }
+
+    /**
+     * Registers this factory as an {@link EndpointListener} service. The listener scope
+     * only matches endpoints whose framework UUID equals {@code Utils.getUUID(bctx)}, and
+     * the registration carries the {@code DISCOVERY_ZOOKEEPER_ID} property.
+     */
+    public synchronized void start() {
+        Dictionary<String, String> props = new Hashtable<String, String>();
+        props.put(EndpointListener.ENDPOINT_LISTENER_SCOPE,
+                  "(&(" + Constants.OBJECTCLASS + "=*)(" + RemoteConstants.ENDPOINT_FRAMEWORK_UUID
+                  + "=" + Utils.getUUID(bctx) + "))");
+        props.put(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID, "true");
+        serviceRegistration = bctx.registerService(EndpointListener.class.getName(), this, props);
+    }
+
+    /**
+     * Unregisters the service if it is registered, then closes and forgets every listener
+     * that is still recorded.
+     */
+    public synchronized void stop() {
+        if (serviceRegistration != null) {
+            serviceRegistration.unregister();
+            serviceRegistration = null;
+        }
+        synchronized (listeners) {
+            for (PublishingEndpointListener pel : listeners) {
+                pel.close();
+            }
+            listeners.clear();
+        }
+    }
+
+    /**
+     * Only for the test case!
+     *
+     * @return the live internal list of listeners (not a copy)
+     */
+    protected List<PublishingEndpointListener> getListeners() {
+        synchronized (listeners) {
+            return listeners;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d73a3a7f/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java
new file mode 100644
index 0000000..4d0a25f
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.dosgi.discovery.zookeeper.subscribe;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.util.tracker.ServiceTracker;
+
+/**
+ * Tracks interest in EndpointListeners. Delegates to InterfaceMonitorManager to manage
+ * interest in the scopes of each EndpointListener.
+ */
+public class EndpointListenerTracker extends ServiceTracker<EndpointListener, EndpointListener> {
+    private final InterfaceMonitorManager imManager;
+
+    /**
+     * @param bctx the bundle context used to track {@link EndpointListener} services
+     * @param imManager receives the interest of every tracked listener
+     */
+    public EndpointListenerTracker(BundleContext bctx, InterfaceMonitorManager imManager) {
+        super(bctx, EndpointListener.class, null);
+        this.imManager = imManager;
+    }
+
+    /**
+     * Registers the interest of a newly discovered EndpointListener.
+     *
+     * @param endpointListener reference to the listener service that appeared
+     * @return the service object to track, or {@code null} if it could not be obtained
+     */
+    @Override
+    public EndpointListener addingService(ServiceReference<EndpointListener> endpointListener) {
+        imManager.addInterest(endpointListener);
+        // A ServiceTracker only tracks references for which a non-null object is returned
+        // here; returning null would mean removedService is never invoked for this
+        // listener and its interest would never be removed again.
+        return super.addingService(endpointListener);
+    }
+
+    @Override
+    public void modifiedService(ServiceReference<EndpointListener> endpointListener, EndpointListener service) {
+        // called when an EndpointListener updates its service properties,
+        // e.g. when its interest scope is expanded/reduced
+        imManager.addInterest(endpointListener);
+    }
+
+    /**
+     * Removes the interest of an EndpointListener that went away.
+     *
+     * @param endpointListener reference to the listener service that disappeared
+     * @param service the object returned by {@link #addingService(ServiceReference)}
+     */
+    @Override
+    public void removedService(ServiceReference<EndpointListener> endpointListener, EndpointListener service) {
+        imManager.removeInterest(endpointListener);
+        // release the service object that was obtained in addingService
+        super.removedService(endpointListener, service);
+    }
+
+}