You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2017/06/01 09:31:18 UTC

bookkeeper git commit: BOOKKEEPER-1078: Local BookKeeper enhancements for testability

Repository: bookkeeper
Updated Branches:
  refs/heads/master da7064871 -> 0eca5ff57


BOOKKEEPER-1078: Local BookKeeper enhancements for testability

BookKeeper: Local Bookkeeper enhancements for testability

1. Allow creating local bookies without always starting a zookeeper server - This is required as tests may want to create and use their own instance of a test zookeeper
2. Allow using non default zookeeper host and more importantly non default ZK port
3. Allowing the caller to specify the initial port for the bookies
4. Optionally shutdown bookies when the bookie thread exits

Author: Sijie Guo <si...@apache.org>
Author: Sijie Guo <si...@twitter.com>
Author: Robin Dhamankar <rd...@twitter.com>

Reviewers: Enrico Olivelli, Jia Zhai

Closes #164 from sijie/local_bookkeeper_enhancements


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/0eca5ff5
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/0eca5ff5
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/0eca5ff5

Branch: refs/heads/master
Commit: 0eca5ff57f6e19fe5829d5ef8df228411467401f
Parents: da70648
Author: Sijie Guo <si...@apache.org>
Authored: Thu Jun 1 11:31:14 2017 +0200
Committer: Enrico Olivelli <eo...@apache.org>
Committed: Thu Jun 1 11:31:14 2017 +0200

----------------------------------------------------------------------
 .../shims/zk/ZooKeeperServerShim.java           |  60 +++++
 .../shims/zk/ZooKeeperServerShimFactory.java    |  35 +++
 .../shims/zk/ZooKeeperServerShimImpl.java       |  63 +++++
 .../apache/bookkeeper/util/LocalBookKeeper.java | 237 ++++++++++++++-----
 4 files changed, 330 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0eca5ff5/bookkeeper-server/src/main/java/org/apache/bookkeeper/shims/zk/ZooKeeperServerShim.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/shims/zk/ZooKeeperServerShim.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/shims/zk/ZooKeeperServerShim.java
new file mode 100644
index 0000000..226c2a8
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/shims/zk/ZooKeeperServerShim.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.shims.zk;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * In order to be compatible with multiple versions of ZooKeeper.
+ * All parts of the ZooKeeper Server that are not cross-version
+ * compatible are encapsulated in an implementation of this class.
+ */
+public interface ZooKeeperServerShim {
+
+    /**
+     * Initialize zookeeper server.
+     *
+     * @param snapDir
+     *          Snapshot Dir.
+     * @param logDir
+     *          Log Dir.
+     * @param zkPort
+     *          ZooKeeper Port.
+     * @param maxCC
+     *          Max Concurrency for Client.
+     * @throws IOException when failed to initialize zookeeper server.
+     */
+    void initialize(File snapDir, File logDir, int zkPort, int maxCC) throws IOException;
+
+    /**
+     * Start the zookeeper server.
+     *
+     * @throws IOException when failed to start zookeeper server.
+     */
+    void start() throws IOException;
+
+    /**
+     * Stop the zookeeper server.
+     */
+    void stop();
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0eca5ff5/bookkeeper-server/src/main/java/org/apache/bookkeeper/shims/zk/ZooKeeperServerShimFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/shims/zk/ZooKeeperServerShimFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/shims/zk/ZooKeeperServerShimFactory.java
new file mode 100644
index 0000000..59ff88a
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/shims/zk/ZooKeeperServerShimFactory.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.shims.zk;
+
+import java.io.File;
+import java.io.IOException;
+
+public class ZooKeeperServerShimFactory {
+
+    public static ZooKeeperServerShim createServer(File snapDir, File logDir, int zkPort, int maxCC)
+        throws IOException {
+        ZooKeeperServerShim server = new ZooKeeperServerShimImpl();
+        server.initialize(snapDir, logDir, zkPort, maxCC);
+        return server;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0eca5ff5/bookkeeper-server/src/main/java/org/apache/bookkeeper/shims/zk/ZooKeeperServerShimImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/shims/zk/ZooKeeperServerShimImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/shims/zk/ZooKeeperServerShimImpl.java
new file mode 100644
index 0000000..b74df4b
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/shims/zk/ZooKeeperServerShimImpl.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.shims.zk;
+
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+class ZooKeeperServerShimImpl implements ZooKeeperServerShim {
+
+    ZooKeeperServer zks = null;
+    NIOServerCnxnFactory serverFactory = null;
+
+    @Override
+    public void initialize(File snapDir, File logDir, int zkPort, int maxCC) throws IOException {
+        zks = new ZooKeeperServer(snapDir, logDir, ZooKeeperServer.DEFAULT_TICK_TIME);
+        serverFactory = new NIOServerCnxnFactory();
+        serverFactory.configure(new InetSocketAddress(zkPort), maxCC);
+    }
+
+    @Override
+    public void start() throws IOException {
+        if (null == zks || null == serverFactory) {
+            throw new IOException("Start zookeeper server before initialization.");
+        }
+        try {
+            serverFactory.startup(zks);
+        } catch (InterruptedException e) {
+            throw new IOException("Interrupted when starting zookeeper server : ", e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (null != serverFactory) {
+            serverFactory.shutdown();
+        }
+        if (null != zks) {
+            zks.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0eca5ff5/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
index 5dc0053..a92513c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
@@ -23,11 +23,12 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
+import org.apache.bookkeeper.shims.zk.ZooKeeperServerShimFactory;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.io.FileUtils;
 import org.apache.bookkeeper.bookie.BookieException;
@@ -39,9 +40,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
 
 import static com.google.common.base.Charsets.UTF_8;
@@ -53,58 +51,58 @@ public class LocalBookKeeper {
     int numberOfBookies;
 
     public LocalBookKeeper() {
-        numberOfBookies = 3;
+        this(3);
     }
 
     public LocalBookKeeper(int numberOfBookies) {
-        this();
+        this(numberOfBookies, 5000, ZooKeeperDefaultHost, ZooKeeperDefaultPort);
+    }
+
+    public LocalBookKeeper(int numberOfBookies, int initialPort, String zkHost, int zkPort) {
         this.numberOfBookies = numberOfBookies;
-        LOG.info("Running " + this.numberOfBookies + " bookie(s).");
+        this.initialPort = initialPort;
+        this.zkServer = String.format("%s:%d", zkHost, zkPort);
+        LOG.info("Running {} bookie(s) on zkServer {}.", this.numberOfBookies, zkServer);
     }
 
-    NIOServerCnxnFactory serverFactory;
-    ZooKeeperServer zks;
-    ZooKeeper zkc;
-    int ZooKeeperDefaultPort = 2181;
+    private String zkServer;
+    static String ZooKeeperDefaultHost = "127.0.0.1";
+    static int ZooKeeperDefaultPort = 2181;
     static int zkSessionTimeOut = 5000;
-    File ZkTmpDir;
+    static Integer BookieDefaultInitialPort = 5000;
 
     //BookKeeper variables
-    File tmpDirs[];
+    File journalDirs[];
     BookieServer bs[];
     ServerConfiguration bsConfs[];
     Integer initialPort = 5000;
 
     /**
-     * @param args
+     * @param maxCC
+     *          Max Concurrency of Client
+     * @param zookeeperPort
+     *          ZooKeeper Server Port
      */
+    public static ZooKeeperServerShim runZookeeper(int maxCC, int zookeeperPort) throws IOException {
+        File zkTmpDir = IOUtils.createTempDir("zookeeper", "localbookkeeper");
+        return runZookeeper(maxCC, zookeeperPort, zkTmpDir);
+    }
 
-    private void runZookeeper(int maxCC) throws IOException {
-        // create a ZooKeeper server(dataDir, dataLogDir, port)
+    public static ZooKeeperServerShim runZookeeper(int maxCC, int zookeeperPort, File zkDir) throws IOException {
         LOG.info("Starting ZK server");
-        //ServerStats.registerAsConcrete();
-        //ClientBase.setupTestEnv();
-        ZkTmpDir = IOUtils.createTempDir("zookeeper", "localbookkeeper");
+        ZooKeeperServerShim server = ZooKeeperServerShimFactory.createServer(zkDir, zkDir, zookeeperPort, maxCC);
+        server.start();
 
-        InetAddress loopbackIP = InetAddress.getLoopbackAddress();
-        try {
-            zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperServer.DEFAULT_TICK_TIME);
-            serverFactory =  new NIOServerCnxnFactory();
-            LOG.info("Starting Zookeeper server at " + loopbackIP.getHostAddress() + " port:" + ZooKeeperDefaultPort);
-            serverFactory.configure(new InetSocketAddress(loopbackIP, ZooKeeperDefaultPort), maxCC);
-            serverFactory.startup(zks);
-        } catch (Exception e) {
-            // TODO Auto-generated catch block
-            LOG.error("Exception while instantiating ZooKeeper", e);
-        }
-
-        boolean b = waitForServerUp(loopbackIP.getHostAddress() + ":" + ZooKeeperDefaultPort, CONNECTION_TIMEOUT);
+        boolean b = waitForServerUp(InetAddress.getLoopbackAddress().getHostAddress() + ":" + zookeeperPort,
+          CONNECTION_TIMEOUT);
         LOG.debug("ZooKeeper server up: {}", b);
+        return server;
     }
 
     private void initializeZookeeper() throws IOException {
         LOG.info("Instantiate ZK Client");
         //initialize the zk client with values
+        ZooKeeperClient zkc = null;
         try {
             zkc = ZooKeeperClient.newBuilder()
                     .connectString(InetAddress.getLoopbackAddress().getHostAddress() + ":" + ZooKeeperDefaultPort)
@@ -115,11 +113,11 @@ public class LocalBookKeeper {
             // No need to create an entry for each requested bookie anymore as the
             // BookieServers will register themselves with ZooKeeper on startup.
         } catch (KeeperException e) {
-            // TODO Auto-generated catch block
             LOG.error("Exception while creating znodes", e);
+            throw new IOException("Error creating znodes : ", e);
         } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
             LOG.error("Interrupted while creating znodes", e);
+            throw new IOException("Error creating znodes : ", e);
         }
     }
 
@@ -157,53 +155,165 @@ public class LocalBookKeeper {
         }
     }
 
+    @SuppressWarnings("deprecation")
     private void runBookies(ServerConfiguration baseConf, List<File> tempDirs, String dirSuffix)
             throws IOException, KeeperException, InterruptedException, BookieException,
             UnavailableException, CompatibilityException {
         LOG.info("Starting Bookie(s)");
         // Create Bookie Servers (B1, B2, B3)
 
-        tmpDirs = new File[numberOfBookies];
+        journalDirs = new File[numberOfBookies];
         bs = new BookieServer[numberOfBookies];
         bsConfs = new ServerConfiguration[numberOfBookies];
 
-        String loopbackIPAddr = InetAddress.getLoopbackAddress().getHostAddress();
         for(int i = 0; i < numberOfBookies; i++) {
-            tmpDirs[i] = File.createTempFile("bookie" + Integer.toString(i), "test");
-            if (!tmpDirs[i].delete() || !tmpDirs[i].mkdir()) {
-                throw new IOException("Couldn't create bookie dir " + tmpDirs[i]);
+            if (null == baseConf.getJournalDirNameWithoutDefault()) {
+                journalDirs[i] = IOUtils.createTempDir("localbookkeeper" + Integer.toString(i), dirSuffix);
+                tempDirs.add(journalDirs[i]);
+            } else {
+                journalDirs[i] = new File(baseConf.getJournalDirName(), "bookie" + Integer.toString(i));
+            }
+            if (journalDirs[i].exists()) {
+                if (journalDirs[i].isDirectory()) {
+                    FileUtils.deleteDirectory(journalDirs[i]);
+                } else if (!journalDirs[i].delete()) {
+                    throw new IOException("Couldn't cleanup bookie journal dir " + journalDirs[i]);
+                }
+            }
+            if (!journalDirs[i].mkdirs()) {
+                throw new IOException("Couldn't create bookie journal dir " + journalDirs[i]);
+            }
+
+            String [] ledgerDirs = baseConf.getLedgerDirWithoutDefault();
+            if ((null == ledgerDirs) || (0 == ledgerDirs.length)) {
+                ledgerDirs = new String[] { journalDirs[i].getPath() };
+            } else {
+                for (int l = 0; l < ledgerDirs.length; l++) {
+                    File dir = new File(ledgerDirs[l], "bookie" + Integer.toString(i));
+                    if (dir.exists()) {
+                        if (dir.isDirectory()) {
+                            FileUtils.deleteDirectory(dir);
+                        } else if (!dir.delete()) {
+                            throw new IOException("Couldn't cleanup bookie ledger dir " + dir);
+                        }
+                    }
+                    if (!dir.mkdirs()) {
+                        throw new IOException("Couldn't create bookie ledger dir " + dir);
+                    }
+                    ledgerDirs[l] = dir.getPath();
+                }
             }
 
             bsConfs[i] = new ServerConfiguration(baseConf);
-            // override settings
-            bsConfs[i].setBookiePort(initialPort + i);
-            LOG.info("Connecting to Zookeeper at " + loopbackIPAddr + " port:" + ZooKeeperDefaultPort);
-            bsConfs[i].setZkServers(loopbackIPAddr + ":" + ZooKeeperDefaultPort);
 
-            if (null == bsConfs[i].getJournalDirNameWithoutDefault()) {
-                bsConfs[i].setJournalDirName(tmpDirs[i].getPath());
+            // If the caller specified ephemeral ports then use ephemeral ports for all
+            // the bookies else use numBookie ports starting at initialPort
+            if (0 == initialPort) {
+                bsConfs[i].setBookiePort(0);
+            } else {
+                bsConfs[i].setBookiePort(initialPort + i);
             }
-
-            String [] ledgerDirs = bsConfs[i].getLedgerDirWithoutDefault();
-            if ((null == ledgerDirs) || (0 == ledgerDirs.length)) {
-                bsConfs[i].setLedgerDirNames(new String[] { tmpDirs[i].getPath() });
+            
+            if (null == baseConf.getZkServers()) {
+                bsConfs[i].setZkServers(InetAddress.getLocalHost().getHostAddress() + ":"
+                                  + ZooKeeperDefaultPort);
             }
 
-            bsConfs[i].setAllowLoopback(true);
+
+            bsConfs[i].setJournalDirName(journalDirs[i].getPath());
+            bsConfs[i].setLedgerDirNames(ledgerDirs);
 
             bs[i] = new BookieServer(bsConfs[i]);
             bs[i].start();
         }
     }
 
-    public static void main(String[] args) throws IOException, KeeperException,
-            InterruptedException, BookieException, UnavailableException,
-            CompatibilityException {
+    public static void startLocalBookies(String zkHost,
+                                         int zkPort,
+                                         int numBookies,
+                                         boolean shouldStartZK,
+                                         int initialBookiePort)
+            throws Exception {
+        ServerConfiguration conf = new ServerConfiguration();
+        startLocalBookiesInternal(conf, zkHost, zkPort, numBookies, shouldStartZK, initialBookiePort, true, "test");
+    }
+
+    public static void startLocalBookies(String zkHost,
+                                         int zkPort,
+                                         int numBookies,
+                                         boolean shouldStartZK,
+                                         int initialBookiePort,
+                                         ServerConfiguration conf)
+            throws Exception {
+        startLocalBookiesInternal(conf, zkHost, zkPort, numBookies, shouldStartZK, initialBookiePort, true, "test");
+    }
+
+    public static void startLocalBookies(String zkHost,
+                                         int zkPort,
+                                         int numBookies,
+                                         boolean shouldStartZK,
+                                         int initialBookiePort,
+                                         String dirSuffix)
+            throws Exception {
+        ServerConfiguration conf = new ServerConfiguration();
+        startLocalBookiesInternal(conf, zkHost, zkPort, numBookies, shouldStartZK, initialBookiePort, true, dirSuffix);
+    }
+
+    static void startLocalBookiesInternal(ServerConfiguration conf,
+                                          String zkHost,
+                                          int zkPort,
+                                          int numBookies,
+                                          boolean shouldStartZK,
+                                          int initialBookiePort,
+                                          boolean stopOnExit,
+                                          String dirSuffix)
+            throws Exception {
+        LocalBookKeeper lb = new LocalBookKeeper(numBookies, initialBookiePort, zkHost, zkPort);
+
+        ZooKeeperServerShim zks = null;
+        File zkTmpDir = null;
+        List<File> bkTmpDirs = null;
+        try {
+            if (shouldStartZK) {
+                zkTmpDir = IOUtils.createTempDir("zookeeper", dirSuffix);
+                zks = LocalBookKeeper.runZookeeper(1000, zkPort, zkTmpDir);
+            }
+
+            lb.initializeZookeeper();
+            conf.setZkServers(zkHost + ":" + zkPort);
+            bkTmpDirs = lb.runBookies(conf, dirSuffix);
+
+            try {
+                while (true) {
+                    Thread.sleep(5000);
+                }
+            } catch (InterruptedException ie) {
+                if (stopOnExit) {
+                    lb.shutdownBookies();
+
+                    if (null != zks) {
+                        zks.stop();
+                    }
+                }
+                throw ie;
+            }
+        } finally {
+            if (stopOnExit) {
+                cleanupDirectories(bkTmpDirs);
+                if (null != zkTmpDir) {
+                    FileUtils.deleteDirectory(zkTmpDir);
+                }
+            }
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
         if(args.length < 1) {
             usage();
             System.exit(-1);
         }
-        LocalBookKeeper lb = new LocalBookKeeper(Integer.parseInt(args[0]));
+
+        int numBookies = Integer.parseInt(args[0]);
 
         ServerConfiguration conf = new ServerConfiguration();
         if (args.length >= 2) {
@@ -217,17 +327,8 @@ public class LocalBookKeeper {
             }
         }
 
-        lb.runZookeeper(1000);
-        lb.initializeZookeeper();
-        List<File> tmpDirs = lb.runBookies(conf, "test");
-        try {
-            while (true) {
-                Thread.sleep(5000);
-            }
-        } catch (InterruptedException ie) {
-            cleanupDirectories(tmpDirs);
-            throw ie;
-        }
+        startLocalBookiesInternal(conf, ZooKeeperDefaultHost, ZooKeeperDefaultPort,
+                numBookies, true, BookieDefaultInitialPort, false, "test");
     }
 
     private static void usage() {
@@ -279,4 +380,10 @@ public class LocalBookKeeper {
         return false;
     }
 
+    public void shutdownBookies() {
+        for (BookieServer bookieServer: bs) {
+            bookieServer.shutdown();
+        }
+    }
+
 }