You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by sm...@apache.org on 2014/05/28 04:21:19 UTC
[14/44] git commit: SLIDER-38 minicluster to be YARN service and
easily re-used, moved into slider-core JAR for other apps to pick up,
along with move of all zk/ package
SLIDER-38 minicluster to be YARN service and easily re-used, moved into slider-core JAR for other apps to pick up, along with move of all zk/ package
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/3a5cf1ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/3a5cf1ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/3a5cf1ab
Branch: refs/heads/master
Commit: 3a5cf1ab1f674e7ba9b9fe6c28865a9f6ad2740f
Parents: ab246e2
Author: Steve Loughran <st...@hortonworks.com>
Authored: Thu May 22 15:40:47 2014 +0100
Committer: Steve Loughran <st...@hortonworks.com>
Committed: Thu May 22 15:40:47 2014 +0100
----------------------------------------------------------------------
.../org/apache/slider/client/SliderClient.java | 2 +-
.../slider/core/build/InstanceBuilder.java | 4 +-
.../core/registry/zk/BlockingZKWatcher.java | 63 ---
.../slider/core/registry/zk/ZKCallback.java | 31 --
.../slider/core/registry/zk/ZKIntegration.java | 280 ------------
.../slider/core/registry/zk/ZKPathBuilder.java | 82 ----
.../slider/core/registry/zk/ZookeeperUtils.java | 136 ------
.../slider/core/zk/BlockingZKWatcher.java | 63 +++
.../slider/core/zk/MiniZooKeeperCluster.java | 423 +++++++++++++++++++
.../org/apache/slider/core/zk/ZKCallback.java | 31 ++
.../apache/slider/core/zk/ZKIntegration.java | 280 ++++++++++++
.../apache/slider/core/zk/ZKPathBuilder.java | 82 ++++
.../apache/slider/core/zk/ZookeeperUtils.java | 136 ++++++
.../utility/AbstractSliderLaunchedService.java | 2 +-
.../common/tools/GroovyZKIntegration.groovy | 2 +-
.../common/tools/TestZKIntegration.groovy | 3 +-
.../apache/slider/test/MicroZKCluster.groovy | 18 +-
.../slider/test/YarnMiniClusterTestBase.groovy | 6 +-
.../test/YarnZKMiniClusterTestBase.groovy | 4 +-
.../accumulo/AccumuloProviderService.java | 3 +-
.../live/TestAccCorrectInstanceName.groovy | 2 +-
.../accumulo/live/TestAccFreezeThaw.groovy | 2 +-
.../accumulo/live/TestAccLiveHDFSArchive.groovy | 2 +-
.../live/TestAccLiveLocalArchive.groovy | 2 +-
.../accumulo/live/TestAccM1T1GC1Mon1.groovy | 2 +-
.../accumulo/live/TestAccM2T2GC1Mon1.groovy | 2 +-
.../accumulo/live/TestAccumuloAMWebApp.groovy | 2 +-
.../providers/hbase/HBaseClientProvider.java | 2 +-
.../minicluster/live/TestHBaseMaster.groovy | 2 +-
.../live/TestHBaseMasterWithBadHeap.groovy | 3 -
.../live/TestLiveRegionServiceOnHDFS.groovy | 2 +-
31 files changed, 1043 insertions(+), 631 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
index e36fcdf..95c120c 100644
--- a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
+++ b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
@@ -94,7 +94,7 @@ import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
import org.apache.slider.core.registry.info.RegisteredEndpoint;
import org.apache.slider.core.registry.info.ServiceInstanceData;
import org.apache.slider.core.registry.retrieve.RegistryRetriever;
-import org.apache.slider.core.registry.zk.ZKPathBuilder;
+import org.apache.slider.core.zk.ZKPathBuilder;
import org.apache.slider.providers.AbstractClientProvider;
import org.apache.slider.providers.SliderProviderFactory;
import org.apache.slider.providers.agent.AgentKeys;
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-core/src/main/java/org/apache/slider/core/build/InstanceBuilder.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/build/InstanceBuilder.java b/slider-core/src/main/java/org/apache/slider/core/build/InstanceBuilder.java
index 54a618a..0580013 100644
--- a/slider-core/src/main/java/org/apache/slider/core/build/InstanceBuilder.java
+++ b/slider-core/src/main/java/org/apache/slider/core/build/InstanceBuilder.java
@@ -40,8 +40,8 @@ import org.apache.slider.core.persist.ConfPersister;
import org.apache.slider.core.persist.InstancePaths;
import org.apache.slider.core.persist.LockAcquireFailedException;
import org.apache.slider.core.persist.LockHeldAction;
-import org.apache.slider.core.registry.zk.ZKPathBuilder;
-import org.apache.slider.core.registry.zk.ZookeeperUtils;
+import org.apache.slider.core.zk.ZKPathBuilder;
+import org.apache.slider.core.zk.ZookeeperUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-core/src/main/java/org/apache/slider/core/registry/zk/BlockingZKWatcher.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/zk/BlockingZKWatcher.java b/slider-core/src/main/java/org/apache/slider/core/registry/zk/BlockingZKWatcher.java
deleted file mode 100644
index 68c617c..0000000
--- a/slider-core/src/main/java/org/apache/slider/core/registry/zk/BlockingZKWatcher.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.core.registry.zk;
-
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class BlockingZKWatcher implements Watcher {
-
- protected static final Logger log =
- LoggerFactory.getLogger(BlockingZKWatcher.class);
- private final AtomicBoolean connectedFlag = new AtomicBoolean(false);
-
- @Override
- public void process(WatchedEvent event) {
- log.info("ZK binding callback received");
- connectedFlag.set(true);
- synchronized (connectedFlag) {
- try {
- connectedFlag.notify();
- } catch (Exception e) {
- log.warn("failed while waiting for notification", e);
- }
- }
- }
-
- /**
- * Wait for a flag to go true
- * @param timeout timeout in millis
- */
-
- public void waitForZKConnection(int timeout) throws InterruptedException {
- synchronized (connectedFlag) {
- if (!connectedFlag.get()) {
- log.info("waiting for ZK event");
- //wait a bit
- connectedFlag.wait(timeout);
- }
- }
- assert connectedFlag.get();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZKCallback.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZKCallback.java b/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZKCallback.java
deleted file mode 100644
index 0fd137e..0000000
--- a/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZKCallback.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.core.registry.zk;
-
-import org.apache.zookeeper.Watcher;
-
-/**
- * Relays ZK watcher events to a closure
- */
-public abstract class ZKCallback implements Watcher {
-
- public ZKCallback() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZKIntegration.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZKIntegration.java b/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZKIntegration.java
deleted file mode 100644
index c3ab0a5..0000000
--- a/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZKIntegration.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.core.registry.zk;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-
-public class ZKIntegration implements Watcher {
-
-/**
- * Base path for services
- */
- public static String ZK_SERVICES = "services";
- /**
- * Base path for all Slider references
- */
- public static String ZK_SLIDER = "slider";
- public static String ZK_USERS = "users";
- public static String SVC_SLIDER = "/" + ZK_SERVICES + "/" + ZK_SLIDER;
- public static String SVC_SLIDER_USERS = SVC_SLIDER + "/" + ZK_USERS;
-
- public static List<String> ZK_USERS_PATH_LIST = new ArrayList<>();
- static {
- ZK_USERS_PATH_LIST.add(ZK_SERVICES);
- ZK_USERS_PATH_LIST.add(ZK_SLIDER);
- ZK_USERS_PATH_LIST.add(ZK_USERS);
- }
-
- public static int SESSION_TIMEOUT = 5000;
- protected static final Logger log =
- LoggerFactory.getLogger(ZKIntegration.class);
- private ZooKeeper zookeeper;
- private final String username;
- private final String clustername;
- private final String userPath;
- private int sessionTimeout = SESSION_TIMEOUT;
-/**
- flag to set to indicate that the user path should be created if
- it is not already there
- */
- private final AtomicBoolean toInit = new AtomicBoolean(false);
- private final boolean createClusterPath;
- private final Watcher watchEventHandler;
- private final String zkConnection;
- private final boolean canBeReadOnly;
-
- protected ZKIntegration(String zkConnection,
- String username,
- String clustername,
- boolean canBeReadOnly,
- boolean createClusterPath,
- Watcher watchEventHandler
- ) throws IOException {
- this.username = username;
- this.clustername = clustername;
- this.watchEventHandler = watchEventHandler;
- this.zkConnection = zkConnection;
- this.canBeReadOnly = canBeReadOnly;
- this.createClusterPath = createClusterPath;
- this.userPath = mkSliderUserPath(username);
- }
-
- public void init() throws IOException {
- assert zookeeper == null;
- log.debug("Binding ZK client to {}", zkConnection);
- zookeeper = new ZooKeeper(zkConnection, sessionTimeout, this, canBeReadOnly);
- }
-
- /**
- * Create an instance bonded to the specific closure
- * @param zkConnection
- * @param username
- * @param clustername
- * @param canBeReadOnly
- * @param watchEventHandler
- * @return
- * @throws IOException
- */
- public static ZKIntegration newInstance(String zkConnection, String username, String clustername, boolean createClusterPath, boolean canBeReadOnly, Watcher watchEventHandler) throws IOException {
-
- return new ZKIntegration(zkConnection,
- username,
- clustername,
- canBeReadOnly,
- createClusterPath,
- watchEventHandler);
- }
-
- public String getConnectionString() {
- return zkConnection;
- }
-
- public String getClusterPath() {
- return mkClusterPath(username, clustername);
- }
-
- public boolean getConnected() {
- return zookeeper.getState().isConnected();
- }
-
- public boolean getAlive() {
- return zookeeper.getState().isAlive();
- }
-
- public ZooKeeper.States getState() {
- return zookeeper.getState();
- }
-
- public Stat getClusterStat() throws KeeperException, InterruptedException {
- return stat(getClusterPath());
- }
-
- public boolean exists(String path) throws
- KeeperException,
- InterruptedException {
- return stat(path) != null;
- }
-
- public Stat stat(String path) throws KeeperException, InterruptedException {
- return zookeeper.exists(path, false);
- }
-
- @Override
- public String toString() {
- return "ZK integration bound @ " + zkConnection + ": " + zookeeper;
- }
-
-/**
- * Event handler to notify of state events
- * @param event
- */
- @Override
- public void process(WatchedEvent event) {
- log.debug("{}", event);
- try {
- maybeInit();
- } catch (Exception e) {
- log.error("Failed to init", e);
- }
- if (watchEventHandler != null) {
- watchEventHandler.process(event);
- }
- }
-
- private void maybeInit() throws KeeperException, InterruptedException {
- if (!toInit.getAndSet(true) && createClusterPath) {
- log.debug("initing");
- //create the user path
- mkPath(ZK_USERS_PATH_LIST, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- //create the specific user
- createPath(userPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- }
-
- /**
- * Create a path under a parent, don't care if it already exists
- * As the path isn't returned, this isn't the way to create sequentially
- * numbered nodes.
- * @param parent parent dir. Must have a trailing / if entry!=null||empty
- * @param entry entry -can be null or "", in which case it is not appended
- * @param acl
- * @param createMode
- * @return the path if created; null if not
- */
- public String createPath(String parent,
- String entry,
- List<ACL> acl,
- CreateMode createMode) throws KeeperException, InterruptedException {
- //initial create of full path
- assert acl != null;
- assert !acl.isEmpty();
- assert parent != null;
- String path = parent;
- if (entry != null) {
- path = path + entry;
- }
- try {
- log.debug("Creating ZK path {}", path);
- return zookeeper.create(path, null, acl, createMode);
- } catch (KeeperException.NodeExistsException ignored) {
- //node already there
- log.debug("node already present:{}",path);
- return null;
- }
- }
-
- /**
- * Recursive path create
- * @param path
- * @param data
- * @param acl
- * @param createMode
- */
- public void mkPath(List<String> paths,
- List<ACL> acl,
- CreateMode createMode) throws KeeperException, InterruptedException {
- String history = "/";
- for (String entry : paths) {
- createPath(history, entry, acl, createMode);
- history = history + entry + "/";
- }
- }
-
-/**
- * Blocking enum of users
- * @return an unordered list of clusters under a user
- */
- public List<String> getClusters() throws KeeperException, InterruptedException {
- return zookeeper.getChildren(userPath, null);
- }
-
- /**
- * Delete a node, does not throw an exception if the path is not fond
- * @param path path to delete
- * @return true if the path could be deleted, false if there was no node to delete
- *
- */
- public boolean delete(String path) throws
- InterruptedException,
- KeeperException {
- try {
- zookeeper.delete(path, -1);
- return true;
- } catch (KeeperException.NoNodeException ignored) {
- return false;
- }
- }
-
-/**
- * Build the path to a cluster; exists once the cluster has come up.
- * Even before that, a ZK watcher could wait for it.
- * @param username user
- * @param clustername name of the cluster
- * @return a strin
- */
- public static String mkClusterPath(String username, String clustername) {
- return mkSliderUserPath(username) + "/" + clustername;
- }
-/**
- * Build the path to a cluster; exists once the cluster has come up.
- * Even before that, a ZK watcher could wait for it.
- * @param username user
- * @return a string
- */
- public static String mkSliderUserPath(String username) {
- return SVC_SLIDER_USERS + "/" + username;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZKPathBuilder.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZKPathBuilder.java b/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZKPathBuilder.java
deleted file mode 100644
index c822749..0000000
--- a/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZKPathBuilder.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.core.registry.zk;
-
-import java.util.Locale;
-
-public final class ZKPathBuilder {
-
- private final String username, appname, clustername;
- private final String quorum;
-
- private String appPath;
- private String registryPath;
- private final String appQuorum;
-
- public ZKPathBuilder(String username,
- String appname,
- String clustername,
- String quorum,
- String appQuorum) {
- this.username = username;
- this.appname = appname;
- this.clustername = clustername;
- this.quorum = quorum;
- appPath = buildAppPath();
- registryPath = buildRegistryPath();
- this.appQuorum = appQuorum;
- }
-
- public String buildAppPath() {
- return String.format(Locale.ENGLISH, "/yarnapps_%s_%s_%s", appname,
- username, clustername);
-
- }
-
- public String buildRegistryPath() {
- return String.format(Locale.ENGLISH, "/services_%s_%s_%s", appname,
- username, clustername);
-
- }
-
- public String getQuorum() {
- return quorum;
- }
-
- public String getAppQuorum() {
- return appQuorum;
- }
-
- public String getAppPath() {
- return appPath;
- }
-
- public void setAppPath(String appPath) {
- this.appPath = appPath;
- }
-
- public String getRegistryPath() {
- return registryPath;
- }
-
- public void setRegistryPath(String registryPath) {
- this.registryPath = registryPath;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZookeeperUtils.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZookeeperUtils.java b/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZookeeperUtils.java
deleted file mode 100644
index 84a9321..0000000
--- a/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZookeeperUtils.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.core.registry.zk;
-
-import com.google.common.net.HostAndPort;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.slider.common.tools.SliderUtils;
-import org.apache.slider.core.exceptions.BadConfigException;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class ZookeeperUtils {
-
- public static String buildConnectionString(String zkHosts, int port) {
- String zkPort = Integer.toString(port);
- //parse the hosts
- String[] hostlist = zkHosts.split(",", 0);
- String quorum = SliderUtils.join(hostlist, ":" + zkPort + ",");
- //this quorum has a trailing comma
- quorum = quorum.substring(0, quorum.length() - 1);
- return quorum;
- }
-
- /**
- * Take a quorum list and split it to (trimmed) pairs
- * @param hostPortQuorumList list of form h1:port, h2:port2,...
- * @return a possibly empty list of values between commas. They may not be
- * valid hostname:port pairs
- */
- public static List<String> splitToPairs(String hostPortQuorumList) {
- // split an address hot
- String[] strings = StringUtils.getStrings(hostPortQuorumList);
- List<String> tuples = new ArrayList<>(strings.length);
- for (String s : strings) {
- tuples.add(s.trim());
- }
- return tuples;
- }
-
- /**
- * Split a quorum list into a list of hostnames and ports
- * @param hostPortQuorumList split to a list of hosts and ports
- * @return a list of values
- */
- public static List<HostAndPort> splitToHostsAndPorts(String hostPortQuorumList) {
- // split an address hot
- String[] strings = StringUtils.getStrings(hostPortQuorumList);
- List<HostAndPort> list = new ArrayList<>(strings.length);
- for (String s : strings) {
- list.add(HostAndPort.fromString(s.trim()));
- }
- return list;
- }
-
- /**
- * Build up to a hosts only list
- * @param hostAndPorts
- * @return a list of the hosts only
- */
- public static String buildHostsOnlyList(List<HostAndPort> hostAndPorts) {
- StringBuilder sb = new StringBuilder();
- for (HostAndPort hostAndPort : hostAndPorts) {
- sb.append(hostAndPort.getHostText()).append(",");
- }
- if (sb.length() > 0) {
- sb.delete(sb.length() - 1, sb.length());
- }
- return sb.toString();
- }
-
- public static String buildQuorumEntry(HostAndPort hostAndPort,
- int defaultPort) {
- String s = hostAndPort.toString();
- if (hostAndPort.hasPort()) {
- return s;
- } else {
- return s + ":" + defaultPort;
- }
- }
-
- /**
- * Build a quorum list, injecting a ":defaultPort" ref if needed on
- * any entry without one
- * @param hostAndPorts
- * @param defaultPort
- * @return
- */
- public static String buildQuorum(List<HostAndPort> hostAndPorts, int defaultPort) {
- List<String> entries = new ArrayList<>(hostAndPorts.size());
- for (HostAndPort hostAndPort : hostAndPorts) {
- entries.add(buildQuorumEntry(hostAndPort, defaultPort));
- }
- return SliderUtils.join(entries, ",", false);
- }
-
- public static String convertToHostsOnlyList(String quorum) throws
- BadConfigException {
- List<HostAndPort> hostAndPorts = splitToHostsAndPortsStrictly(quorum);
- return ZookeeperUtils.buildHostsOnlyList(hostAndPorts);
- }
-
- public static List<HostAndPort> splitToHostsAndPortsStrictly(String quorum) throws
- BadConfigException {
- List<HostAndPort> hostAndPorts =
- ZookeeperUtils.splitToHostsAndPorts(quorum);
- if (hostAndPorts.isEmpty()) {
- throw new BadConfigException("empty zookeeper quorum");
- }
- return hostAndPorts;
- }
-
- public static int getFirstPort(String quorum, int defVal) throws
- BadConfigException {
- List<HostAndPort> hostAndPorts = splitToHostsAndPortsStrictly(quorum);
- int port = hostAndPorts.get(0).getPortOrDefault(defVal);
- return port;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java b/slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
new file mode 100644
index 0000000..62ebff3
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.zk;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class BlockingZKWatcher implements Watcher {
+
+ protected static final Logger log =
+ LoggerFactory.getLogger(BlockingZKWatcher.class);
+ private final AtomicBoolean connectedFlag = new AtomicBoolean(false);
+
+ @Override
+ public void process(WatchedEvent event) {
+ log.info("ZK binding callback received");
+ connectedFlag.set(true);
+ synchronized (connectedFlag) {
+ try {
+ connectedFlag.notify();
+ } catch (Exception e) {
+ log.warn("failed while waiting for notification", e);
+ }
+ }
+ }
+
+ /**
+ * Wait for a flag to go true
+ * @param timeout timeout in millis
+ */
+
+ public void waitForZKConnection(int timeout) throws InterruptedException {
+ synchronized (connectedFlag) {
+ if (!connectedFlag.get()) {
+ log.info("waiting for ZK event");
+ //wait a bit
+ connectedFlag.wait(timeout);
+ }
+ }
+ assert connectedFlag.get();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java b/slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java
new file mode 100644
index 0000000..c8b3adb
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.zk;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+/**
+ * This is a version of the HBase ZK cluster cut out to be standalone.
+ *
+ * <i>Important: keep this Java6 language level for now</i>
+ */
+public class MiniZooKeeperCluster extends AbstractService {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ MiniZooKeeperCluster.class);
+
+ private static final int TICK_TIME = 2000;
+ private static final int CONNECTION_TIMEOUT = 30000;
+ public static final int MAX_CLIENT_CONNECTIONS = 1000;
+
+ private boolean started;
+
+ /** The default port. If zero, we use a random port. */
+ private int defaultClientPort = 0;
+
+ private int clientPort;
+
+ private final List<NIOServerCnxnFactory> standaloneServerFactoryList;
+ private final List<ZooKeeperServer> zooKeeperServers;
+ private final List<Integer> clientPortList;
+
+ private int activeZKServerIndex;
+ private int tickTime = 0;
+ private File baseDir;
+ private final int numZooKeeperServers;
+ private String zkQuorum = "";
+
+ public MiniZooKeeperCluster(int numZooKeeperServers) {
+ super("MiniZooKeeperCluster");
+ this.numZooKeeperServers = numZooKeeperServers;
+ this.started = false;
+ activeZKServerIndex = -1;
+ zooKeeperServers = new ArrayList<ZooKeeperServer>();
+ clientPortList = new ArrayList<Integer>();
+ standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>();
+ }
+
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ }
+
+ public void setDefaultClientPort(int clientPort) {
+ if (clientPort <= 0) {
+ throw new IllegalArgumentException("Invalid default ZK client port: "
+ + clientPort);
+ }
+ this.defaultClientPort = clientPort;
+ }
+
+ /**
+ * Selects a ZK client port. Returns the default port if specified.
+ * Otherwise, returns a random port. The random port is selected from the
+ * range between 49152 to 65535. These ports cannot be registered with IANA
+ * and are intended for dynamic allocation (see http://bit.ly/dynports).
+ */
+ private int selectClientPort(Random r) {
+ if (defaultClientPort > 0) {
+ return defaultClientPort;
+ }
+ return 0xc000 + r.nextInt(0x3f00);
+ }
+
+ public void setTickTime(int tickTime) {
+ this.tickTime = tickTime;
+ }
+
+ public int getBackupZooKeeperServerNum() {
+ return zooKeeperServers.size() - 1;
+ }
+
+ public int getZooKeeperServerNum() {
+ return zooKeeperServers.size();
+ }
+
+ // / XXX: From o.a.zk.t.ClientBase
+ private static void setupTestEnv() {
+ // during the tests we run with 100K prealloc in the logs.
+ // on windows systems prealloc of 64M was seen to take ~15seconds
+ // resulting in test failure (client timeout on first session).
+ // set env and directly in order to handle static init/gc issues
+ System.setProperty("zookeeper.preAllocSize", "100");
+ FileTxnLog.setPreallocSize(100 * 1024);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ startup();
+ }
+
+ /**
+ * @param baseDir
+ * @param numZooKeeperServers
+ * @return ClientPort server bound to, -1 if there was a
+ * binding problem and we couldn't pick another port.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private int startup() throws IOException,
+ InterruptedException {
+ if (numZooKeeperServers <= 0)
+ return -1;
+
+ setupTestEnv();
+ started = true;
+ baseDir = File.createTempFile("zookeeper", ".dir");
+ recreateDir(baseDir);
+
+ StringBuilder quorumList = new StringBuilder();
+ Random rnd = new Random();
+ int tentativePort = selectClientPort(rnd);
+
+ // running all the ZK servers
+ for (int i = 0; i < numZooKeeperServers; i++) {
+ File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile();
+ recreateDir(dir);
+ int tickTimeToUse;
+ if (this.tickTime > 0) {
+ tickTimeToUse = this.tickTime;
+ } else {
+ tickTimeToUse = TICK_TIME;
+ }
+ ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
+ NIOServerCnxnFactory standaloneServerFactory;
+ while (true) {
+ try {
+ standaloneServerFactory = new NIOServerCnxnFactory();
+ standaloneServerFactory.configure(
+ new InetSocketAddress(tentativePort),
+ MAX_CLIENT_CONNECTIONS
+ );
+ } catch (BindException e) {
+ LOG.debug("Failed binding ZK Server to client port: " +
+ tentativePort, e);
+ // We're told to use some port but it's occupied, fail
+ if (defaultClientPort > 0) return -1;
+ // This port is already in use, try to use another.
+ tentativePort = selectClientPort(rnd);
+ continue;
+ }
+ break;
+ }
+
+ // Start up this ZK server
+ standaloneServerFactory.startup(server);
+ if (!waitForServerUp(tentativePort, CONNECTION_TIMEOUT)) {
+ throw new IOException("Waiting for startup of standalone server");
+ }
+
+ // We have selected this port as a client port.
+ clientPortList.add(tentativePort);
+ standaloneServerFactoryList.add(standaloneServerFactory);
+ zooKeeperServers.add(server);
+ if (quorumList.length() > 0) {
+ quorumList.append(",");
+ }
+ quorumList.append("localhost:").append(tentativePort);
+ tentativePort++; //for the next server
+ }
+
+ // set the first one to be active ZK; Others are backups
+ activeZKServerIndex = 0;
+
+ clientPort = clientPortList.get(activeZKServerIndex);
+ zkQuorum = quorumList.toString();
+ LOG.info("Started MiniZK Cluster and connect 1 ZK server " +
+ "on client port: " + clientPort);
+ return clientPort;
+ }
+
+ private void recreateDir(File dir) throws IOException {
+ if (dir.exists()) {
+ if (!FileUtil.fullyDelete(dir)) {
+ throw new IOException("Could not delete zk base directory: " + dir);
+ }
+ }
+ try {
+ dir.mkdirs();
+ } catch (SecurityException e) {
+ throw new IOException("creating dir: " + dir, e);
+ }
+ }
+
+ /**
+ * Delete the basedir
+ */
+ private void deleteBaseDir() {
+ if (baseDir != null) {
+ baseDir.delete();
+ baseDir = null;
+ }
+
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+
+ if (!started) {
+ return;
+ }
+ started = false;
+
+ try {
+ // shut down all the zk servers
+ for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
+ NIOServerCnxnFactory standaloneServerFactory =
+ standaloneServerFactoryList.get(i);
+ int clientPort = clientPortList.get(i);
+
+ standaloneServerFactory.shutdown();
+ if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+ throw new IOException("Waiting for shutdown of standalone server");
+ }
+ }
+ for (ZooKeeperServer zkServer : zooKeeperServers) {
+ //explicitly close ZKDatabase since ZookeeperServer does not close them
+ zkServer.getZKDatabase().close();
+ }
+ } finally {
+ // clear everything
+ activeZKServerIndex = 0;
+ standaloneServerFactoryList.clear();
+ clientPortList.clear();
+ zooKeeperServers.clear();
+ }
+
+ LOG.info("Shutdown MiniZK cluster with all ZK servers");
+ }
+
+ /**@return clientPort return clientPort if there is another ZK backup can run
+ * when killing the current active; return -1, if there is no backups.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public int killCurrentActiveZooKeeperServer() throws IOException,
+ InterruptedException {
+ if (!started || activeZKServerIndex < 0) {
+ return -1;
+ }
+
+ // Shutdown the current active one
+ NIOServerCnxnFactory standaloneServerFactory =
+ standaloneServerFactoryList.get(activeZKServerIndex);
+ int clientPort = clientPortList.get(activeZKServerIndex);
+
+ standaloneServerFactory.shutdown();
+ if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+ throw new IOException("Waiting for shutdown of standalone server");
+ }
+
+ zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
+
+ // remove the current active zk server
+ standaloneServerFactoryList.remove(activeZKServerIndex);
+ clientPortList.remove(activeZKServerIndex);
+ zooKeeperServers.remove(activeZKServerIndex);
+ LOG.info("Kill the current active ZK servers in the cluster " +
+ "on client port: " + clientPort);
+
+ if (standaloneServerFactoryList.size() == 0) {
+ // there is no backup servers;
+ return -1;
+ }
+ clientPort = clientPortList.get(activeZKServerIndex);
+ LOG.info("Activate a backup zk server in the cluster " +
+ "on client port: " + clientPort);
+ // return the next back zk server's port
+ return clientPort;
+ }
+
+ /**
+ * Kill one back up ZK servers
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public void killOneBackupZooKeeperServer() throws IOException,
+ InterruptedException {
+ if (!started || activeZKServerIndex < 0 ||
+ standaloneServerFactoryList.size() <= 1) {
+ return;
+ }
+
+ int backupZKServerIndex = activeZKServerIndex + 1;
+ // Shutdown the current active one
+ NIOServerCnxnFactory standaloneServerFactory =
+ standaloneServerFactoryList.get(backupZKServerIndex);
+ int clientPort = clientPortList.get(backupZKServerIndex);
+
+ standaloneServerFactory.shutdown();
+ if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+ throw new IOException("Waiting for shutdown of standalone server");
+ }
+
+ zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
+
+ // remove this backup zk server
+ standaloneServerFactoryList.remove(backupZKServerIndex);
+ clientPortList.remove(backupZKServerIndex);
+ zooKeeperServers.remove(backupZKServerIndex);
+ LOG.info("Kill one backup ZK servers in the cluster " +
+ "on client port: " + clientPort);
+ }
+
+ // XXX: From o.a.zk.t.ClientBase
+ private static boolean waitForServerDown(int port, long timeout) throws
+ InterruptedException {
+ long start = System.currentTimeMillis();
+ while (true) {
+ try {
+ Socket sock = null;
+ try {
+ sock = new Socket("localhost", port);
+ OutputStream outstream = sock.getOutputStream();
+ outstream.write("stat".getBytes());
+ outstream.flush();
+ } finally {
+ IOUtils.closeSocket(sock);
+ }
+ } catch (IOException e) {
+ return true;
+ }
+
+ if (System.currentTimeMillis() > start + timeout) {
+ break;
+ }
+ Thread.sleep(250);
+ }
+ return false;
+ }
+
+ // XXX: From o.a.zk.t.ClientBase
+ private static boolean waitForServerUp(int port, long timeout) throws
+ InterruptedException {
+ long start = System.currentTimeMillis();
+ while (true) {
+ try {
+ Socket sock = null;
+ sock = new Socket("localhost", port);
+ BufferedReader reader = null;
+ try {
+ OutputStream outstream = sock.getOutputStream();
+ outstream.write("stat".getBytes());
+ outstream.flush();
+
+ Reader isr = new InputStreamReader(sock.getInputStream());
+ reader = new BufferedReader(isr);
+ String line = reader.readLine();
+ if (line != null && line.startsWith("Zookeeper version:")) {
+ return true;
+ }
+ } finally {
+ IOUtils.closeSocket(sock);
+ IOUtils.closeStream(reader);
+ }
+ } catch (IOException e) {
+ // ignore as this is expected
+ LOG.debug("server localhost:" + port + " not up " + e);
+ }
+
+ if (System.currentTimeMillis() > start + timeout) {
+ break;
+ }
+ Thread.sleep(250);
+ }
+ return false;
+ }
+
+ public int getClientPort() {
+ return clientPort;
+ }
+
+ public String getZkQuorum() {
+ return zkQuorum;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java b/slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java
new file mode 100644
index 0000000..045b72c
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.zk;
+
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Relays ZK watcher events to a closure
+ */
+public abstract class ZKCallback implements Watcher {
+
+ public ZKCallback() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java b/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
new file mode 100644
index 0000000..6270123
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.zk;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+public class ZKIntegration implements Watcher {
+
+/**
+ * Base path for services
+ */
+ public static String ZK_SERVICES = "services";
+ /**
+ * Base path for all Slider references
+ */
+ public static String ZK_SLIDER = "slider";
+ public static String ZK_USERS = "users";
+ public static String SVC_SLIDER = "/" + ZK_SERVICES + "/" + ZK_SLIDER;
+ public static String SVC_SLIDER_USERS = SVC_SLIDER + "/" + ZK_USERS;
+
+ public static final List<String> ZK_USERS_PATH_LIST = new ArrayList<>();
+ static {
+ ZK_USERS_PATH_LIST.add(ZK_SERVICES);
+ ZK_USERS_PATH_LIST.add(ZK_SLIDER);
+ ZK_USERS_PATH_LIST.add(ZK_USERS);
+ }
+
+ public static int SESSION_TIMEOUT = 5000;
+ protected static final Logger log =
+ LoggerFactory.getLogger(ZKIntegration.class);
+ private ZooKeeper zookeeper;
+ private final String username;
+ private final String clustername;
+ private final String userPath;
+ private int sessionTimeout = SESSION_TIMEOUT;
+/**
+ flag to set to indicate that the user path should be created if
+ it is not already there
+ */
+ private final AtomicBoolean toInit = new AtomicBoolean(false);
+ private final boolean createClusterPath;
+ private final Watcher watchEventHandler;
+ private final String zkConnection;
+ private final boolean canBeReadOnly;
+
+ protected ZKIntegration(String zkConnection,
+ String username,
+ String clustername,
+ boolean canBeReadOnly,
+ boolean createClusterPath,
+ Watcher watchEventHandler
+ ) throws IOException {
+ this.username = username;
+ this.clustername = clustername;
+ this.watchEventHandler = watchEventHandler;
+ this.zkConnection = zkConnection;
+ this.canBeReadOnly = canBeReadOnly;
+ this.createClusterPath = createClusterPath;
+ this.userPath = mkSliderUserPath(username);
+ }
+
+ public void init() throws IOException {
+ assert zookeeper == null;
+ log.debug("Binding ZK client to {}", zkConnection);
+ zookeeper = new ZooKeeper(zkConnection, sessionTimeout, this, canBeReadOnly);
+ }
+
+ /**
+ * Create an instance bonded to the specific closure
+ * @param zkConnection
+ * @param username
+ * @param clustername
+ * @param canBeReadOnly
+ * @param watchEventHandler
+ * @return
+ * @throws IOException
+ */
+ public static ZKIntegration newInstance(String zkConnection, String username, String clustername, boolean createClusterPath, boolean canBeReadOnly, Watcher watchEventHandler) throws IOException {
+
+ return new ZKIntegration(zkConnection,
+ username,
+ clustername,
+ canBeReadOnly,
+ createClusterPath,
+ watchEventHandler);
+ }
+
+ public String getConnectionString() {
+ return zkConnection;
+ }
+
+ public String getClusterPath() {
+ return mkClusterPath(username, clustername);
+ }
+
+ public boolean getConnected() {
+ return zookeeper.getState().isConnected();
+ }
+
+ public boolean getAlive() {
+ return zookeeper.getState().isAlive();
+ }
+
+ public ZooKeeper.States getState() {
+ return zookeeper.getState();
+ }
+
+ public Stat getClusterStat() throws KeeperException, InterruptedException {
+ return stat(getClusterPath());
+ }
+
+ public boolean exists(String path) throws
+ KeeperException,
+ InterruptedException {
+ return stat(path) != null;
+ }
+
+ public Stat stat(String path) throws KeeperException, InterruptedException {
+ return zookeeper.exists(path, false);
+ }
+
+ @Override
+ public String toString() {
+ return "ZK integration bound @ " + zkConnection + ": " + zookeeper;
+ }
+
+/**
+ * Event handler to notify of state events
+ * @param event
+ */
+ @Override
+ public void process(WatchedEvent event) {
+ log.debug("{}", event);
+ try {
+ maybeInit();
+ } catch (Exception e) {
+ log.error("Failed to init", e);
+ }
+ if (watchEventHandler != null) {
+ watchEventHandler.process(event);
+ }
+ }
+
+ private void maybeInit() throws KeeperException, InterruptedException {
+ if (!toInit.getAndSet(true) && createClusterPath) {
+ log.debug("initing");
+ //create the user path
+ mkPath(ZK_USERS_PATH_LIST, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ //create the specific user
+ createPath(userPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ }
+
+ /**
+ * Create a path under a parent, don't care if it already exists
+ * As the path isn't returned, this isn't the way to create sequentially
+ * numbered nodes.
+ * @param parent parent dir. Must have a trailing / if entry!=null||empty
+ * @param entry entry -can be null or "", in which case it is not appended
+ * @param acl
+ * @param createMode
+ * @return the path if created; null if not
+ */
+ public String createPath(String parent,
+ String entry,
+ List<ACL> acl,
+ CreateMode createMode) throws KeeperException, InterruptedException {
+ //initial create of full path
+ assert acl != null;
+ assert !acl.isEmpty();
+ assert parent != null;
+ String path = parent;
+ if (entry != null) {
+ path = path + entry;
+ }
+ try {
+ log.debug("Creating ZK path {}", path);
+ return zookeeper.create(path, null, acl, createMode);
+ } catch (KeeperException.NodeExistsException ignored) {
+ //node already there
+ log.debug("node already present:{}",path);
+ return null;
+ }
+ }
+
+ /**
+ * Recursive path create
+ * @param path
+ * @param data
+ * @param acl
+ * @param createMode
+ */
+ public void mkPath(List<String> paths,
+ List<ACL> acl,
+ CreateMode createMode) throws KeeperException, InterruptedException {
+ String history = "/";
+ for (String entry : paths) {
+ createPath(history, entry, acl, createMode);
+ history = history + entry + "/";
+ }
+ }
+
+/**
+ * Blocking enum of users
+ * @return an unordered list of clusters under a user
+ */
+ public List<String> getClusters() throws KeeperException, InterruptedException {
+ return zookeeper.getChildren(userPath, null);
+ }
+
+ /**
+ * Delete a node, does not throw an exception if the path is not fond
+ * @param path path to delete
+ * @return true if the path could be deleted, false if there was no node to delete
+ *
+ */
+ public boolean delete(String path) throws
+ InterruptedException,
+ KeeperException {
+ try {
+ zookeeper.delete(path, -1);
+ return true;
+ } catch (KeeperException.NoNodeException ignored) {
+ return false;
+ }
+ }
+
+/**
+ * Build the path to a cluster; exists once the cluster has come up.
+ * Even before that, a ZK watcher could wait for it.
+ * @param username user
+ * @param clustername name of the cluster
+ * @return a strin
+ */
+ public static String mkClusterPath(String username, String clustername) {
+ return mkSliderUserPath(username) + "/" + clustername;
+ }
+/**
+ * Build the path to a cluster; exists once the cluster has come up.
+ * Even before that, a ZK watcher could wait for it.
+ * @param username user
+ * @return a string
+ */
+ public static String mkSliderUserPath(String username) {
+ return SVC_SLIDER_USERS + "/" + username;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-core/src/main/java/org/apache/slider/core/zk/ZKPathBuilder.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/zk/ZKPathBuilder.java b/slider-core/src/main/java/org/apache/slider/core/zk/ZKPathBuilder.java
new file mode 100644
index 0000000..b088568
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/core/zk/ZKPathBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.zk;
+
+import java.util.Locale;
+
+public final class ZKPathBuilder {
+
+ private final String username, appname, clustername;
+ private final String quorum;
+
+ private String appPath;
+ private String registryPath;
+ private final String appQuorum;
+
+ public ZKPathBuilder(String username,
+ String appname,
+ String clustername,
+ String quorum,
+ String appQuorum) {
+ this.username = username;
+ this.appname = appname;
+ this.clustername = clustername;
+ this.quorum = quorum;
+ appPath = buildAppPath();
+ registryPath = buildRegistryPath();
+ this.appQuorum = appQuorum;
+ }
+
+ public String buildAppPath() {
+ return String.format(Locale.ENGLISH, "/yarnapps_%s_%s_%s", appname,
+ username, clustername);
+
+ }
+
+ public String buildRegistryPath() {
+ return String.format(Locale.ENGLISH, "/services_%s_%s_%s", appname,
+ username, clustername);
+
+ }
+
+ public String getQuorum() {
+ return quorum;
+ }
+
+ public String getAppQuorum() {
+ return appQuorum;
+ }
+
+ public String getAppPath() {
+ return appPath;
+ }
+
+ public void setAppPath(String appPath) {
+ this.appPath = appPath;
+ }
+
+ public String getRegistryPath() {
+ return registryPath;
+ }
+
+ public void setRegistryPath(String registryPath) {
+ this.registryPath = registryPath;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-core/src/main/java/org/apache/slider/core/zk/ZookeeperUtils.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/zk/ZookeeperUtils.java b/slider-core/src/main/java/org/apache/slider/core/zk/ZookeeperUtils.java
new file mode 100644
index 0000000..90029ca
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/core/zk/ZookeeperUtils.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.zk;
+
+import com.google.common.net.HostAndPort;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.core.exceptions.BadConfigException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ZookeeperUtils {
+
+ public static String buildConnectionString(String zkHosts, int port) {
+ String zkPort = Integer.toString(port);
+ //parse the hosts
+ String[] hostlist = zkHosts.split(",", 0);
+ String quorum = SliderUtils.join(hostlist, ":" + zkPort + ",");
+ //this quorum has a trailing comma
+ quorum = quorum.substring(0, quorum.length() - 1);
+ return quorum;
+ }
+
+ /**
+ * Take a quorum list and split it to (trimmed) pairs
+ * @param hostPortQuorumList list of form h1:port, h2:port2,...
+ * @return a possibly empty list of values between commas. They may not be
+ * valid hostname:port pairs
+ */
+ public static List<String> splitToPairs(String hostPortQuorumList) {
+ // split an address hot
+ String[] strings = StringUtils.getStrings(hostPortQuorumList);
+ List<String> tuples = new ArrayList<>(strings.length);
+ for (String s : strings) {
+ tuples.add(s.trim());
+ }
+ return tuples;
+ }
+
+ /**
+ * Split a quorum list into a list of hostnames and ports
+ * @param hostPortQuorumList split to a list of hosts and ports
+ * @return a list of values
+ */
+ public static List<HostAndPort> splitToHostsAndPorts(String hostPortQuorumList) {
+ // split an address hot
+ String[] strings = StringUtils.getStrings(hostPortQuorumList);
+ List<HostAndPort> list = new ArrayList<>(strings.length);
+ for (String s : strings) {
+ list.add(HostAndPort.fromString(s.trim()));
+ }
+ return list;
+ }
+
+ /**
+ * Build up to a hosts only list
+ * @param hostAndPorts
+ * @return a list of the hosts only
+ */
+ public static String buildHostsOnlyList(List<HostAndPort> hostAndPorts) {
+ StringBuilder sb = new StringBuilder();
+ for (HostAndPort hostAndPort : hostAndPorts) {
+ sb.append(hostAndPort.getHostText()).append(",");
+ }
+ if (sb.length() > 0) {
+ sb.delete(sb.length() - 1, sb.length());
+ }
+ return sb.toString();
+ }
+
+ public static String buildQuorumEntry(HostAndPort hostAndPort,
+ int defaultPort) {
+ String s = hostAndPort.toString();
+ if (hostAndPort.hasPort()) {
+ return s;
+ } else {
+ return s + ":" + defaultPort;
+ }
+ }
+
+ /**
+ * Build a quorum list, injecting a ":defaultPort" ref if needed on
+ * any entry without one
+ * @param hostAndPorts
+ * @param defaultPort
+ * @return
+ */
+ public static String buildQuorum(List<HostAndPort> hostAndPorts, int defaultPort) {
+ List<String> entries = new ArrayList<>(hostAndPorts.size());
+ for (HostAndPort hostAndPort : hostAndPorts) {
+ entries.add(buildQuorumEntry(hostAndPort, defaultPort));
+ }
+ return SliderUtils.join(entries, ",", false);
+ }
+
+ public static String convertToHostsOnlyList(String quorum) throws
+ BadConfigException {
+ List<HostAndPort> hostAndPorts = splitToHostsAndPortsStrictly(quorum);
+ return ZookeeperUtils.buildHostsOnlyList(hostAndPorts);
+ }
+
+ public static List<HostAndPort> splitToHostsAndPortsStrictly(String quorum) throws
+ BadConfigException {
+ List<HostAndPort> hostAndPorts =
+ ZookeeperUtils.splitToHostsAndPorts(quorum);
+ if (hostAndPorts.isEmpty()) {
+ throw new BadConfigException("empty zookeeper quorum");
+ }
+ return hostAndPorts;
+ }
+
+ public static int getFirstPort(String quorum, int defVal) throws
+ BadConfigException {
+ List<HostAndPort> hostAndPorts = splitToHostsAndPortsStrictly(quorum);
+ int port = hostAndPorts.get(0).getPortOrDefault(defVal);
+ return port;
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
index aa594b3..5d37c32 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.slider.common.SliderXmlConfKeys;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.exceptions.BadConfigException;
-import org.apache.slider.core.registry.zk.ZookeeperUtils;
+import org.apache.slider.core.zk.ZookeeperUtils;
import org.apache.slider.server.services.curator.CuratorHelper;
import org.apache.slider.server.services.registry.SliderRegistryService;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-core/src/test/groovy/org/apache/slider/common/tools/GroovyZKIntegration.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/common/tools/GroovyZKIntegration.groovy b/slider-core/src/test/groovy/org/apache/slider/common/tools/GroovyZKIntegration.groovy
index 49701e3..541e426 100644
--- a/slider-core/src/test/groovy/org/apache/slider/common/tools/GroovyZKIntegration.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/common/tools/GroovyZKIntegration.groovy
@@ -18,7 +18,7 @@
package org.apache.slider.common.tools
-import org.apache.slider.core.registry.zk.ZKCallback
+import org.apache.slider.core.zk.ZKCallback
import org.apache.zookeeper.WatchedEvent
class GroovyZKIntegration {
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-core/src/test/groovy/org/apache/slider/common/tools/TestZKIntegration.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/common/tools/TestZKIntegration.groovy b/slider-core/src/test/groovy/org/apache/slider/common/tools/TestZKIntegration.groovy
index 0b4c66f..3930864 100644
--- a/slider-core/src/test/groovy/org/apache/slider/common/tools/TestZKIntegration.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/common/tools/TestZKIntegration.groovy
@@ -18,10 +18,9 @@
package org.apache.slider.common.tools
-import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import org.apache.hadoop.conf.Configuration
-import org.apache.slider.core.registry.zk.ZKIntegration
+import org.apache.slider.core.zk.ZKIntegration
import org.apache.slider.test.KeysForTests
import org.apache.slider.test.YarnZKMiniClusterTestBase
import org.apache.zookeeper.CreateMode
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy b/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy
index d0da914..cb1d9b5 100644
--- a/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy
@@ -21,8 +21,8 @@ package org.apache.slider.test
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster
import org.apache.slider.common.tools.SliderUtils
+import org.apache.slider.core.zk.MiniZooKeeperCluster
@Slf4j
@CompileStatic
@@ -30,7 +30,6 @@ class MicroZKCluster implements Closeable {
public static final String HOSTS = "127.0.0.1"
MiniZooKeeperCluster zkCluster
- File baseDir
String zkBindingString
Configuration conf
int port
@@ -44,23 +43,20 @@ class MicroZKCluster implements Closeable {
}
void createCluster() {
- zkCluster = new MiniZooKeeperCluster(conf)
- baseDir = File.createTempFile("zookeeper", ".dir")
- baseDir.delete()
- baseDir.mkdirs()
- port = zkCluster.startup(baseDir)
- zkBindingString = HOSTS + ":" + port
+ zkCluster = new MiniZooKeeperCluster(1)
+ zkCluster.init(conf)
+ zkCluster.start()
+ zkBindingString = zkCluster.zkQuorum
log.info("Created $this")
}
@Override
void close() throws IOException {
- zkCluster?.shutdown();
- baseDir?.deleteDir()
+ zkCluster?.stop()
}
@Override
String toString() {
- return "Micro ZK cluster as $zkBindingString data=$baseDir"
+ return "Micro ZK cluster as $zkBindingString"
}
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-core/src/test/groovy/org/apache/slider/test/YarnMiniClusterTestBase.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/test/YarnMiniClusterTestBase.groovy b/slider-core/src/test/groovy/org/apache/slider/test/YarnMiniClusterTestBase.groovy
index fe1ffc1..d1cc5ca 100644
--- a/slider-core/src/test/groovy/org/apache/slider/test/YarnMiniClusterTestBase.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/test/YarnMiniClusterTestBase.groovy
@@ -55,7 +55,6 @@ import org.junit.rules.Timeout
import static org.apache.slider.common.SliderXMLConfKeysForTesting.*
import static org.apache.slider.test.KeysForTests.*
-import static org.apache.slider.test.SliderTestUtils.log
/**
* Base class for mini cluster tests -creates a field for the
@@ -88,7 +87,6 @@ public abstract class YarnMiniClusterTestBase extends ServiceLauncherBaseTest {
SLIDER_CONFIG.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false)
SLIDER_CONFIG.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false)
SLIDER_CONFIG.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1)
-
}
@@ -194,8 +192,8 @@ public abstract class YarnMiniClusterTestBase extends ServiceLauncherBaseTest {
}
public void stopMiniCluster() {
- Log l = LogFactory.getLog(this.getClass())
- ServiceOperations.stopQuietly(l, miniCluster)
+ Log commonslog = LogFactory.getLog(this.class)
+ ServiceOperations.stopQuietly(commonslog, miniCluster)
hdfsCluster?.shutdown();
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy b/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy
index dcc07c6..2228580 100644
--- a/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy
@@ -23,8 +23,8 @@ import groovy.util.logging.Slf4j
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.slider.common.SliderXmlConfKeys
-import org.apache.slider.core.registry.zk.BlockingZKWatcher
-import org.apache.slider.core.registry.zk.ZKIntegration
+import org.apache.slider.core.zk.BlockingZKWatcher
+import org.apache.slider.core.zk.ZKIntegration
import java.util.concurrent.atomic.AtomicBoolean
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
----------------------------------------------------------------------
diff --git a/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java b/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
index 8396ff1..d35ed66 100644
--- a/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
+++ b/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
@@ -44,7 +44,7 @@ import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.ProviderUtils;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.common.tools.SliderUtils;
-import org.apache.slider.core.registry.zk.BlockingZKWatcher;
+import org.apache.slider.core.zk.BlockingZKWatcher;
import org.apache.slider.common.tools.ConfigHelper;
import org.apache.slider.server.services.utility.EventCallback;
import org.apache.slider.server.services.utility.EventNotifyingService;
@@ -56,7 +56,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccCorrectInstanceName.groovy
----------------------------------------------------------------------
diff --git a/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccCorrectInstanceName.groovy b/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccCorrectInstanceName.groovy
index ad8981f..89dc90c 100644
--- a/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccCorrectInstanceName.groovy
+++ b/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccCorrectInstanceName.groovy
@@ -30,7 +30,7 @@ import org.apache.slider.providers.accumulo.AccumuloKeys
import org.apache.slider.common.params.Arguments
import org.apache.slider.client.SliderClient
import org.apache.slider.providers.accumulo.AccumuloTestBase
-import org.apache.slider.core.registry.zk.ZKIntegration
+import org.apache.slider.core.zk.ZKIntegration
import org.junit.Test
@CompileStatic
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccFreezeThaw.groovy
----------------------------------------------------------------------
diff --git a/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccFreezeThaw.groovy b/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccFreezeThaw.groovy
index b3eecd2..3983a5d 100644
--- a/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccFreezeThaw.groovy
+++ b/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccFreezeThaw.groovy
@@ -25,7 +25,7 @@ import org.apache.slider.providers.accumulo.AccumuloConfigFileOptions
import org.apache.slider.providers.accumulo.AccumuloKeys
import org.apache.slider.client.SliderClient
import org.apache.slider.providers.accumulo.AccumuloTestBase
-import org.apache.slider.core.registry.zk.ZKIntegration
+import org.apache.slider.core.zk.ZKIntegration
import org.junit.Test
@CompileStatic
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccLiveHDFSArchive.groovy
----------------------------------------------------------------------
diff --git a/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccLiveHDFSArchive.groovy b/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccLiveHDFSArchive.groovy
index 7f16a6f..2220cdc 100644
--- a/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccLiveHDFSArchive.groovy
+++ b/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccLiveHDFSArchive.groovy
@@ -27,7 +27,7 @@ import org.apache.slider.providers.accumulo.AccumuloConfigFileOptions
import org.apache.slider.providers.accumulo.AccumuloKeys
import org.apache.slider.client.SliderClient
import org.apache.slider.providers.accumulo.AccumuloTestBase
-import org.apache.slider.core.registry.zk.ZKIntegration
+import org.apache.slider.core.zk.ZKIntegration
import org.junit.Test
@CompileStatic
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccLiveLocalArchive.groovy
----------------------------------------------------------------------
diff --git a/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccLiveLocalArchive.groovy b/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccLiveLocalArchive.groovy
index e8f6edb..1e3adbd 100644
--- a/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccLiveLocalArchive.groovy
+++ b/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccLiveLocalArchive.groovy
@@ -27,7 +27,7 @@ import org.apache.slider.providers.accumulo.AccumuloConfigFileOptions
import org.apache.slider.providers.accumulo.AccumuloKeys
import org.apache.slider.client.SliderClient
import org.apache.slider.providers.accumulo.AccumuloTestBase
-import org.apache.slider.core.registry.zk.ZKIntegration
+import org.apache.slider.core.zk.ZKIntegration
import org.junit.Test
@CompileStatic
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccM1T1GC1Mon1.groovy
----------------------------------------------------------------------
diff --git a/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccM1T1GC1Mon1.groovy b/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccM1T1GC1Mon1.groovy
index 0db2602..9d2cee6 100644
--- a/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccM1T1GC1Mon1.groovy
+++ b/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccM1T1GC1Mon1.groovy
@@ -26,7 +26,7 @@ import org.apache.slider.api.ClusterDescription
import org.apache.slider.providers.accumulo.AccumuloKeys
import org.apache.slider.client.SliderClient
import org.apache.slider.providers.accumulo.AccumuloTestBase
-import org.apache.slider.core.registry.zk.ZKIntegration
+import org.apache.slider.core.zk.ZKIntegration
import org.junit.Test
@CompileStatic
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccM2T2GC1Mon1.groovy
----------------------------------------------------------------------
diff --git a/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccM2T2GC1Mon1.groovy b/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccM2T2GC1Mon1.groovy
index bf403b8..2494a56 100644
--- a/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccM2T2GC1Mon1.groovy
+++ b/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccM2T2GC1Mon1.groovy
@@ -27,7 +27,7 @@ import org.apache.slider.providers.accumulo.AccumuloConfigFileOptions
import org.apache.slider.providers.accumulo.AccumuloKeys
import org.apache.slider.client.SliderClient
import org.apache.slider.providers.accumulo.AccumuloTestBase
-import org.apache.slider.core.registry.zk.ZKIntegration
+import org.apache.slider.core.zk.ZKIntegration
import org.junit.Test
@CompileStatic
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccumuloAMWebApp.groovy
----------------------------------------------------------------------
diff --git a/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccumuloAMWebApp.groovy b/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccumuloAMWebApp.groovy
index 9eb2717..bd975e4 100644
--- a/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccumuloAMWebApp.groovy
+++ b/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccumuloAMWebApp.groovy
@@ -29,7 +29,7 @@ import org.apache.slider.providers.accumulo.AccumuloKeys
import org.apache.slider.server.appmaster.web.SliderAMWebApp
import org.apache.slider.client.SliderClient
import org.apache.slider.providers.accumulo.AccumuloTestBase
-import org.apache.slider.core.registry.zk.ZKIntegration
+import org.apache.slider.core.zk.ZKIntegration
import org.junit.Test
@CompileStatic
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseClientProvider.java
----------------------------------------------------------------------
diff --git a/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseClientProvider.java b/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseClientProvider.java
index 7b19dd2..c40c5f2 100644
--- a/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseClientProvider.java
+++ b/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseClientProvider.java
@@ -37,7 +37,7 @@ import org.apache.slider.providers.ProviderUtils;
import org.apache.slider.common.tools.ConfigHelper;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.common.tools.SliderUtils;
-import org.apache.slider.core.registry.zk.ZookeeperUtils;
+import org.apache.slider.core.zk.ZookeeperUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/live/TestHBaseMaster.groovy
----------------------------------------------------------------------
diff --git a/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/live/TestHBaseMaster.groovy b/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/live/TestHBaseMaster.groovy
index 52bc004..97714d6 100644
--- a/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/live/TestHBaseMaster.groovy
+++ b/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/live/TestHBaseMaster.groovy
@@ -27,7 +27,7 @@ import org.apache.slider.core.registry.docstore.PublishedConfigSet
import org.apache.slider.core.registry.info.ServiceInstanceData
import org.apache.slider.core.registry.retrieve.RegistryRetriever
import org.apache.slider.providers.hbase.HBaseKeys
-import org.apache.slider.core.registry.zk.ZKIntegration
+import org.apache.slider.core.zk.ZKIntegration
import org.apache.slider.common.params.Arguments
import org.apache.slider.client.SliderClient
import org.apache.slider.providers.hbase.minicluster.HBaseMiniClusterTestBase
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/live/TestHBaseMasterWithBadHeap.groovy
----------------------------------------------------------------------
diff --git a/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/live/TestHBaseMasterWithBadHeap.groovy b/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/live/TestHBaseMasterWithBadHeap.groovy
index 5515eac..9ca5f45 100644
--- a/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/live/TestHBaseMasterWithBadHeap.groovy
+++ b/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/live/TestHBaseMasterWithBadHeap.groovy
@@ -21,16 +21,13 @@ package org.apache.slider.providers.hbase.minicluster.live
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import org.apache.hadoop.hbase.ClusterStatus
-import org.apache.slider.api.ClusterDescription
import org.apache.slider.api.RoleKeys
import org.apache.slider.client.SliderClient
import org.apache.slider.common.SliderExitCodes
-import org.apache.slider.common.SliderXmlConfKeys
import org.apache.slider.common.params.Arguments
import org.apache.slider.core.conf.AggregateConf
import org.apache.slider.core.main.ServiceLaunchException
import org.apache.slider.core.main.ServiceLauncher
-import org.apache.slider.core.registry.zk.ZKIntegration
import org.apache.slider.providers.hbase.HBaseKeys
import org.apache.slider.providers.hbase.minicluster.HBaseMiniClusterTestBase
import org.junit.Test
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3a5cf1ab/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/live/TestLiveRegionServiceOnHDFS.groovy
----------------------------------------------------------------------
diff --git a/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/live/TestLiveRegionServiceOnHDFS.groovy b/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/live/TestLiveRegionServiceOnHDFS.groovy
index eb94580..26292fb 100644
--- a/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/live/TestLiveRegionServiceOnHDFS.groovy
+++ b/slider-providers/hbase/slider-hbase-provider/src/test/groovy/org/apache/slider/providers/hbase/minicluster/live/TestLiveRegionServiceOnHDFS.groovy
@@ -22,7 +22,7 @@ import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import org.apache.hadoop.hbase.ClusterStatus
import org.apache.slider.api.ClusterDescription
-import org.apache.slider.core.registry.zk.ZKIntegration
+import org.apache.slider.core.zk.ZKIntegration
import org.apache.slider.client.SliderClient
import org.apache.slider.providers.hbase.minicluster.HBaseMiniClusterTestBase
import org.apache.slider.core.main.ServiceLauncher