You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by sm...@apache.org on 2014/05/28 04:21:17 UTC

[12/44] git commit: SLIDER-38 copy in the mini ZK cluster from hbase, and drop hbase as a dependency for the tests

SLIDER-38 copy in the mini ZK cluster from hbase, and drop hbase as a dependency for the tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/5281b049
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/5281b049
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/5281b049

Branch: refs/heads/master
Commit: 5281b049e280c22e576f42c65e2bec0457e50b02
Parents: cfa4d71
Author: Steve Loughran <st...@hortonworks.com>
Authored: Thu May 22 11:09:33 2014 +0100
Committer: Steve Loughran <st...@hortonworks.com>
Committed: Thu May 22 11:09:33 2014 +0100

----------------------------------------------------------------------
 slider-core/pom.xml                             |  18 +-
 .../slider/core/registry/zk/ZookeeperUtils.java |   4 +-
 .../apache/slider/test/MicroZKCluster.groovy    |   1 -
 .../test/YarnZKMiniClusterTestBase.groovy       |   3 +-
 .../slider/test/MiniZooKeeperCluster.java       | 395 +++++++++++++++++++
 5 files changed, 403 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5281b049/slider-core/pom.xml
----------------------------------------------------------------------
diff --git a/slider-core/pom.xml b/slider-core/pom.xml
index 100102b..f0b7d2c 100644
--- a/slider-core/pom.xml
+++ b/slider-core/pom.xml
@@ -419,6 +419,7 @@
           </exclusion>
         </exclusions>
     </dependency>
+<!--
 
     <dependency>
       <groupId>org.apache.hbase</groupId>
@@ -432,19 +433,9 @@
       <scope>test</scope>
     </dependency>
 
-    <!--
-        <dependency>
-          <groupId>org.apache.accumulo</groupId>
-          <artifactId>accumulo</artifactId>
-          <version>${accumulo.version}</version>
-          <exclusions>
-            <exclusion>
-              <groupId>org.slf4j</groupId>
-              <artifactId>slf4j-api</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
-    -->
+-->
+
+<!--
 
     <dependency>
       <groupId>org.apache.accumulo</groupId>
@@ -469,6 +460,7 @@
       <artifactId>accumulo-trace</artifactId>
       <scope>test</scope>
     </dependency>
+-->
 
     <dependency>
       <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5281b049/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZookeeperUtils.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZookeeperUtils.java b/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZookeeperUtils.java
index 84a9321..7d73a97 100644
--- a/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZookeeperUtils.java
+++ b/slider-core/src/main/java/org/apache/slider/core/registry/zk/ZookeeperUtils.java
@@ -32,9 +32,7 @@ public class ZookeeperUtils {
     String zkPort = Integer.toString(port);
     //parse the hosts
     String[] hostlist = zkHosts.split(",", 0);
-    String quorum = SliderUtils.join(hostlist, ":" + zkPort + ",");
-    //this quorum has a trailing comma
-    quorum = quorum.substring(0, quorum.length() - 1);
+    String quorum = SliderUtils.join(hostlist, ":" + zkPort + ",", false);
     return quorum;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5281b049/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy b/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy
index d0da914..8e7b460 100644
--- a/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/test/MicroZKCluster.groovy
@@ -21,7 +21,6 @@ package org.apache.slider.test
 import groovy.transform.CompileStatic
 import groovy.util.logging.Slf4j
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster
 import org.apache.slider.common.tools.SliderUtils
 
 @Slf4j

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5281b049/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy b/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy
index dcc07c6..09a5a8f 100644
--- a/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy
@@ -21,6 +21,7 @@ package org.apache.slider.test
 import groovy.transform.CompileStatic
 import groovy.util.logging.Slf4j
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.IOUtils
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.slider.common.SliderXmlConfKeys
 import org.apache.slider.core.registry.zk.BlockingZKWatcher
@@ -41,7 +42,7 @@ public abstract class YarnZKMiniClusterTestBase extends YarnMiniClusterTestBase
   
   public void stopMiniCluster() {
     super.stopMiniCluster()
-    microZKCluster?.close()
+    IOUtils.closeStream(microZKCluster);
   }
 
   public ZKIntegration createZKIntegrationInstance(String zkQuorum,

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5281b049/slider-core/src/test/java/org/apache/slider/test/MiniZooKeeperCluster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/test/MiniZooKeeperCluster.java b/slider-core/src/test/java/org/apache/slider/test/MiniZooKeeperCluster.java
new file mode 100644
index 0000000..cc2cc9b
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/test/MiniZooKeeperCluster.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+/**
+ * This is a version of the HBase ZK cluster cut out to be standalone
+ */
+public class MiniZooKeeperCluster {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      MiniZooKeeperCluster.class);
+
+  private static final int TICK_TIME = 2000;
+  private static final int CONNECTION_TIMEOUT = 30000;
+  public static final int MAX_CLIENT_CONNECTIONS = 1000;
+
+  private boolean started;
+
+  /** The default port. If zero, we use a random port. */
+  private int defaultClientPort = 0;
+
+  private int clientPort;
+
+  private List<NIOServerCnxnFactory> standaloneServerFactoryList;
+  private List<ZooKeeperServer> zooKeeperServers;
+  private List<Integer> clientPortList;
+
+  private int activeZKServerIndex;
+  private int tickTime = 0;
+
+  private Configuration configuration;
+
+  public MiniZooKeeperCluster() {
+    this(new Configuration());
+  }
+
+  public MiniZooKeeperCluster(Configuration configuration) {
+    this.started = false;
+    this.configuration = configuration;
+    activeZKServerIndex = -1;
+    zooKeeperServers = new ArrayList<>();
+    clientPortList = new ArrayList<>();
+    standaloneServerFactoryList = new ArrayList<>();
+  }
+
+  public void setDefaultClientPort(int clientPort) {
+    if (clientPort <= 0) {
+      throw new IllegalArgumentException("Invalid default ZK client port: "
+                                         + clientPort);
+    }
+    this.defaultClientPort = clientPort;
+  }
+
+  /**
+   * Selects a ZK client port. Returns the default port if specified.
+   * Otherwise, returns a random port. The random port is selected from the
+   * range between 49152 to 65535. These ports cannot be registered with IANA
+   * and are intended for dynamic allocation (see http://bit.ly/dynports).
+   */
+  private int selectClientPort() {
+    if (defaultClientPort > 0) {
+      return defaultClientPort;
+    }
+    return 0xc000 + new Random().nextInt(0x3f00);
+  }
+
+  public void setTickTime(int tickTime) {
+    this.tickTime = tickTime;
+  }
+
+  public int getBackupZooKeeperServerNum() {
+    return zooKeeperServers.size() - 1;
+  }
+
+  public int getZooKeeperServerNum() {
+    return zooKeeperServers.size();
+  }
+
+  // / XXX: From o.a.zk.t.ClientBase
+  private static void setupTestEnv() {
+    // during the tests we run with 100K prealloc in the logs.
+    // on windows systems prealloc of 64M was seen to take ~15seconds
+    // resulting in test failure (client timeout on first session).
+    // set env and directly in order to handle static init/gc issues
+    System.setProperty("zookeeper.preAllocSize", "100");
+    FileTxnLog.setPreallocSize(100 * 1024);
+  }
+
+  public int startup(File baseDir) throws IOException, InterruptedException {
+    return startup(baseDir, 1);
+  }
+
+  /**
+   * @param baseDir
+   * @param numZooKeeperServers
+   * @return ClientPort server bound to, -1 if there was a
+   *         binding problem and we couldn't pick another port.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public int startup(File baseDir, int numZooKeeperServers) throws IOException,
+      InterruptedException {
+    if (numZooKeeperServers <= 0)
+      return -1;
+
+    setupTestEnv();
+    shutdown();
+
+    int tentativePort = selectClientPort();
+
+    // running all the ZK servers
+    for (int i = 0; i < numZooKeeperServers; i++) {
+      File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile();
+      recreateDir(dir);
+      int tickTimeToUse;
+      if (this.tickTime > 0) {
+        tickTimeToUse = this.tickTime;
+      } else {
+        tickTimeToUse = TICK_TIME;
+      }
+      ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
+      NIOServerCnxnFactory standaloneServerFactory;
+      while (true) {
+        try {
+          standaloneServerFactory = new NIOServerCnxnFactory();
+          standaloneServerFactory.configure(
+              new InetSocketAddress(tentativePort),
+              MAX_CLIENT_CONNECTIONS
+          );
+        } catch (BindException e) {
+          LOG.debug("Failed binding ZK Server to client port: " +
+                    tentativePort, e);
+          // We're told to use some port but it's occupied, fail
+          if (defaultClientPort > 0) return -1;
+          // This port is already in use, try to use another.
+          tentativePort = selectClientPort();
+          continue;
+        }
+        break;
+      }
+
+      // Start up this ZK server
+      standaloneServerFactory.startup(server);
+      if (!waitForServerUp(tentativePort, CONNECTION_TIMEOUT)) {
+        throw new IOException("Waiting for startup of standalone server");
+      }
+
+      // We have selected this port as a client port.
+      clientPortList.add(tentativePort);
+      standaloneServerFactoryList.add(standaloneServerFactory);
+      zooKeeperServers.add(server);
+      tentativePort++; //for the next server
+    }
+
+    // set the first one to be active ZK; Others are backups
+    activeZKServerIndex = 0;
+    started = true;
+    clientPort = clientPortList.get(activeZKServerIndex);
+    LOG.info("Started MiniZK Cluster and connect 1 ZK server " +
+             "on client port: " + clientPort);
+    return clientPort;
+  }
+
+  private void recreateDir(File dir) throws IOException {
+    if (dir.exists()) {
+      if (!FileUtil.fullyDelete(dir)) {
+        throw new IOException("Could not delete zk base directory: " + dir);
+      }
+    }
+    try {
+      dir.mkdirs();
+    } catch (SecurityException e) {
+      throw new IOException("creating dir: " + dir, e);
+    }
+  }
+
+  /**
+   * @throws IOException
+   */
+  public void shutdown() throws IOException {
+    if (!started) {
+      return;
+    }
+
+    // shut down all the zk servers
+    for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
+      NIOServerCnxnFactory standaloneServerFactory =
+          standaloneServerFactoryList.get(i);
+      int clientPort = clientPortList.get(i);
+
+      standaloneServerFactory.shutdown();
+      if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+        throw new IOException("Waiting for shutdown of standalone server");
+      }
+    }
+    for (ZooKeeperServer zkServer : zooKeeperServers) {
+      //explicitly close ZKDatabase since ZookeeperServer does not close them
+      zkServer.getZKDatabase().close();
+    }
+
+    // clear everything
+    started = false;
+    activeZKServerIndex = 0;
+    standaloneServerFactoryList.clear();
+    clientPortList.clear();
+    zooKeeperServers.clear();
+
+    LOG.info("Shutdown MiniZK cluster with all ZK servers");
+  }
+
+  /**@return clientPort return clientPort if there is another ZK backup can run
+   *         when killing the current active; return -1, if there is no backups.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public int killCurrentActiveZooKeeperServer() throws IOException,
+      InterruptedException {
+    if (!started || activeZKServerIndex < 0) {
+      return -1;
+    }
+
+    // Shutdown the current active one
+    NIOServerCnxnFactory standaloneServerFactory =
+        standaloneServerFactoryList.get(activeZKServerIndex);
+    int clientPort = clientPortList.get(activeZKServerIndex);
+
+    standaloneServerFactory.shutdown();
+    if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+      throw new IOException("Waiting for shutdown of standalone server");
+    }
+
+    zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
+
+    // remove the current active zk server
+    standaloneServerFactoryList.remove(activeZKServerIndex);
+    clientPortList.remove(activeZKServerIndex);
+    zooKeeperServers.remove(activeZKServerIndex);
+    LOG.info("Kill the current active ZK servers in the cluster " +
+             "on client port: " + clientPort);
+
+    if (standaloneServerFactoryList.size() == 0) {
+      // there is no backup servers;
+      return -1;
+    }
+    clientPort = clientPortList.get(activeZKServerIndex);
+    LOG.info("Activate a backup zk server in the cluster " +
+             "on client port: " + clientPort);
+    // return the next back zk server's port
+    return clientPort;
+  }
+
+  /**
+   * Kill one back up ZK servers
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void killOneBackupZooKeeperServer() throws IOException,
+      InterruptedException {
+    if (!started || activeZKServerIndex < 0 ||
+        standaloneServerFactoryList.size() <= 1) {
+      return;
+    }
+
+    int backupZKServerIndex = activeZKServerIndex + 1;
+    // Shutdown the current active one
+    NIOServerCnxnFactory standaloneServerFactory =
+        standaloneServerFactoryList.get(backupZKServerIndex);
+    int clientPort = clientPortList.get(backupZKServerIndex);
+
+    standaloneServerFactory.shutdown();
+    if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+      throw new IOException("Waiting for shutdown of standalone server");
+    }
+
+    zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
+
+    // remove this backup zk server
+    standaloneServerFactoryList.remove(backupZKServerIndex);
+    clientPortList.remove(backupZKServerIndex);
+    zooKeeperServers.remove(backupZKServerIndex);
+    LOG.info("Kill one backup ZK servers in the cluster " +
+             "on client port: " + clientPort);
+  }
+
+  // XXX: From o.a.zk.t.ClientBase
+  private static boolean waitForServerDown(int port, long timeout) {
+    long start = System.currentTimeMillis();
+    while (true) {
+      try {
+        Socket sock = new Socket("localhost", port);
+        try {
+          OutputStream outstream = sock.getOutputStream();
+          outstream.write("stat".getBytes());
+          outstream.flush();
+        } finally {
+          sock.close();
+        }
+      } catch (IOException e) {
+        return true;
+      }
+
+      if (System.currentTimeMillis() > start + timeout) {
+        break;
+      }
+      try {
+        Thread.sleep(250);
+      } catch (InterruptedException e) {
+        // ignore
+      }
+    }
+    return false;
+  }
+
+  // XXX: From o.a.zk.t.ClientBase
+  private static boolean waitForServerUp(int port, long timeout) {
+    long start = System.currentTimeMillis();
+    while (true) {
+      try {
+        Socket sock = new Socket("localhost", port);
+        BufferedReader reader = null;
+        try {
+          OutputStream outstream = sock.getOutputStream();
+          outstream.write("stat".getBytes());
+          outstream.flush();
+
+          Reader isr = new InputStreamReader(sock.getInputStream());
+          reader = new BufferedReader(isr);
+          String line = reader.readLine();
+          if (line != null && line.startsWith("Zookeeper version:")) {
+            return true;
+          }
+        } finally {
+          sock.close();
+          if (reader != null) {
+            reader.close();
+          }
+        }
+      } catch (IOException e) {
+        // ignore as this is expected
+        LOG.info("server localhost:" + port + " not up " + e);
+      }
+
+      if (System.currentTimeMillis() > start + timeout) {
+        break;
+      }
+      try {
+        Thread.sleep(250);
+      } catch (InterruptedException e) {
+        // ignore
+      }
+    }
+    return false;
+  }
+
+  public int getClientPort() {
+    return clientPort;
+  }
+}