You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by sm...@apache.org on 2014/05/28 04:21:17 UTC
[12/44] git commit: SLIDER-38 copy in the mini ZK cluster from hbase,
and drop hbase as a dependency for the tests
SLIDER-38 copy in the mini ZK cluster from hbase, and drop hbase as a dependency for the tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/5281b049
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/5281b049
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/5281b049
Branch: refs/heads/master
Commit: 5281b049e280c22e576f42c65e2bec0457e50b02
Parents: cfa4d71
Author: Steve Loughran <st...@hortonworks.com>
Authored: Thu May 22 11:09:33 2014 +0100
Committer: Steve Loughran <st...@hortonworks.com>
Committed: Thu May 22 11:09:33 2014 +0100
----------------------------------------------------------------------
slider-core/pom.xml | 18 +-
.../slider/core/registry/zk/ZookeeperUtils.java | 4 +-
.../apache/slider/test/MicroZKCluster.groovy | 1 -
.../test/YarnZKMiniClusterTestBase.groovy | 3 +-
.../slider/test/MiniZooKeeperCluster.java | 395 +++++++++++++++++++
5 files changed, 403 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5281b049/slider-core/pom.xml
----------------------------------------------------------------------
diff --git a/slider-core/pom.xml b/slider-core/pom.xml
index 100102b..f0b7d2c 100644
--- a/slider-core/pom.xml
+++ b/slider-core/pom.xml
@@ -419,6 +419,7 @@
</exclusion>
</exclusions>
</dependency>
+<!--
<dependency>
<groupId>org.apache.hbase</groupId>
@@ -432,19 +433,9 @@
<scope>test</scope>
</dependency>
- <!--
- <dependency>
- <groupId>org.apache.accumulo</groupId>
- <artifactId>accumulo</artifactId>
- <version>${accumulo.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- -->
+-->
+
+<!--
<dependency>
<groupId>org.apache.accumulo</groupId>
@@ -469,6 +460,7 @@
<artifactId>accumulo-trace</artifactId>
<scope>test</scope>
</dependency>
+-->
<dependency>
<groupId>junit</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5281b049/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZookeeperUtils.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZookeeperUtils.java b/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZookeeperUtils.java
index 84a9321..7d73a97 100644
--- a/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZookeeperUtils.java
+++ b/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZookeeperUtils.java
@@ -32,9 +32,7 @@ public class ZookeeperUtils {
String zkPort = Integer.toString(port);
//parse the hosts
String[] hostlist = zkHosts.split(",", 0);
- String quorum = SliderUtils.join(hostlist, ":" + zkPort + ",");
- //this quorum has a trailing comma
- quorum = quorum.substring(0, quorum.length() - 1);
+ String quorum = SliderUtils.join(hostlist, ":" + zkPort + ",", false);
return quorum;
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5281b049/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy b/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy
index d0da914..8e7b460 100644
--- a/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy
@@ -21,7 +21,6 @@ package org.apache.slider.test
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster
import org.apache.slider.common.tools.SliderUtils
@Slf4j
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5281b049/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy b/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy
index dcc07c6..09a5a8f 100644
--- a/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy
@@ -21,6 +21,7 @@ package org.apache.slider.test
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.IOUtils
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.slider.common.SliderXmlConfKeys
import org.apache.slider.core.registry.zk.BlockingZKWatcher
@@ -41,7 +42,7 @@ public abstract class YarnZKMiniClusterTestBase extends YarnMiniClusterTestBase
public void stopMiniCluster() {
super.stopMiniCluster()
- microZKCluster?.close()
+ IOUtils.closeStream(microZKCluster);
}
public ZKIntegration createZKIntegrationInstance(String zkQuorum,
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5281b049/slider-core/src/test/java/org/apache/slider/test/MiniZooKeeperCluster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/test/MiniZooKeeperCluster.java b/slider-core/src/test/java/org/apache/slider/test/MiniZooKeeperCluster.java
new file mode 100644
index 0000000..cc2cc9b
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/test/MiniZooKeeperCluster.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+/**
+ * A standalone copy of the HBase mini ZooKeeper cluster: it runs one or more
+ * in-process ZooKeeper servers for tests without depending on HBase.
+ */
+public class MiniZooKeeperCluster {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      MiniZooKeeperCluster.class);
+
+  /** Tick time, in milliseconds, used when no positive tick time is set. */
+  private static final int TICK_TIME = 2000;
+  /** How long, in milliseconds, to wait for a server to come up or go down. */
+  private static final int CONNECTION_TIMEOUT = 30000;
+  /** Connection limit passed to every server connection factory. */
+  public static final int MAX_CLIENT_CONNECTIONS = 1000;
+
+  /** True between a successful startup and the next shutdown. */
+  private boolean started;
+
+  /** The default port. If zero, we use a random port. */
+  private int defaultClientPort = 0;
+
+  /** Port reported by getClientPort(). */
+  private int clientPort;
+
+  // Parallel lists: index i of each list describes the same server.
+  // The references are assigned exactly once, in the constructor.
+  private final List<NIOServerCnxnFactory> standaloneServerFactoryList;
+  private final List<ZooKeeperServer> zooKeeperServers;
+  private final List<Integer> clientPortList;
+
+  /** Index of the active server in the lists above; -1 until a startup. */
+  private int activeZKServerIndex;
+  /** Caller-supplied tick time; a value that is not positive means TICK_TIME. */
+  private int tickTime = 0;
+
+  /** Configuration handed to the constructor. */
+  private final Configuration configuration;
+
+  /**
+   * Creates a stopped cluster that uses a freshly created configuration.
+   */
+  public MiniZooKeeperCluster() {
+    this(new Configuration());
+  }
+
+  /**
+   * Creates a stopped cluster with no servers registered yet.
+   *
+   * @param configuration configuration retained by this instance
+   */
+  public MiniZooKeeperCluster(Configuration configuration) {
+    this.configuration = configuration;
+    this.standaloneServerFactoryList = new ArrayList<>();
+    this.zooKeeperServers = new ArrayList<>();
+    this.clientPortList = new ArrayList<>();
+    // no server is active until startup() has run
+    this.activeZKServerIndex = -1;
+    this.started = false;
+  }
+
+ public void setDefaultClientPort(int clientPort) {
+ if (clientPort <= 0) {
+ throw new IllegalArgumentException("Invalid default ZK client port: "
+ + clientPort);
+ }
+ this.defaultClientPort = clientPort;
+ }
+
+ /**
+ * Selects a ZK client port. Returns the default port if specified.
+ * Otherwise, returns a random port. The random port is selected from the
+ * range between 49152 to 65535. These ports cannot be registered with IANA
+ * and are intended for dynamic allocation (see http://bit.ly/dynports).
+ */
+ private int selectClientPort() {
+ if (defaultClientPort > 0) {
+ return defaultClientPort;
+ }
+ return 0xc000 + new Random().nextInt(0x3f00);
+ }
+
+ public void setTickTime(int tickTime) {
+ this.tickTime = tickTime;
+ }
+
+ public int getBackupZooKeeperServerNum() {
+ return zooKeeperServers.size() - 1;
+ }
+
+ public int getZooKeeperServerNum() {
+ return zooKeeperServers.size();
+ }
+
+ // / XXX: From o.a.zk.t.ClientBase
+ private static void setupTestEnv() {
+ // during the tests we run with 100K prealloc in the logs.
+ // on windows systems prealloc of 64M was seen to take ~15seconds
+ // resulting in test failure (client timeout on first session).
+ // set env and directly in order to handle static init/gc issues
+ System.setProperty("zookeeper.preAllocSize", "100");
+ FileTxnLog.setPreallocSize(100 * 1024);
+ }
+
+  /**
+   * Starts a cluster consisting of a single ZooKeeper server.
+   *
+   * @param baseDir directory under which the server's data directory is made
+   * @return the result of starting one server under {@code baseDir}
+   * @throws IOException if the server could not be started
+   * @throws InterruptedException if startup is interrupted
+   */
+  public int startup(File baseDir) throws IOException, InterruptedException {
+    final int singleServer = 1;
+    return startup(baseDir, singleServer);
+  }
+
+  /**
+   * Stops any running cluster, then starts {@code numZooKeeperServers}
+   * servers, each with its own data directory {@code zookeeper_<i>} under
+   * {@code baseDir}. The first server uses the selected client port and each
+   * following server tries the next port number.
+   *
+   * If startup fails part-way (bind failure on a fixed port, a server that
+   * never answers, or any exception), every server created so far is shut
+   * down again, so a failed call leaves nothing listening.
+   *
+   * @param baseDir parent directory for the per-server data directories
+   * @param numZooKeeperServers number of servers to start
+   * @return ClientPort server bound to, -1 if there was a
+   *         binding problem and we couldn't pick another port, or if
+   *         {@code numZooKeeperServers} is not positive.
+   * @throws IOException if a directory cannot be prepared or a server does
+   *         not come up within the connection timeout
+   * @throws InterruptedException if starting a server is interrupted
+   */
+  public int startup(File baseDir, int numZooKeeperServers) throws IOException,
+      InterruptedException {
+    if (numZooKeeperServers <= 0) {
+      return -1;
+    }
+
+    setupTestEnv();
+    shutdown();
+
+    int tickTimeToUse = this.tickTime > 0 ? this.tickTime : TICK_TIME;
+    int tentativePort = selectClientPort();
+    boolean allStarted = false;
+    try {
+      // running all the ZK servers
+      for (int i = 0; i < numZooKeeperServers; i++) {
+        File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile();
+        recreateDir(dir);
+        ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
+
+        NIOServerCnxnFactory standaloneServerFactory = null;
+        while (standaloneServerFactory == null) {
+          NIOServerCnxnFactory candidate = new NIOServerCnxnFactory();
+          try {
+            candidate.configure(
+                new InetSocketAddress(tentativePort),
+                MAX_CLIENT_CONNECTIONS);
+            standaloneServerFactory = candidate;
+          } catch (BindException e) {
+            LOG.debug("Failed binding ZK Server to client port: " +
+                tentativePort, e);
+            // release whatever the half-configured factory still holds
+            candidate.shutdown();
+            // We're told to use some port but it's occupied, fail
+            if (defaultClientPort > 0) {
+              return -1;
+            }
+            // This port is already in use, try to use another.
+            tentativePort = selectClientPort();
+          }
+        }
+
+        // Register before starting so the failure cleanup below covers
+        // this server as well.
+        clientPortList.add(tentativePort);
+        standaloneServerFactoryList.add(standaloneServerFactory);
+        zooKeeperServers.add(server);
+
+        // Start up this ZK server
+        standaloneServerFactory.startup(server);
+        if (!waitForServerUp(tentativePort, CONNECTION_TIMEOUT)) {
+          throw new IOException("Waiting for startup of standalone server");
+        }
+        tentativePort++; //for the next server
+      }
+      allStarted = true;
+    } finally {
+      if (!allStarted) {
+        // 'started' is still false, so shutdown() would be a no-op here
+        releasePartiallyStartedServers();
+      }
+    }
+
+    // set the first one to be active ZK; Others are backups
+    activeZKServerIndex = 0;
+    started = true;
+    clientPort = clientPortList.get(activeZKServerIndex);
+    LOG.info("Started MiniZK Cluster and connect 1 ZK server " +
+        "on client port: " + clientPort);
+    return clientPort;
+  }
+
+  /** Best-effort teardown of the servers registered by a failed startup. */
+  private void releasePartiallyStartedServers() {
+    for (NIOServerCnxnFactory factory : standaloneServerFactoryList) {
+      factory.shutdown();
+    }
+    for (ZooKeeperServer zkServer : zooKeeperServers) {
+      try {
+        if (zkServer.getZKDatabase() != null) {
+          zkServer.getZKDatabase().close();
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed closing ZK database after failed startup", e);
+      }
+    }
+    standaloneServerFactoryList.clear();
+    clientPortList.clear();
+    zooKeeperServers.clear();
+  }
+
+  /**
+   * Replaces {@code dir} with a fresh, empty directory.
+   *
+   * @param dir the directory to delete (if present) and create again
+   * @throws IOException if the old directory cannot be deleted or the new
+   *         one cannot be created
+   */
+  private void recreateDir(File dir) throws IOException {
+    if (dir.exists() && !FileUtil.fullyDelete(dir)) {
+      throw new IOException("Could not delete zk base directory: " + dir);
+    }
+    boolean created;
+    try {
+      created = dir.mkdirs();
+    } catch (SecurityException e) {
+      throw new IOException("creating dir: " + dir, e);
+    }
+    // mkdirs() reports failure through its return value, not an exception;
+    // fail here rather than later inside the server with a vaguer error.
+    if (!created && !dir.isDirectory()) {
+      throw new IOException("Could not create zk base directory: " + dir);
+    }
+  }
+
+  /**
+   * Stops every registered server and forgets it. Does nothing when the
+   * cluster is not started.
+   *
+   * @throws IOException if a server is still reachable after the connection
+   *         timeout, or a ZK database cannot be closed
+   */
+  public void shutdown() throws IOException {
+    if (started) {
+      // shut down all the zk servers, checking each one really went away
+      int serverIndex = 0;
+      for (NIOServerCnxnFactory factory : standaloneServerFactoryList) {
+        int port = clientPortList.get(serverIndex);
+        serverIndex++;
+
+        factory.shutdown();
+        boolean down = waitForServerDown(port, CONNECTION_TIMEOUT);
+        if (!down) {
+          throw new IOException("Waiting for shutdown of standalone server");
+        }
+      }
+      //explicitly close ZKDatabase since ZookeeperServer does not close them
+      for (int i = 0; i < zooKeeperServers.size(); i++) {
+        zooKeeperServers.get(i).getZKDatabase().close();
+      }
+
+      // reset the bookkeeping
+      zooKeeperServers.clear();
+      clientPortList.clear();
+      standaloneServerFactoryList.clear();
+      activeZKServerIndex = 0;
+      started = false;
+
+      LOG.info("Shutdown MiniZK cluster with all ZK servers");
+    }
+  }
+
+  /**
+   * Shuts down the active server and promotes the next registered server,
+   * if any, to active.
+   *
+   * @return the client port of the newly active backup server, or -1 if the
+   *         cluster is not started, has no active server, or no backup is
+   *         left after the kill
+   * @throws IOException if the server is still reachable after the
+   *         connection timeout, or its ZK database cannot be closed
+   * @throws InterruptedException declared for callers; not thrown by the
+   *         code in this method
+   */
+  public int killCurrentActiveZooKeeperServer() throws IOException,
+      InterruptedException {
+    if (!started || activeZKServerIndex < 0) {
+      return -1;
+    }
+
+    // Shutdown the current active one
+    NIOServerCnxnFactory standaloneServerFactory =
+        standaloneServerFactoryList.get(activeZKServerIndex);
+    int killedPort = clientPortList.get(activeZKServerIndex);
+
+    standaloneServerFactory.shutdown();
+    if (!waitForServerDown(killedPort, CONNECTION_TIMEOUT)) {
+      throw new IOException("Waiting for shutdown of standalone server");
+    }
+
+    zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
+
+    // remove the current active zk server; the int argument selects
+    // List.remove(int index), so the entry at that position is dropped
+    standaloneServerFactoryList.remove(activeZKServerIndex);
+    clientPortList.remove(activeZKServerIndex);
+    zooKeeperServers.remove(activeZKServerIndex);
+    LOG.info("Kill the current active ZK servers in the cluster " +
+        "on client port: " + killedPort);
+
+    if (standaloneServerFactoryList.isEmpty()) {
+      // there is no backup servers; mark "no active server" so that a
+      // repeated call returns -1 instead of indexing into an empty list
+      activeZKServerIndex = -1;
+      return -1;
+    }
+    // the next server has moved into the active slot; record its port in
+    // the field so getClientPort() stops reporting the killed server
+    clientPort = clientPortList.get(activeZKServerIndex);
+    LOG.info("Activate a backup zk server in the cluster " +
+        "on client port: " + clientPort);
+    // return the next back zk server's port
+    return clientPort;
+  }
+
+  /**
+   * Shuts down the backup server registered directly after the active one.
+   * Does nothing when the cluster is not started, has no active server, or
+   * has no backup.
+   *
+   * @throws IOException if the server is still reachable after the
+   *         connection timeout, or its ZK database cannot be closed
+   * @throws InterruptedException declared for callers; not thrown by the
+   *         code in this method
+   */
+  public void killOneBackupZooKeeperServer() throws IOException,
+      InterruptedException {
+    boolean hasBackup = started
+        && activeZKServerIndex >= 0
+        && standaloneServerFactoryList.size() > 1;
+    if (!hasBackup) {
+      return;
+    }
+
+    // the victim is the first backup, i.e. the entry after the active one
+    final int victimIndex = activeZKServerIndex + 1;
+    NIOServerCnxnFactory victimFactory =
+        standaloneServerFactoryList.get(victimIndex);
+    int victimPort = clientPortList.get(victimIndex);
+
+    victimFactory.shutdown();
+    boolean down = waitForServerDown(victimPort, CONNECTION_TIMEOUT);
+    if (!down) {
+      throw new IOException("Waiting for shutdown of standalone server");
+    }
+
+    zooKeeperServers.get(victimIndex).getZKDatabase().close();
+
+    // drop the victim from all three parallel lists (removal by index)
+    standaloneServerFactoryList.remove(victimIndex);
+    clientPortList.remove(victimIndex);
+    zooKeeperServers.remove(victimIndex);
+    LOG.info("Kill one backup ZK servers in the cluster on client port: "
+        + victimPort);
+  }
+
+  // XXX: From o.a.zk.t.ClientBase
+  /**
+   * Polls {@code localhost:port} every 250 ms until a connection attempt
+   * (or writing the "stat" command) fails, which is taken to mean that the
+   * server is down.
+   *
+   * An interrupt does not shorten the wait, but it is no longer lost: the
+   * thread's interrupt status is restored before returning.
+   *
+   * @param port the client port to probe
+   * @param timeout maximum time to wait, in milliseconds
+   * @return true once the server is unreachable, false on timeout
+   */
+  private static boolean waitForServerDown(int port, long timeout) {
+    long start = System.currentTimeMillis();
+    boolean interrupted = false;
+    try {
+      while (true) {
+        try (Socket sock = new Socket("localhost", port)) {
+          OutputStream outstream = sock.getOutputStream();
+          // fixed charset: the command bytes must not depend on the platform
+          outstream.write("stat".getBytes(StandardCharsets.UTF_8));
+          outstream.flush();
+        } catch (IOException e) {
+          // cannot connect or talk to the port any more: server is down
+          return true;
+        }
+
+        if (System.currentTimeMillis() > start + timeout) {
+          return false;
+        }
+        try {
+          Thread.sleep(250);
+        } catch (InterruptedException e) {
+          // keep waiting, but remember the interrupt for the caller
+          interrupted = true;
+        }
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  // XXX: From o.a.zk.t.ClientBase
+  /**
+   * Polls {@code localhost:port} every 250 ms, sending the "stat" command,
+   * until the first line of the reply starts with "Zookeeper version:".
+   *
+   * An interrupt does not shorten the wait, but it is no longer lost: the
+   * thread's interrupt status is restored before returning.
+   *
+   * @param port the client port to probe
+   * @param timeout maximum time to wait, in milliseconds
+   * @return true once the server answered, false on timeout
+   */
+  private static boolean waitForServerUp(int port, long timeout) {
+    long start = System.currentTimeMillis();
+    boolean interrupted = false;
+    try {
+      while (true) {
+        // resources are closed in reverse order: reader first, then socket
+        try (Socket sock = new Socket("localhost", port);
+             BufferedReader reader = new BufferedReader(
+                 new InputStreamReader(sock.getInputStream(),
+                     StandardCharsets.UTF_8))) {
+          OutputStream outstream = sock.getOutputStream();
+          // fixed charset: the command bytes must not depend on the platform
+          outstream.write("stat".getBytes(StandardCharsets.UTF_8));
+          outstream.flush();
+
+          String line = reader.readLine();
+          if (line != null && line.startsWith("Zookeeper version:")) {
+            return true;
+          }
+        } catch (IOException e) {
+          // ignore as this is expected
+          LOG.info("server localhost:" + port + " not up " + e);
+        }
+
+        if (System.currentTimeMillis() > start + timeout) {
+          return false;
+        }
+        try {
+          Thread.sleep(250);
+        } catch (InterruptedException e) {
+          // keep waiting, but remember the interrupt for the caller
+          interrupted = true;
+        }
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * @return the client port recorded by this cluster; 0 if none has been
+   *         recorded yet
+   */
+  public int getClientPort() {
+    return this.clientPort;
+  }
+}