You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/07/05 14:45:33 UTC

svn commit: r1357581 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/

Author: ivank
Date: Thu Jul  5 12:45:33 2012
New Revision: 1357581

URL: http://svn.apache.org/viewvc?rev=1357581&view=rev
Log:
BOOKKEEPER-294: Not able to start the bookkeeper before the ZK session timeout. (rakeshr via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1357581&r1=1357580&r2=1357581&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Jul  5 12:45:33 2012
@@ -36,6 +36,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-296: It's better provide stop script for bookie (nijel via sijie)
 
+        BOOKKEEPER-294: Not able to start the bookkeeper before the ZK session timeout. (rakeshr via ivank)
+
       hedwig-client:
 
         BOOKKEEPER-274: Hedwig cpp client library should not link to cppunit which is just used for test. (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1357581&r1=1357580&r2=1357581&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Thu Jul  5 12:45:33 2012
@@ -34,6 +34,8 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.bookkeeper.meta.ActiveLedgerManager;
@@ -52,6 +54,7 @@ import org.apache.zookeeper.WatchedEvent
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.Watcher.Event.EventType;
 
 /**
  * Implements a bookie.
@@ -361,12 +364,9 @@ public class Bookie extends Thread {
         handles = new HandleFactoryImpl(ledgerStorage);
         // instantiate the journal
         journal = new Journal(conf);
-
-        // replay journals
-        readJournal();
     }
 
-    private void readJournal() throws IOException, BookieException {
+    void readJournal() throws IOException, BookieException {
         journal.replay(new JournalScanner() {
             @Override
             public void process(int journalVersion, long offset, ByteBuffer recBuff) throws IOException {
@@ -408,6 +408,16 @@ public class Bookie extends Thread {
     synchronized public void start() {
         setDaemon(true);
         LOG.debug("I'm starting a bookie with journal directory " + journalDirectory.getName());
+        // replay journals
+        try {
+            readJournal();
+        } catch (IOException ioe) {
+            LOG.error("Exception while replaying journals, shutting down", ioe);
+            shutdown(ExitCode.BOOKIE_EXCEPTION);
+        } catch (BookieException be) {
+            LOG.error("Exception while replaying journals, shutting down", be);
+            shutdown(ExitCode.BOOKIE_EXCEPTION);
+        }
         // start bookie thread
         super.start();
 
@@ -488,21 +498,56 @@ public class Bookie extends Thread {
     /**
      * Register as an available bookie
      */
-    private void registerBookie(int port) throws IOException {
+    protected void registerBookie(int port) throws IOException {
         if (null == zk) {
             // zookeeper instance is null, means not register itself to zk
             return;
         }
-        // Create the ZK ephemeral node for this Bookie.
-        try {
-            zk.create(this.bookieRegistrationPath + InetAddress.getLocalHost().getHostAddress() + ":" + port, new byte[0],
-                      Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-        } catch (Exception e) {
-            LOG.error("ZK exception registering ephemeral Znode for Bookie!", e);
+        // ZK ephemeral node for this Bookie.
+        String zkBookieRegPath = this.bookieRegistrationPath
+                + InetAddress.getLocalHost().getHostAddress() + ":" + port;
+        final CountDownLatch prevNodeLatch = new CountDownLatch(1);
+        try{
+            Watcher zkPrevRegNodewatcher = new Watcher() {
+                @Override
+                public void process(WatchedEvent event) {
+                    // Check for deletion of the previous znode. Connection
+                    // expiration is not handled here, since the bookie already has logic to shut down.
+                    if (EventType.NodeDeleted == event.getType()) {
+                        prevNodeLatch.countDown();
+                    }
+                }
+            };
+            if (null != zk.exists(zkBookieRegPath, zkPrevRegNodewatcher)) {
+                LOG.info("Previous bookie registration znode: "
+                        + zkBookieRegPath
+                        + " exists, so waiting zk sessiontimeout: "
+                        + conf.getZkTimeout() + "ms for znode deletion");
+                // waiting for the previous bookie reg znode deletion
+                if (!prevNodeLatch.await(conf.getZkTimeout(),
+                        TimeUnit.MILLISECONDS)) {
+                    throw new KeeperException.NodeExistsException(
+                            zkBookieRegPath);
+                }
+            }
+
+            // Create the ZK ephemeral node for this Bookie.
+            zk.create(zkBookieRegPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.EPHEMERAL);
+        } catch (KeeperException ke) {
+            LOG.error("ZK exception registering ephemeral Znode for Bookie!",
+                    ke);
+            // Throw an IOException back up. This will cause the Bookie
+            // constructor to error out. Alternatively, we could do a System
+            // exit here as this is a fatal error.
+            throw new IOException(ke);
+        } catch (InterruptedException ie) {
+            LOG.error("ZK exception registering ephemeral Znode for Bookie!",
+                    ie);
             // Throw an IOException back up. This will cause the Bookie
             // constructor to error out. Alternatively, we could do a System
             // exit here as this is a fatal error.
-            throw new IOException(e);
+            throw new IOException(ie);
         }
     }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java?rev=1357581&r1=1357580&r2=1357581&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java Thu Jul  5 12:45:33 2012
@@ -29,8 +29,6 @@ import java.net.MalformedURLException;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 
-import javax.management.JMException;
-
 import org.apache.zookeeper.KeeperException;
 
 import org.apache.bookkeeper.bookie.Bookie;
@@ -83,9 +81,10 @@ public class BookieServer implements NIO
     }
 
     public void start() throws IOException {
+        nioServerFactory = new NIOServerFactory(conf, this);
+
         this.bookie.start();
 
-        nioServerFactory = new NIOServerFactory(conf, this);
         nioServerFactory.start();
         running = true;
         deathWatcher = new DeathWatcher(conf);

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java?rev=1357581&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java Thu Jul  5 12:45:33 2012
@@ -0,0 +1,257 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.test.ZooKeeperUtil;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Testing bookie initialization cases
+ */
+public class BookieInitializationTest {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(BookieInitializationTest.class);
+    ZooKeeperUtil zkutil;
+    ZooKeeper zkc = null;
+    ZooKeeper newzk = null;
+
+    @Before
+    public void setupZooKeeper() throws Exception {
+        zkutil = new ZooKeeperUtil();
+        zkutil.startServer();
+        zkc = zkutil.getZooKeeperClient();
+    }
+
+    @After
+    public void tearDownZooKeeper() throws Exception {
+        if (newzk != null) {
+            newzk.close();
+        }
+        zkutil.killServer();
+    }
+
+    private static class MockBookie extends Bookie {
+        MockBookie(ServerConfiguration conf) throws IOException,
+                KeeperException, InterruptedException, BookieException {
+            super(conf);
+        }
+
+        void testRegisterBookie(int port) throws IOException {
+            super.registerBookie(port);
+        }
+    }
+
+    /**
+     * Verify bookie registration. A restarting bookie server waits up to the zk
+     * session timeout when the previous registration node still exists in zk.
+     * On the znode's NodeDeleted event, registration should continue.
+     */
+    @Test
+    public void testBookieRegistration() throws Exception {
+        File tmpDir = File.createTempFile("bookie", "test");
+        tmpDir.delete();
+        tmpDir.mkdir();
+
+        final ServerConfiguration conf = new ServerConfiguration()
+                .setZkServers(null).setJournalDirName(tmpDir.getPath())
+                .setLedgerDirNames(new String[] { tmpDir.getPath() });
+
+        final String bkRegPath = conf.getZkAvailableBookiesPath() + "/"
+                + InetAddress.getLocalHost().getHostAddress() + ":"
+                + conf.getBookiePort();
+
+        MockBookie b = new MockBookie(conf);
+        b.zk = zkc;
+        b.testRegisterBookie(conf.getBookiePort());
+        Stat bkRegNode1 = zkc.exists(bkRegPath, false);
+        Assert.assertNotNull("Bookie registration node doesn't exists!",
+                bkRegNode1);
+
+        // Simulate a bookie restart: on restart the bookie creates a new
+        // zk client and performs the registration again.
+        createNewZKClient();
+        b.zk = newzk;
+
+        // deleting the znode, so that the bookie registration should
+        // continue successfully on NodeDeleted event
+        new Thread() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(conf.getZkTimeout() / 3);
+                    zkc.delete(bkRegPath, -1);
+                } catch (Exception e) {
+                    // Not handled here, since testRegisterBookie will fail
+                    LOG.error("Failed to delete the znode :" + bkRegPath, e);
+                }
+            }
+        }.start();
+        try {
+            b.testRegisterBookie(conf.getBookiePort());
+        } catch (IOException e) {
+            Throwable t = e.getCause();
+            if (t instanceof KeeperException) {
+                KeeperException ke = (KeeperException) t;
+                Assert.assertTrue("ErrorCode:" + ke.code()
+                        + ", Registration node exists",
+                        ke.code() != KeeperException.Code.NODEEXISTS);
+            }
+            throw e;
+        }
+
+        // verify ephemeral owner of the bkReg znode
+        Stat bkRegNode2 = newzk.exists(bkRegPath, false);
+        Assert.assertNotNull("Bookie registration has been failed", bkRegNode2);
+        Assert.assertTrue("Bookie is referring to old registration znode:"
+                + bkRegNode1 + ", New ZNode:" + bkRegNode2, bkRegNode1
+                .getEphemeralOwner() != bkRegNode2.getEphemeralOwner());
+    }
+
+    /**
+     * Verify the bookie registration, it should throw
+     * KeeperException.NodeExistsException if the znode still exists even after
+     * the zk session timeout.
+     */
+    @Test
+    public void testRegNodeExistsAfterSessionTimeOut() throws Exception {
+        File tmpDir = File.createTempFile("bookie", "test");
+        tmpDir.delete();
+        tmpDir.mkdir();
+
+        ServerConfiguration conf = new ServerConfiguration().setZkServers(null)
+                .setJournalDirName(tmpDir.getPath()).setLedgerDirNames(
+                        new String[] { tmpDir.getPath() });
+
+        String bkRegPath = conf.getZkAvailableBookiesPath() + "/"
+                + InetAddress.getLocalHost().getHostAddress() + ":"
+                + conf.getBookiePort();
+
+        MockBookie b = new MockBookie(conf);
+        b.zk = zkc;
+        b.testRegisterBookie(conf.getBookiePort());
+        Stat bkRegNode1 = zkc.exists(bkRegPath, false);
+        Assert.assertNotNull("Bookie registration node doesn't exists!",
+                bkRegNode1);
+
+        // Simulate a bookie restart: on restart the bookie creates a new
+        // zk client and performs the registration again.
+        createNewZKClient();
+        b.zk = newzk;
+        try {
+            b.testRegisterBookie(conf.getBookiePort());
+            fail("Should throw NodeExistsException as the znode is not getting expired");
+        } catch (IOException e) {
+            Throwable t = e.getCause();
+            if (t instanceof KeeperException) {
+                KeeperException ke = (KeeperException) t;
+                Assert.assertTrue("ErrorCode:" + ke.code()
+                        + ", Registration node doesn't exists",
+                        ke.code() == KeeperException.Code.NODEEXISTS);
+
+                // verify ephemeral owner of the bkReg znode
+                Stat bkRegNode2 = newzk.exists(bkRegPath, false);
+                Assert.assertNotNull("Bookie registration has been failed",
+                        bkRegNode2);
+                Assert.assertTrue(
+                        "Bookie wrongly registered. Old registration znode:"
+                                + bkRegNode1 + ", New znode:" + bkRegNode2,
+                        bkRegNode1.getEphemeralOwner() == bkRegNode2
+                                .getEphemeralOwner());
+                return;
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Verify duplicate bookie server startup. Should throw
+     * java.net.BindException if a BK server is already running.
+     */
+    @Test
+    public void testDuplicateBookieServerStartup() throws Exception {
+        File tmpDir = File.createTempFile("bookie", "test");
+        tmpDir.delete();
+        tmpDir.mkdir();
+
+        ServerConfiguration conf = new ServerConfiguration();
+        int port = 12555;
+        conf.setZkServers(null).setBookiePort(port).setJournalDirName(
+                tmpDir.getPath()).setLedgerDirNames(
+                new String[] { tmpDir.getPath() });
+        BookieServer bs1 = new BookieServer(conf);
+        bs1.start();
+
+        // starting bk server with same conf
+        try {
+            BookieServer bs2 = new BookieServer(conf);
+            bs2.start();
+            fail("Should throw BindException, as the bk server is already running!");
+        } catch (BindException be) {
+            Assert.assertTrue("BKServer allowed duplicate startups!", be
+                    .getMessage().contains("Address already in use"));
+        }
+    }
+
+    private void createNewZKClient() throws Exception {
+        // create a zookeeper client
+        LOG.debug("Instantiate ZK Client");
+        final CountDownLatch latch = new CountDownLatch(1);
+        newzk = new ZooKeeper(zkutil.getZooKeeperConnectString(), 10000,
+                new Watcher() {
+                    @Override
+                    public void process(WatchedEvent event) {
+                        // release the latch once the client is connected
+                        if (event.getState().equals(
+                                Watcher.Event.KeeperState.SyncConnected)) {
+                            latch.countDown();
+                        }
+                    }
+                });
+        if (!latch.await(10000, TimeUnit.MILLISECONDS)) {
+            newzk.close();
+            fail("Could not connect to zookeeper server");
+        }
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java?rev=1357581&r1=1357580&r2=1357581&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java Thu Jul  5 12:45:33 2012
@@ -397,6 +397,7 @@ public class BookieJournalTest {
             .setLedgerDirNames(new String[] { ledgerDir.getPath() });
 
         Bookie b = new Bookie(conf);
+        b.readJournal();
 
         b.readEntry(1, 99);
 
@@ -444,6 +445,7 @@ public class BookieJournalTest {
             .setLedgerDirNames(new String[] { ledgerDir.getPath() });
 
         Bookie b = new Bookie(conf);
+        b.readJournal();
         b.readEntry(1, 99);
 
         // still able to read last entry, but it's junk
@@ -511,12 +513,13 @@ public class BookieJournalTest {
         if (truncateMasterKey) {
             try {
                 Bookie b = new Bookie(conf);
+                b.readJournal();
                 fail("Should not reach here!");
             } catch (IOException ie) {
             }
         } else {
             Bookie b = new Bookie(conf);
-
+            b.readJournal();
             b.readEntry(1, 100);
             try {
                 b.readEntry(1, 101);
@@ -570,7 +573,7 @@ public class BookieJournalTest {
             .setLedgerDirNames(new String[] { ledgerDir.getPath() });
 
         Bookie b = new Bookie(conf);
-
+        b.readJournal();
         b.readEntry(1, 100);
         try {
             b.readEntry(1, 101);