You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2012/06/07 08:37:47 UTC

svn commit: r1347417 - in /zookeeper/bookkeeper/tags/release-4.1.0: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/uti...

Author: sijie
Date: Thu Jun  7 06:37:46 2012
New Revision: 1347417

URL: http://svn.apache.org/viewvc?rev=1347417&view=rev
Log:
 svn merge -c 1346253,1346258,1346328,1346343,1346976 https://svn.apache.org/repos/asf/zookeeper/bookkeeper/branches/branch-4.1 

Added:
    zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
      - copied unchanged from r1346253, zookeeper/bookkeeper/branches/branch-4.1/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
Modified:
    zookeeper/bookkeeper/tags/release-4.1.0/   (props changed)
    zookeeper/bookkeeper/tags/release-4.1.0/CHANGES.txt
    zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
    zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
    zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
    zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
    zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
    zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
    zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java
    zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java
    zookeeper/bookkeeper/tags/release-4.1.0/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java
    zookeeper/bookkeeper/tags/release-4.1.0/pom.xml

Propchange: zookeeper/bookkeeper/tags/release-4.1.0/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Jun  7 06:37:46 2012
@@ -0,0 +1 @@
+/zookeeper/bookkeeper/branches/branch-4.1:1346253,1346258,1346328,1346343,1346976

Modified: zookeeper/bookkeeper/tags/release-4.1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/CHANGES.txt?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/CHANGES.txt (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/CHANGES.txt Thu Jun  7 06:37:46 2012
@@ -120,6 +120,14 @@ Release 4.1.0 - 2012-05-31
 
         BOOKKEEPER-273: LedgerHandle.deleteLedger() should be idempotent (Matteo Merli via ivank)
 
+        BOOKKEEPER-281: BKClient is failing when zkclient connection delays (ivank via sijie)
+
+        BOOKKEEPER-279: LocalBookKeeper is failing intermittently due to zkclient connection establishment delay (Rakesh R via sijie)
+
+        BOOKKEEPER-286: Compilation warning (ivank via sijie)
+
+        BOOKKEEPER-287: NoSuchElementException in LedgerCacheImpl (sijie)
+
       hedwig-client/
 
         BOOKKEEPER-217: NPE in hedwig client when enable DEBUG (sijie via ivank)
@@ -146,6 +154,8 @@ Release 4.1.0 - 2012-05-31
 
         BOOKKEEPER-146: TestConcurrentTopicAcquisition sometimes hangs (ivank)
 
+        BOOKKEEPER-285: TestZkSubscriptionManager quits due to NPE, so other tests are not run in hedwig server. (sijie)
+
       bookkeeper-benchmark/
 	
 	BOOKKEEPER-207: BenchBookie doesn't run correctly (ivank via fpj)

Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java Thu Jun  7 06:37:46 2012
@@ -176,7 +176,7 @@ public class LedgerCacheImpl implements 
             }   
         } catch (IOException ie) {
             // if we grab a clean page, but failed to update the page
-            // we are exhuasting the count of ledger entry pages.
+            // we are exhausting the count of ledger entry pages.
             // since this page will be never used, so we need to decrement
             // page count of ledger cache.
             lep.releasePage();
@@ -317,7 +317,7 @@ public class LedgerCacheImpl implements 
                 if (!doAll) {
                     break;
                 }
-                // Yeild. if we are doing all the ledgers we don't want to block other flushes that
+                // Yield. if we are doing all the ledgers we don't want to block other flushes that
                 // need to happen
                 try {
                     dirtyLedgers.wait(1);
@@ -449,22 +449,22 @@ public class LedgerCacheImpl implements 
         if (entry % entriesPerPage != 0) {
             throw new IllegalArgumentException(entry + " is not a multiple of " + entriesPerPage);
         }
-        synchronized(this) {
-            if (pageCount  < pageLimit) {
-                // let's see if we can allocate something
-                LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage);
-                lep.setLedger(ledger);
-                lep.setFirstEntry(entry);
-
-                // note, this will not block since it is a new page
-                lep.usePage();
-                pageCount++;
-                return lep;
-            }
-        }
-
         outerLoop:
         while(true) {
+            synchronized(this) {
+                if (pageCount  < pageLimit) {
+                    // let's see if we can allocate something
+                    LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage);
+                    lep.setLedger(ledger);
+                    lep.setFirstEntry(entry);
+
+                    // note, this will not block since it is a new page
+                    lep.usePage();
+                    pageCount++;
+                    return lep;
+                }
+            }
+
             synchronized(cleanLedgers) {
                 if (cleanLedgers.isEmpty()) {
                     flushLedger(false);
@@ -475,6 +475,14 @@ public class LedgerCacheImpl implements 
                     }
                 }
                 synchronized(this) {
+                    // if ledgers deleted between checking pageCount and putting
+                    // ledgers into cleanLedgers list, the cleanLedgers list would be empty.
+                    // so give it a chance to go back to check pageCount again because
+                    // deleteLedger would decrement pageCount to return the number of pages
+                    // occupied by deleted ledgers.
+                    if (cleanLedgers.isEmpty()) {
+                        continue outerLoop;
+                    }
                     Long cleanLedger = cleanLedgers.getFirst();
                     Map<Long, LedgerEntryPage> map = pages.get(cleanLedger);
                     while (map == null || map.isEmpty()) {
@@ -610,7 +618,13 @@ public class LedgerCacheImpl implements 
 
         // remove pages first to avoid page flushed when deleting file info
         synchronized(this) {
-            pages.remove(ledgerId);
+            Map<Long, LedgerEntryPage> lpages = pages.remove(ledgerId);
+            if (null != lpages) {
+                pageCount -= lpages.size();
+                if (pageCount < 0) {
+                    LOG.error("Page count of ledger cache has been decremented to be less than zero.");
+                }
+            }
         }
         // Delete the ledger's index file and close the FileInfo
         FileInfo fi = null;

Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Thu Jun  7 06:37:46 2012
@@ -119,27 +119,35 @@ public class BookKeeper {
      * @throws InterruptedException
      * @throws KeeperException
      */
-    public BookKeeper(ClientConfiguration conf)
+    public BookKeeper(final ClientConfiguration conf)
             throws IOException, InterruptedException, KeeperException {
         this.conf = conf;
+
+        final CountDownLatch zkConnectLatch = new CountDownLatch(1);
         this.zk = new ZooKeeper(conf.getZkServers(), conf.getZkTimeout(),
                 new Watcher() {
                     @Override
                     public void process(WatchedEvent event) {
-                        if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
-                            connectLatch.countDown();
-                        }
+                        // countdown the latch on all events, even if we haven't
+                        // successfully connected.
+                        zkConnectLatch.countDown();
+
                         // TODO: handle session disconnects and expires
                         LOG.debug("Process: {} {}", event.getType(), event.getPath());
                     }
                 });
+        if (!zkConnectLatch.await(conf.getZkTimeout(), TimeUnit.MILLISECONDS)
+            || !zk.getState().isConnected()) {
+            throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+        }
+
         this.channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
                                                                 Executors.newCachedThreadPool());
-        bookieWatcher = new BookieWatcher(conf, this);
-        bookieWatcher.readBookiesBlocking();
         mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads());
         bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
-        // initialize ledger meta manager
+        bookieWatcher = new BookieWatcher(conf, this);
+        bookieWatcher.readBookiesBlocking();
+
         ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
 
         ownChannelFactory = true;
@@ -176,49 +184,33 @@ public class BookKeeper {
      *          {@link ClientConfiguration}
      * @param zk
      *          Zookeeper client instance connected to the zookeeper with which
-     *          the bookies have registered
+     *          the bookies have registered. The ZooKeeper client must be connected
+     *          before it is passed to BookKeeper. Otherwise a KeeperException is thrown.
      * @param channelFactory
      *          A factory that will be used to create connections to the bookies
      * @throws IOException
      * @throws InterruptedException
-     * @throws KeeperException
+     * @throws KeeperException if the passed zk handle is not connected
      */
     public BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFactory channelFactory)
             throws IOException, InterruptedException, KeeperException {
         if (zk == null || channelFactory == null) {
             throw new NullPointerException();
         }
+        if (!zk.getState().isConnected()) {
+            LOG.error("Unconnected zookeeper handle passed to bookkeeper");
+            throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+        }
         this.conf = conf;
         this.zk = zk;
         this.channelFactory = channelFactory;
-        bookieWatcher = new BookieWatcher(conf, this);
-        bookieWatcher.readBookiesBlocking();
+
         mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads());
         bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
-        // initialize ledger meta manager
-        ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
-    }
+        bookieWatcher = new BookieWatcher(conf, this);
+        bookieWatcher.readBookiesBlocking();
 
-    void withZKConnected(final ZKConnectCallback cb) {
-        if (ownZKHandle) {
-            mainWorkerPool.submit(new SafeRunnable() {
-                    @Override
-                    public void safeRun() {
-                        try {
-                            if (!connectLatch.await(zkConnectTimeoutMs, TimeUnit.MILLISECONDS)) {
-                                cb.connectionFailed(BKException.Code.ZKException);
-                            } else {
-                                cb.connected();
-                            }
-                        } catch (InterruptedException ie) {
-                            // someone trying to kill the process
-                            cb.connectionFailed(BKException.Code.InterruptedException);
-                        }
-                    }
-                });
-        } else {
-            cb.connected();
-        }
+        ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
     }
 
     LedgerManager getLedgerManager() {
@@ -278,15 +270,8 @@ public class BookKeeper {
      */
     public void asyncCreateLedger(final int ensSize, final int qSize, final DigestType digestType,
                                   final byte[] passwd, final CreateCallback cb, final Object ctx) {
-        withZKConnected(new ZKConnectCallback() {
-                public void connected() {
-                    new LedgerCreateOp(BookKeeper.this, ensSize, qSize, digestType, passwd, cb, ctx)
-                        .initiate();
-                }
-                public void connectionFailed(int code) {
-                    cb.createComplete(code, null, ctx);
-                }
-            });
+        new LedgerCreateOp(BookKeeper.this, ensSize, qSize, digestType, passwd, cb, ctx)
+            .initiate();
     }
 
 
@@ -370,14 +355,7 @@ public class BookKeeper {
      */
     public void asyncOpenLedger(final long lId, final DigestType digestType, final byte passwd[],
                                 final OpenCallback cb, final Object ctx) {
-        withZKConnected(new ZKConnectCallback() {
-                public void connected() {
-                    new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiate();
-                }
-                public void connectionFailed(int code) {
-                    cb.openComplete(code, null, ctx);
-                }
-            });
+        new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiate();
     }
 
     /**
@@ -409,14 +387,7 @@ public class BookKeeper {
      */
     public void asyncOpenLedgerNoRecovery(final long lId, final DigestType digestType, final byte passwd[],
                                           final OpenCallback cb, final Object ctx) {
-        withZKConnected(new ZKConnectCallback() {
-                public void connected() {
-                    new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiateWithoutRecovery();
-                }
-                public void connectionFailed(int code) {
-                    cb.openComplete(code, null, ctx);
-                }
-            });
+        new LedgerOpenOp(BookKeeper.this, lId, digestType, passwd, cb, ctx).initiateWithoutRecovery();
     }
 
 
@@ -502,14 +473,7 @@ public class BookKeeper {
      *            optional control object
      */
     public void asyncDeleteLedger(final long lId, final DeleteCallback cb, final Object ctx) {
-        withZKConnected(new ZKConnectCallback() {
-                public void connected() {
-                    new LedgerDeleteOp(BookKeeper.this, lId, cb, ctx).initiate();
-                }
-                public void connectionFailed(int code) {
-                    cb.deleteComplete(code, ctx);
-                }
-            });
+        new LedgerDeleteOp(BookKeeper.this, lId, cb, ctx).initiate();
     }
 
 

Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Thu Jun  7 06:37:46 2012
@@ -30,6 +30,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -127,18 +129,25 @@ public class BookKeeperAdmin {
      */
     public BookKeeperAdmin(ClientConfiguration conf) throws IOException, InterruptedException, KeeperException {
         // Create the ZooKeeper client instance
+        final CountDownLatch latch = new CountDownLatch(1);
         zk = new ZooKeeper(conf.getZkServers(), conf.getZkTimeout(), new Watcher() {
             @Override
             public void process(WatchedEvent event) {
+                latch.countDown();
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Process: " + event.getType() + " " + event.getPath());
                 }
             }
         });
+        if (!latch.await(conf.getZkTimeout(), TimeUnit.MILLISECONDS)
+            || !zk.getState().isConnected()) {
+            throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+        }
         // Create the bookie path
         bookiesPath = conf.getZkAvailableBookiesPath();
         // Create the BookKeeper client instance
-        bkc = new BookKeeper(conf);
+        bkc = new BookKeeper(conf, zk);
+
         DIGEST_TYPE = conf.getBookieRecoveryDigestType();
         PASSWD = conf.getBookieRecoveryPasswd();
     }

Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java Thu Jun  7 06:37:46 2012
@@ -115,10 +115,10 @@ class BookieWatcher implements Watcher, 
             newBookieAddrs.add(bookieAddr);
         }
 
-        HashSet<InetSocketAddress> deadBookies = (HashSet<InetSocketAddress>)knownBookies.clone();
-        deadBookies.removeAll(newBookieAddrs);
-
+        final HashSet<InetSocketAddress> deadBookies;
         synchronized (this) {
+            deadBookies = (HashSet<InetSocketAddress>)knownBookies.clone();
+            deadBookies.removeAll(newBookieAddrs);
             knownBookies = newBookieAddrs;
         }
 

Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java Thu Jun  7 06:37:46 2012
@@ -26,23 +26,20 @@ import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ZooKeeperServer;
@@ -68,6 +65,7 @@ public class LocalBookKeeper {
     ZooKeeperServer zks;
     ZooKeeper zkc;
     int ZooKeeperDefaultPort = 2181;
+    static int zkSessionTimeOut = 5000;
     File ZkTmpDir;
 
     //BookKeeper variables
@@ -104,11 +102,14 @@ public class LocalBookKeeper {
         LOG.debug("ZooKeeper server up: " + b);
     }
 
-    private void initializeZookeper() {
+    private void initializeZookeper() throws IOException {
         LOG.info("Instantiate ZK Client");
         //initialize the zk client with values
         try {
-            zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher());
+            ZKConnectionWatcher zkConnectionWatcher = new ZKConnectionWatcher();
+            zkc = new ZooKeeper(HOSTPORT, zkSessionTimeOut,
+                    zkConnectionWatcher);
+            zkConnectionWatcher.waitForConnection();
             zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
             zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
             // No need to create an entry for each requested bookie anymore as the
@@ -119,9 +120,6 @@ public class LocalBookKeeper {
         } catch (InterruptedException e) {
             // TODO Auto-generated catch block
             LOG.error("Interrupted while creating znodes", e);
-        } catch (IOException e) {
-            // TODO Auto-generated catch block
-            LOG.error("Exception while creating znodes", e);
         }
     }
     private void runBookies(ServerConfiguration baseConf) 
@@ -184,9 +182,30 @@ public class LocalBookKeeper {
         System.err.println("Usage: LocalBookKeeper number-of-bookies");
     }
 
-    /*	User for testing purposes, void */
-    static class emptyWatcher implements Watcher {
-        public void process(WatchedEvent event) {}
+    /* Watching SyncConnected event from ZooKeeper */
+    static class ZKConnectionWatcher implements Watcher {
+        private CountDownLatch clientConnectLatch = new CountDownLatch(1);
+
+        @Override
+        public void process(WatchedEvent event) {
+            if (event.getState() == KeeperState.SyncConnected) {
+                clientConnectLatch.countDown();
+            }
+        }
+
+        // Waiting for the SyncConnected event from the ZooKeeper server
+        public void waitForConnection() throws IOException {
+            try {
+                if (!clientConnectLatch.await(zkSessionTimeOut,
+                        TimeUnit.MILLISECONDS)) {
+                    throw new IOException(
+                            "Couldn't connect to zookeeper server");
+                }
+            } catch (InterruptedException e) {
+                throw new IOException(
+                        "Interrupted when connecting to zookeeper server", e);
+            }
+        }
     }
 
     public static boolean waitForServerUp(String hp, long timeout) {

Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java Thu Jun  7 06:37:46 2012
@@ -126,6 +126,41 @@ public class LedgerCacheTest extends Tes
     }
 
     @Test
+    public void testDeleteLedger() throws Exception {
+        int numEntries = 10;
+        // limit open files & pages
+        conf.setOpenFileLimit(999).setPageLimit(2)
+            .setPageSize(8 * numEntries);
+        // create ledger cache
+        newLedgerCache();
+        try {
+            int numLedgers = 2;
+            byte[] masterKey = "blah".getBytes();
+            for (int i=1; i<=numLedgers; i++) {
+                ledgerCache.setMasterKey((long)i, masterKey);
+                for (int j=0; j<numEntries; j++) {
+                    ledgerCache.putEntryOffset(i, j, i*numEntries + j);
+                }
+            }
+            // ledger cache is exhausted
+            // delete ledgers
+            for (int i=1; i<=numLedgers; i++) {
+                ledgerCache.deleteLedger((long)i);
+            }
+            // create num ledgers to add entries
+            for (int i=numLedgers+1; i<=2*numLedgers; i++) {
+                ledgerCache.setMasterKey((long)i, masterKey);
+                for (int j=0; j<numEntries; j++) {
+                    ledgerCache.putEntryOffset(i, j, i*numEntries + j);
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Got Exception.", e);
+            fail("Failed to add entry.");
+        }
+    }
+
+    @Test
     public void testPageEviction() throws Exception {
         int numLedgers = 10;
         byte[] masterKey = "blah".getBytes();

Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java Thu Jun  7 06:37:46 2012
@@ -88,7 +88,7 @@ public class TestReadTimeout extends Boo
                         completed.set(true);
                     }
                 }, null);
-        Thread.sleep((baseClientConf.getReadTimeout()*2)*1000);
+        Thread.sleep((baseClientConf.getReadTimeout()*3)*1000);
         Assert.assertTrue("Write request did not finish", completed.get());
 
         Set<InetSocketAddress> afterSet = new HashSet<InetSocketAddress>();

Modified: zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java Thu Jun  7 06:37:46 2012
@@ -22,6 +22,8 @@
 package org.apache.bookkeeper.test;
 
 import java.io.File;
+import java.io.IOException;
+
 import java.net.InetSocketAddress;
 
 import org.apache.commons.io.FileUtils;
@@ -115,6 +117,31 @@ public class ZooKeeperUtil {
         zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 
+    public void sleepServer(final int seconds, final CountDownLatch l)
+            throws InterruptedException, IOException {
+        Thread[] allthreads = new Thread[Thread.activeCount()];
+        Thread.enumerate(allthreads);
+        for (final Thread t : allthreads) {
+            if (t.getName().contains("SyncThread:0")) {
+                Thread sleeper = new Thread() {
+                    public void run() {
+                        try {
+                            t.suspend();
+                            l.countDown();
+                            Thread.sleep(seconds*1000);
+                            t.resume();
+                        } catch (Exception e) {
+                            LOG.error("Error suspending thread", e);
+                        }
+                    }
+                };
+                sleeper.start();
+                return;
+            }
+        }
+        throw new IOException("ZooKeeper thread not found");
+    }
+
     public void killServer() throws Exception {
         if (zkc != null) {
             zkc.close();

Modified: zookeeper/bookkeeper/tags/release-4.1.0/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java Thu Jun  7 06:37:46 2012
@@ -31,6 +31,7 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
+import org.apache.hedwig.server.persistence.LocalDBPersistenceManager;
 import org.apache.hedwig.util.ConcurrencyUtils;
 import org.apache.hedwig.util.Either;
 import org.apache.hedwig.util.Callback;
@@ -50,7 +51,8 @@ public class TestZkSubscriptionManager e
         super.setUp();
         cfg = new ServerConfiguration();
         final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-        sm = new ZkSubscriptionManager(zk, new TrivialOwnAllTopicManager(cfg, scheduler), null, cfg, scheduler);
+        sm = new ZkSubscriptionManager(zk, new TrivialOwnAllTopicManager(cfg, scheduler),
+                                       LocalDBPersistenceManager.instance(), cfg, scheduler);
         msgIdCallback = new Callback<MessageSeqId>() {
             @Override
             public void operationFailed(Object ctx, final PubSubException exception) {

Modified: zookeeper/bookkeeper/tags/release-4.1.0/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/tags/release-4.1.0/pom.xml?rev=1347417&r1=1347416&r2=1347417&view=diff
==============================================================================
--- zookeeper/bookkeeper/tags/release-4.1.0/pom.xml (original)
+++ zookeeper/bookkeeper/tags/release-4.1.0/pom.xml Thu Jun  7 06:37:46 2012
@@ -45,6 +45,11 @@
   <build>
     <plugins>
       <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <version>2.3.2</version>
+      </plugin>
+      <plugin>
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.3.2</version>
         <configuration>
@@ -122,11 +127,6 @@
         <version>2.1</version>
       </plugin>
       <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>findbugs-maven-plugin</artifactId>
-        <version>2.1</version>
-      </plugin>
-      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-pmd-plugin</artifactId>
         <version>2.3</version>