You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2010/01/27 00:16:49 UTC

svn commit: r903483 [5/6] - in /hadoop/zookeeper/trunk: ./ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ src/cont...

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/SafeRunnable.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/SafeRunnable.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/SafeRunnable.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/SafeRunnable.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,38 @@
+package org.apache.bookkeeper.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.log4j.Logger;
+
+public abstract class SafeRunnable implements Runnable{
+
+    static final Logger logger = Logger.getLogger(SafeRunnable.class);
+    
+@Override
+    public void run() {
+        try{
+            safeRun();
+        }catch(Throwable t){
+            logger.fatal("Unexpected throwable caught ", t);
+        }
+    }
+    
+    public abstract void safeRun();
+    
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/StringUtils.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/StringUtils.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/StringUtils.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/StringUtils.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,94 @@
+package org.apache.bookkeeper.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * Provided utilites for parsing network addresses, ledger-id from node paths
+ * etc.
+ * 
+ */
+public class StringUtils {
+
+    /*
+     * Path to ledger metadata. ZooKeeper appends a sequence number to L.
+     */
+    static public final String prefix = "/ledgers/L";
+
+    /**
+     * Parses address into IP and port.
+     * 
+     * @param addr
+     *            String
+     */
+
+    public static InetSocketAddress parseAddr(String s) throws IOException {
+
+        String parts[] = s.split(":");
+        if (parts.length != 2) {
+            throw new IOException(s + " does not have the form host:port");
+        }
+        int port;
+        try {
+            port = Integer.parseInt(parts[1]);
+        } catch (NumberFormatException e) {
+            throw new IOException(s + " does not have the form host:port");
+        }
+
+        InetSocketAddress addr = new InetSocketAddress(parts[0], port);
+        return addr;
+    }
+
+    public static StringBuilder addrToString(StringBuilder sb, InetSocketAddress addr) {
+        return sb.append(addr.getAddress().getHostAddress()).append(":").append(addr.getPort());
+    }
+
+    /**
+     * Formats ledger ID according to ZooKeeper rules
+     * 
+     * @param id
+     *            znode id
+     */
+    public static String getZKStringId(long id) {
+        return String.format("%010d", id);
+    }
+
+    /**
+     * Get the path for the ledger metadata node
+     * 
+     * @return
+     */
+    public static String getLedgerNodePath(long ledgerId) {
+        return prefix + StringUtils.getZKStringId(ledgerId);
+    }
+
+    public static long getLedgerId(String nodeName) throws IOException {
+        long ledgerId;
+        try {
+            String parts[] = nodeName.split(prefix);
+            ledgerId = Long.parseLong(parts[parts.length - 1]);
+        } catch (NumberFormatException e) {
+            throw new IOException(e);
+        }
+        return ledgerId;
+    }
+
+}

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/AsyncLedgerOpsTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/AsyncLedgerOpsTest.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/AsyncLedgerOpsTest.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/AsyncLedgerOpsTest.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
 package org.apache.bookkeeper.test;
+
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,192 +21,158 @@
  * 
  */
 
-
-import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
 import java.util.Random;
 import java.util.Set;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.LedgerSequence;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
-import org.apache.bookkeeper.client.LedgerHandle.QMode;
-import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.streaming.LedgerInputStream;
-import org.apache.bookkeeper.streaming.LedgerOutputStream;
-
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.server.NIOServerCnxn;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.test.ClientBase;
-
+import org.junit.Before;
 import org.junit.Test;
-
-//import BookieReadWriteTest.SyncObj;
-//import BookieReadWriteTest.emptyWatcher;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
- * This test tests read and write, synchronous and 
- * asynchronous, strings and integers for a BookKeeper client. 
- * The test deployment uses a ZooKeeper server 
- * and three BookKeepers. 
+ * This test tests read and write, synchronous and asynchronous, strings and
+ * integers for a BookKeeper client. The test deployment uses a ZooKeeper server
+ * and three BookKeepers.
  * 
  */
-
-public class AsyncLedgerOpsTest 
-    extends junit.framework.TestCase 
-    implements AddCallback, 
-    ReadCallback, 
-    CreateCallback,
-    CloseCallback,
-    OpenCallback{
+public class AsyncLedgerOpsTest extends BaseTestCase implements AddCallback, ReadCallback, CreateCallback,
+        CloseCallback, OpenCallback {
     static Logger LOG = Logger.getLogger(BookieClientTest.class);
 
-    static ConsoleAppender ca = new ConsoleAppender(new PatternLayout());
-
-    // ZooKeeper related variables
-    private static final String HOSTPORT = "127.0.0.1:2181";
-    static Integer ZooKeeperDefaultPort = 2181;
-    ZooKeeperServer zks;
-    ZooKeeper zkc; //zookeeper client
-    NIOServerCnxn.Factory serverFactory;
-    File ZkTmpDir;
+    DigestType digestType;
+    
+    public AsyncLedgerOpsTest(DigestType digestType) {
+        super(3);
+        this.digestType = digestType;
+    }
+    
+    @Parameters
+    public static Collection<Object[]> configs(){
+        return Arrays.asList(new Object[][]{ {DigestType.MAC }, {DigestType.CRC32}});
+    }
     
-    //BookKeeper 
-    File tmpDirB1, tmpDirB2, tmpDirB3;
-    BookieServer bs1, bs2, bs3;
-    Integer initialPort = 5000;
-    BookKeeper bkc; // bookkeeper client
+ 
     byte[] ledgerPassword = "aaa".getBytes();
     LedgerHandle lh, lh2;
     long ledgerId;
-    LedgerSequence ls;
-    
-    //test related variables 
+    Enumeration<LedgerEntry> ls;
+
+    // test related variables
     int numEntriesToWrite = 20;
     int maxInt = 2147483647;
-    Random rng; // Random Number Generator 
+    Random rng; // Random Number Generator
     ArrayList<byte[]> entries; // generated entries
     ArrayList<Integer> entriesSize;
-    
+
     // Synchronization
     SyncObj sync;
     Set<Object> syncObjs;
-    
+
     class SyncObj {
         int counter;
-        boolean value;      
+        boolean value;
+
         public SyncObj() {
             counter = 0;
             value = false;
-        }       
+        }
     }
-    
-    class ControlObj{
+
+    class ControlObj {
         LedgerHandle lh;
-        
-        void setLh(LedgerHandle lh){
+
+        void setLh(LedgerHandle lh) {
             this.lh = lh;
         }
-        
-        LedgerHandle getLh(){
+
+        LedgerHandle getLh() {
             return lh;
         }
     }
-    
+
     @Test
-    public void testAsyncCreateClose() throws IOException{
+    public void testAsyncCreateClose() throws IOException {
         try {
-            // Create a BookKeeper client and a ledger
-            bkc = new BookKeeper("127.0.0.1");
-           
-            ControlObj ctx = new ControlObj();
             
-            synchronized(ctx){
-                bkc.asyncCreateLedger(3, 2, 
-                    QMode.VERIFIABLE, 
-                    ledgerPassword,
-                    this,
-                    ctx);
+            ControlObj ctx = new ControlObj();
+
+            synchronized (ctx) {
+                LOG.info("Going to create ledger asynchronously");
+                bkc.asyncCreateLedger(3, 2, digestType, ledgerPassword, this, ctx);
+
                 ctx.wait();
             }
-            
-            
-            //bkc.initMessageDigest("SHA1");
+
+            // bkc.initMessageDigest("SHA1");
             LedgerHandle lh = ctx.getLh();
             ledgerId = lh.getId();
             LOG.info("Ledger ID: " + lh.getId());
-            for(int i = 0; i < numEntriesToWrite; i++){
+            for (int i = 0; i < numEntriesToWrite; i++) {
                 ByteBuffer entry = ByteBuffer.allocate(4);
                 entry.putInt(rng.nextInt(maxInt));
                 entry.position(0);
-                
+
                 entries.add(entry.array());
                 entriesSize.add(entry.array().length);
                 lh.asyncAddEntry(entry.array(), this, sync);
             }
-            
+
             // wait for all entries to be acknowledged
             synchronized (sync) {
-                if (sync.counter < numEntriesToWrite){
+                while (sync.counter < numEntriesToWrite) {
                     LOG.debug("Entries counter = " + sync.counter);
                     sync.wait();
                 }
             }
-            
+
             LOG.debug("*** WRITE COMPLETE ***");
-            // close ledger 
-            synchronized(ctx){
+            // close ledger
+            synchronized (ctx) {
                 lh.asyncClose(this, ctx);
                 ctx.wait();
             }
-            
-            //*** WRITING PART COMPLETE // READ PART BEGINS ***
-            
+
+            // *** WRITING PART COMPLETE // READ PART BEGINS ***
+
             // open ledger
-            synchronized(ctx){
-                bkc.asyncOpenLedger(ledgerId, ledgerPassword, this, ctx);
+            synchronized (ctx) {
+                bkc.asyncOpenLedger(ledgerId, digestType, ledgerPassword, this, ctx);
                 ctx.wait();
             }
             lh = ctx.getLh();
-            
-            LOG.debug("Number of entries written: " + lh.getLast());
-            assertTrue("Verifying number of entries written", lh.getLast() == numEntriesToWrite);       
-            
-            //read entries
-            lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
-            
+
+            LOG.debug("Number of entries written: " + lh.getLastAddConfirmed());
+            assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() == (numEntriesToWrite - 1));
+
+            // read entries
+            lh.asyncReadEntries(0, numEntriesToWrite - 1, this, sync);
+
             synchronized (sync) {
-                while(sync.value == false){
+                while (sync.value == false) {
                     sync.wait();
-                }               
+                }
             }
-            
-            assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
-            
+
             LOG.debug("*** READ COMPLETE ***");
-            
-            // at this point, LedgerSequence ls is filled with the returned values
+
+            // at this point, LedgerSequence ls is filled with the returned
+            // values
             int i = 0;
-            while(ls.hasMoreElements()){
+            while (ls.hasMoreElements()) {
                 ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
                 Integer origEntry = origbb.getInt();
                 byte[] entry = ls.nextElement().getEntry();
@@ -219,20 +186,17 @@
                 assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
                 i++;
             }
+            assertTrue("Checking number of read entries", i == numEntriesToWrite);
             lh.close();
-        } catch (KeeperException e) {
-            e.printStackTrace();
-        } catch (BKException e) {
-            e.printStackTrace();
         } catch (InterruptedException e) {
-            e.printStackTrace();
-        } //catch (NoSuchAlgorithmException e) {
-        //  e.printStackTrace();
-        //}
-        
+            LOG.error(e);
+            fail("InterruptedException");
+        } // catch (NoSuchAlgorithmException e) {
+        // e.printStackTrace();
+        // }
+
     }
-    
-    
+
     public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
         SyncObj x = (SyncObj) ctx;
         synchronized (x) {
@@ -241,154 +205,52 @@
         }
     }
 
-    public void readComplete(int rc, LedgerHandle lh, LedgerSequence seq,
-            Object ctx) {
-        ls = seq;               
+    public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+        ls = seq;
         synchronized (sync) {
             sync.value = true;
             sync.notify();
         }
-        
+
     }
-    
-    public void createComplete(int rc, LedgerHandle lh, Object ctx){
-        synchronized(ctx){
+
+    public void createComplete(int rc, LedgerHandle lh, Object ctx) {
+        synchronized (ctx) {
             ControlObj cobj = (ControlObj) ctx;
             cobj.setLh(lh);
             cobj.notify();
-        }   
+        }
     }
-    
-    public void openComplete(int rc, LedgerHandle lh, Object ctx){
-        synchronized(ctx){
+
+    public void openComplete(int rc, LedgerHandle lh, Object ctx) {
+        synchronized (ctx) {
             ControlObj cobj = (ControlObj) ctx;
             cobj.setLh(lh);
             cobj.notify();
-        }   
+        }
     }
-    
-    public void closeComplete(int rc, LedgerHandle lh, Object ctx){
-        synchronized(ctx){
+
+    public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
+        synchronized (ctx) {
             ControlObj cobj = (ControlObj) ctx;
             cobj.notify();
         }
     }
-     
-    protected void setUp() throws IOException {
-        LOG.addAppender(ca);
-        LOG.setLevel((Level) Level.DEBUG);
-        
-        // create a ZooKeeper server(dataDir, dataLogDir, port)
-        LOG.debug("Running ZK server");
-        //ServerStats.registerAsConcrete();
-        ClientBase.setupTestEnv();
-        ZkTmpDir = File.createTempFile("zookeeper", "test");
-        ZkTmpDir.delete();
-        ZkTmpDir.mkdir();
-            
-        try {
-            zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
-            serverFactory =  new NIOServerCnxn.Factory(ZooKeeperDefaultPort);
-            serverFactory.startup(zks);
-        } catch (IOException e1) {
-            // TODO Auto-generated catch block
-            e1.printStackTrace();
-        } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-        boolean b = ClientBase.waitForServerUp(HOSTPORT, ClientBase.CONNECTION_TIMEOUT);
-        
-        LOG.debug("Server up: " + b);
-        
-        // create a zookeeper client
-        LOG.debug("Instantiate ZK Client");
-        zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher());
-        
-        //initialize the zk client with values
-        try {
-            zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 1), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 2), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        } catch (KeeperException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-        
-        // Create Bookie Servers (B1, B2, B3)
-        tmpDirB1 = File.createTempFile("bookie1", "test");
-        tmpDirB1.delete();
-        tmpDirB1.mkdir();
-         
-        bs1 = new BookieServer(initialPort, tmpDirB1, new File[]{tmpDirB1});
-        bs1.start();
-        
-        tmpDirB2 = File.createTempFile("bookie2", "test");
-        tmpDirB2.delete();
-        tmpDirB2.mkdir();
-            
-        bs2 = new BookieServer(initialPort + 1, tmpDirB2, new File[]{tmpDirB2});
-        bs2.start();
 
-        tmpDirB3 = File.createTempFile("bookie3", "test");
-        tmpDirB3.delete();
-        tmpDirB3.mkdir();
-        
-        bs3 = new BookieServer(initialPort + 2, tmpDirB3, new File[]{tmpDirB3});
-        bs3.start();
-        
-        rng = new Random(System.currentTimeMillis());   // Initialize the Random Number Generator 
-        entries = new ArrayList<byte[]>(); // initialize the  entries list
-        entriesSize = new ArrayList<Integer>(); 
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        rng = new Random(System.currentTimeMillis()); // Initialize the Random
+                                                      // Number Generator
+        entries = new ArrayList<byte[]>(); // initialize the entries list
+        entriesSize = new ArrayList<Integer>();
         sync = new SyncObj(); // initialize the synchronization data structure
     }
+
     
-    protected void tearDown(){
-        LOG.info("TearDown");
 
-        //shutdown bookie servers 
-        try {
-            bs1.shutdown();
-            bs2.shutdown();
-            bs3.shutdown();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-        cleanUpDir(tmpDirB1);
-        cleanUpDir(tmpDirB2);
-        cleanUpDir(tmpDirB3);
-        
-        //shutdown ZK server
-        serverFactory.shutdown();
-        assertTrue("waiting for server down",
-                ClientBase.waitForServerDown(HOSTPORT,
-                                             ClientBase.CONNECTION_TIMEOUT));
-        //ServerStats.unregister();
-        cleanUpDir(ZkTmpDir);
-        
-    }
-
-    /*  Clean up a directory recursively */
-    protected boolean cleanUpDir(File dir){
-        if (dir.isDirectory()) {
-            LOG.info("Cleaning up " + dir.getName());
-            String[] children = dir.list();
-            for (String string : children) {
-                boolean success = cleanUpDir(new File(dir, string));
-                if (!success) return false;
-            }
-        }
-        // The directory is now empty so delete it
-        return dir.delete();        
-    }
 
-    /*  User for testing purposes, void */
-    class emptyWatcher implements Watcher{
-        public void process(WatchedEvent event) {}
-    }
+
 }
\ No newline at end of file

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BaseTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BaseTestCase.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BaseTestCase.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BaseTestCase.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import junit.framework.TestCase;
+
+@RunWith(Parameterized.class)
+public abstract class BaseTestCase extends TestCase {
+    static final Logger LOG = Logger.getLogger(BaseTestCase.class);
+    // ZooKeeper related variables
+    private static final String HOSTPORT = "127.0.0.1:2181";
+    static Integer ZooKeeperDefaultPort = 2181;
+    ZooKeeperServer zks;
+    ZooKeeper zkc; // zookeeper client
+    NIOServerCnxn.Factory serverFactory;
+    File ZkTmpDir;
+
+    // BookKeeper
+    List<File> tmpDirs = new ArrayList<File>();
+    List<BookieServer> bs = new ArrayList<BookieServer>();
+    Integer initialPort = 5000;
+    int numBookies;
+    BookKeeper bkc;
+
+    public BaseTestCase(int numBookies) {
+        this.numBookies = numBookies;
+    }
+    
+    @Parameters
+    public static Collection<Object[]> configs(){
+        return Arrays.asList(new Object[][]{ {DigestType.MAC }, {DigestType.CRC32}});
+    }
+
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        try {
+        // create a ZooKeeper server(dataDir, dataLogDir, port)
+        LOG.debug("Running ZK server");
+        // ServerStats.registerAsConcrete();
+        ClientBase.setupTestEnv();
+        ZkTmpDir = File.createTempFile("zookeeper", "test");
+        ZkTmpDir.delete();
+        ZkTmpDir.mkdir();
+
+        zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
+        serverFactory = new NIOServerCnxn.Factory(ZooKeeperDefaultPort);
+        serverFactory.startup(zks);
+
+        boolean b = ClientBase.waitForServerUp(HOSTPORT, ClientBase.CONNECTION_TIMEOUT);
+
+        LOG.debug("Server up: " + b);
+
+        // create a zookeeper client
+        LOG.debug("Instantiate ZK Client");
+        zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher());
+
+        // initialize the zk client with values
+        zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        for (int i = 0; i < numBookies; i++) {
+            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + i), new byte[0],
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+
+        // Create Bookie Servers (B1, B2, B3)
+        for (int i = 0; i < numBookies; i++) {
+            File f = File.createTempFile("bookie", "test");
+            tmpDirs.add(f);
+            f.delete();
+            f.mkdir();
+
+            BookieServer server = new BookieServer(initialPort + i, f, new File[] { f });
+            server.start();
+            bs.add(server);
+        }
+        zkc.close();
+        bkc = new BookKeeper("127.0.0.1");
+        } catch(Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        LOG.info("TearDown");
+
+        if (bkc != null) {
+            bkc.halt();;
+        }
+        
+        for (BookieServer server : bs) {
+            server.shutdown();
+        }
+
+        for (File f : tmpDirs) {
+            cleanUpDir(f);
+        }
+
+        // shutdown ZK server
+        if (serverFactory != null) {
+            serverFactory.shutdown();
+            assertTrue("waiting for server down", ClientBase.waitForServerDown(HOSTPORT, ClientBase.CONNECTION_TIMEOUT));
+        }
+        // ServerStats.unregister();
+        cleanUpDir(ZkTmpDir);
+        
+
+    }
+
+    /* Clean up a directory recursively */
+    protected boolean cleanUpDir(File dir) {
+        if (dir.isDirectory()) {
+            LOG.info("Cleaning up " + dir.getName());
+            String[] children = dir.list();
+            for (String string : children) {
+                boolean success = cleanUpDir(new File(dir, string));
+                if (!success)
+                    return false;
+            }
+        }
+        // The directory is now empty so delete it
+        return dir.delete();
+    }
+
+    /* User for testing purposes, void */
+    class emptyWatcher implements Watcher {
+        public void process(WatchedEvent event) {
+        }
+    }
+
+}

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
 package org.apache.bookkeeper.test;
+
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,20 +21,25 @@
  * 
  */
 
-
 import java.io.File;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.concurrent.Executors;
 
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.junit.Test;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.proto.BookieClient;
-import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.proto.ReadEntryCallback;
-import org.apache.bookkeeper.proto.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.log4j.Logger;
 
-
 import junit.framework.TestCase;
 
 public class BookieClientTest extends TestCase {
@@ -41,57 +47,64 @@
     BookieServer bs;
     File tmpDir;
     int port = 13645;
+    ClientSocketChannelFactory channelFactory;
+    OrderedSafeExecutor executor;
+
+    @Override
     protected void setUp() throws Exception {
         tmpDir = File.createTempFile("bookie", "test");
         tmpDir.delete();
         tmpDir.mkdir();
         bs = new BookieServer(port, tmpDir, new File[] { tmpDir });
         bs.start();
+        channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors
+                .newCachedThreadPool());
+        executor = new OrderedSafeExecutor(2);
     }
+
+    @Override
     protected void tearDown() throws Exception {
         bs.shutdown();
         recursiveDelete(tmpDir);
+        channelFactory.releaseExternalResources();
+        executor.shutdown();
     }
+
     private static void recursiveDelete(File dir) {
         File children[] = dir.listFiles();
         if (children != null) {
-            for(File child: children) {
+            for (File child : children) {
                 recursiveDelete(child);
             }
         }
         dir.delete();
     }
-    
+
     static class ResultStruct {
         int rc;
         ByteBuffer entry;
     }
+
     ReadEntryCallback recb = new ReadEntryCallback() {
 
-        public void readEntryComplete(int rc, long ledgerId, long entryId,
-                ByteBuffer bb, Object ctx) {
-            ResultStruct rs = (ResultStruct)ctx;
-            synchronized(rs) {
-                LOG.info("Capacity " + bb.capacity() + ", " + bb.position());
+        public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer bb, Object ctx) {
+            ResultStruct rs = (ResultStruct) ctx;
+            synchronized (rs) {
                 rs.rc = rc;
-                bb.position(bb.position()+16);
-                //if (bb.remaining() >=4) {
-                //    // Skip the len
-                //    bb.position(bb.position()+4);
-                //}
-                rs.entry = bb.slice();
-                LOG.info("Received " + bb.remaining());
-                rs.notifyAll();
+                if (bb != null) {
+                    bb.readerIndex(16);
+                    rs.entry = bb.toByteBuffer();
+                    rs.notifyAll();
+                }
             }
         }
-        
+
     };
 
     WriteCallback wrcb = new WriteCallback() {
-        public void writeComplete(int rc, long ledgerId, long entryId,
-                Object ctx) {
+        public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
             if (ctx != null) {
-                synchronized(ctx) {
+                synchronized (ctx) {
                     ctx.notifyAll();
                 }
             }
@@ -103,104 +116,114 @@
         final Object notifyObject = new Object();
         byte[] passwd = new byte[20];
         Arrays.fill(passwd, (byte) 'a');
-        
-        BookieClient bc = new BookieClient("127.0.0.1", port, 50000);
-        ByteBuffer bb;
-        bb = createByteBuffer(1,1,1);
-        bc.addEntry(1, passwd, 1, bb, wrcb, null);
-        bb = createByteBuffer(2,1,2);
-        bc.addEntry(1, passwd, 2, bb, wrcb, null);
-        bb = createByteBuffer(3,1,3);
-        bc.addEntry(1, passwd, 3, bb, wrcb, null);
-        bb = createByteBuffer(5,1,5);
-        bc.addEntry(1, passwd, 5, bb, wrcb, null);
-        bb = createByteBuffer(7,1,7);
-        bc.addEntry(1, passwd, 7, bb, wrcb, null);
-        synchronized(notifyObject) {
-            bb = createByteBuffer(11,1,11);
-            bc.addEntry(1, passwd, 11, bb, wrcb, notifyObject);
+        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", port);
+        ResultStruct arc = new ResultStruct();
+
+        BookieClient bc = new BookieClient(channelFactory, executor);
+        ChannelBuffer bb;
+        bb = createByteBuffer(1, 1, 1);
+        bc.addEntry(addr, 1, passwd, 1, bb, wrcb, null);
+        synchronized (arc) {
+            bc.readEntry(addr, 1, 1, recb, arc);
+            arc.wait(1000);
+            assertEquals(0, arc.rc);
+            assertEquals(1, arc.entry.getInt());
+        }
+        bb = createByteBuffer(2, 1, 2);
+        bc.addEntry(addr, 1, passwd, 2, bb, wrcb, null);
+        bb = createByteBuffer(3, 1, 3);
+        bc.addEntry(addr, 1, passwd, 3, bb, wrcb, null);
+        bb = createByteBuffer(5, 1, 5);
+        bc.addEntry(addr, 1, passwd, 5, bb, wrcb, null);
+        bb = createByteBuffer(7, 1, 7);
+        bc.addEntry(addr, 1, passwd, 7, bb, wrcb, null);
+        synchronized (notifyObject) {
+            bb = createByteBuffer(11, 1, 11);
+            bc.addEntry(addr, 1, passwd, 11, bb, wrcb, notifyObject);
             notifyObject.wait();
         }
-        ResultStruct arc = new ResultStruct();
-        synchronized(arc) {
-            bc.readEntry(1, 6, recb, arc);
+        synchronized (arc) {
+            bc.readEntry(addr, 1, 6, recb, arc);
             arc.wait(1000);
-            assertEquals(BookieProtocol.ENOENTRY, arc.rc);
+            assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
         }
-        synchronized(arc) {
-            bc.readEntry(1, 7, recb, arc);
+        synchronized (arc) {
+            bc.readEntry(addr, 1, 7, recb, arc);
             arc.wait(1000);
             assertEquals(0, arc.rc);
             assertEquals(7, arc.entry.getInt());
         }
-        synchronized(arc) {
-            bc.readEntry(1, 1, recb, arc);
+        synchronized (arc) {
+            bc.readEntry(addr, 1, 1, recb, arc);
             arc.wait(1000);
             assertEquals(0, arc.rc);
             assertEquals(1, arc.entry.getInt());
         }
-        synchronized(arc) {
-            bc.readEntry(1, 2, recb, arc);
+        synchronized (arc) {
+            bc.readEntry(addr, 1, 2, recb, arc);
             arc.wait(1000);
             assertEquals(0, arc.rc);
             assertEquals(2, arc.entry.getInt());
         }
-        synchronized(arc) {
-            bc.readEntry(1, 3, recb, arc);
+        synchronized (arc) {
+            bc.readEntry(addr, 1, 3, recb, arc);
             arc.wait(1000);
             assertEquals(0, arc.rc);
             assertEquals(3, arc.entry.getInt());
         }
-        synchronized(arc) {
-            bc.readEntry(1, 4, recb, arc);
+        synchronized (arc) {
+            bc.readEntry(addr, 1, 4, recb, arc);
             arc.wait(1000);
-            assertEquals(BookieProtocol.ENOENTRY, arc.rc);
+            assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
         }
-        synchronized(arc) {
-            bc.readEntry(1, 11, recb, arc);
+        synchronized (arc) {
+            bc.readEntry(addr, 1, 11, recb, arc);
             arc.wait(1000);
             assertEquals(0, arc.rc);
             assertEquals(11, arc.entry.getInt());
         }
-        synchronized(arc) {
-            bc.readEntry(1, 5, recb, arc);
+        synchronized (arc) {
+            bc.readEntry(addr, 1, 5, recb, arc);
             arc.wait(1000);
             assertEquals(0, arc.rc);
             assertEquals(5, arc.entry.getInt());
         }
-        synchronized(arc) {
-            bc.readEntry(1, 10, recb, arc);
+        synchronized (arc) {
+            bc.readEntry(addr, 1, 10, recb, arc);
             arc.wait(1000);
-            assertEquals(BookieProtocol.ENOENTRY, arc.rc);
+            assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
         }
-        synchronized(arc) {
-            bc.readEntry(1, 12, recb, arc);
+        synchronized (arc) {
+            bc.readEntry(addr, 1, 12, recb, arc);
             arc.wait(1000);
-            assertEquals(BookieProtocol.ENOENTRY, arc.rc);
+            assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
         }
-        synchronized(arc) {
-            bc.readEntry(1, 13, recb, arc);
+        synchronized (arc) {
+            bc.readEntry(addr, 1, 13, recb, arc);
             arc.wait(1000);
-            assertEquals(BookieProtocol.ENOENTRY, arc.rc);
+            assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
         }
     }
-    private ByteBuffer createByteBuffer(int i, long lid, long eid) {
+
+    private ChannelBuffer createByteBuffer(int i, long lid, long eid) {
         ByteBuffer bb;
-        bb = ByteBuffer.allocate(4+16);
-        bb.putInt(i);
+        bb = ByteBuffer.allocate(4 + 16);
         bb.putLong(lid);
         bb.putLong(eid);
+        bb.putInt(i);
         bb.flip();
-        return bb;
+        return ChannelBuffers.wrappedBuffer(bb);
     }
+
     @Test
     public void testNoLedger() throws Exception {
         ResultStruct arc = new ResultStruct();
-        BookieClient bc = new BookieClient("127.0.0.1", port, 50000);
-        synchronized(arc) {
-            bc.readEntry(2, 13, recb, arc);
+        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", port);
+        BookieClient bc = new BookieClient(channelFactory, executor);
+        synchronized (arc) {
+            bc.readEntry(addr, 2, 13, recb, arc);
             arc.wait(1000);
-            assertEquals(BookieProtocol.ENOLEDGER, arc.rc);
+            assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
         }
     }
 }

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieFailureTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieFailureTest.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieFailureTest.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieFailureTest.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
 package org.apache.bookkeeper.test;
+
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,190 +21,200 @@
  * 
  */
 
-
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Enumeration;
 import java.util.Random;
 import java.util.Set;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.LedgerHandle.QMode;
-import org.apache.bookkeeper.client.LedgerSequence;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.streaming.LedgerInputStream;
-import org.apache.bookkeeper.streaming.LedgerOutputStream;
-
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.server.NIOServerCnxn;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.test.ClientBase;
-
+import org.junit.Before;
 import org.junit.Test;
 
 /**
- * This test tests read and write, synchronous and 
- * asynchronous, strings and integers for a BookKeeper client. 
- * The test deployment uses a ZooKeeper server 
- * and three BookKeepers. 
+ * This test tests read and write, synchronous and asynchronous, strings and
+ * integers for a BookKeeper client. The test deployment uses a ZooKeeper server
+ * and three BookKeepers.
  * 
  */
 
-public class BookieFailureTest 
-    extends junit.framework.TestCase 
-    implements AddCallback, ReadCallback{
-    
+public class BookieFailureTest extends BaseTestCase implements AddCallback, ReadCallback {
 
-    //Depending on the taste, select the amount of logging
+    // Depending on the taste, select the amount of logging
     // by decommenting one of the two lines below
-    //static Logger LOG = Logger.getRootLogger();
-    static Logger LOG = Logger.getLogger(BookieReadWriteTest.class);
-
-    static ConsoleAppender ca = new ConsoleAppender(new PatternLayout());
+    // static Logger LOG = Logger.getRootLogger();
+    static Logger LOG = Logger.getLogger(BookieFailureTest.class);
 
-    // ZooKeeper related variables
-    private static final String HOSTPORT = "127.0.0.1:2181";
-    static Integer ZooKeeperDefaultPort = 2181;
-    ZooKeeperServer zks;
-    ZooKeeper zkc; //zookeeper client
-    NIOServerCnxn.Factory serverFactory;
-    File ZkTmpDir;
-    
-    //BookKeeper 
-    File tmpDirB1, tmpDirB2, tmpDirB3, tmpDirB4;
-    BookieServer bs1, bs2, bs3, bs4;
-    Integer initialPort = 5000;
-    BookKeeper bkc; // bookkeeper client
     byte[] ledgerPassword = "aaa".getBytes();
     LedgerHandle lh, lh2;
     long ledgerId;
-    LedgerSequence ls;
-    
-    //test related variables 
-    int numEntriesToWrite = 20000;
+    Enumeration<LedgerEntry> ls;
+
+    // test related variables
+    int numEntriesToWrite = 200;
     int maxInt = 2147483647;
-    Random rng; // Random Number Generator 
+    Random rng; // Random Number Generator
     ArrayList<byte[]> entries; // generated entries
     ArrayList<Integer> entriesSize;
+    DigestType digestType;
     
     // Synchronization
     SyncObj sync;
     Set<Object> syncObjs;
-    
+
     class SyncObj {
         int counter;
-        boolean value;      
+        boolean value;
+
         public SyncObj() {
             counter = 0;
             value = false;
-        }       
+        }
+    }
+
+    public BookieFailureTest(DigestType digestType) {
+        super(4);
+        this.digestType = digestType;        
     }
     
-   /**
-    * Tests writes and reads when a bookie fails.
-    *  
-    * @throws {@link IOException}
-    */
+    /**
+     * Tests writes and reads when a bookie fails.
+     * 
+     * @throws {@link IOException}
+     */
     @Test
-    public void testAsyncBK1() throws IOException{ 
+    public void testAsyncBK1() throws IOException {
         LOG.info("#### BK1 ####");
-        auxTestReadWriteAsyncSingleClient(bs1);
+        auxTestReadWriteAsyncSingleClient(bs.get(0));
+    }
+    
+    @Test
+    public void testAsyncBK2() throws IOException {
+        LOG.info("#### BK2 ####");
+        auxTestReadWriteAsyncSingleClient(bs.get(1));
+    }
+
+    @Test
+    public void testAsyncBK3() throws IOException {
+        LOG.info("#### BK3 ####");
+        auxTestReadWriteAsyncSingleClient(bs.get(2));
+    }
+
+    @Test
+    public void testAsyncBK4() throws IOException {
+        LOG.info("#### BK4 ####");
+        auxTestReadWriteAsyncSingleClient(bs.get(3));
     }
-   
-   @Test
-   public void testAsyncBK2() throws IOException{    
-       LOG.info("#### BK2 ####");
-       auxTestReadWriteAsyncSingleClient(bs2);
-   }
-   
-   @Test
-   public void testAsyncBK3() throws IOException{    
-       LOG.info("#### BK3 ####"); 
-       auxTestReadWriteAsyncSingleClient(bs3);
-   }
-   
-   @Test
-   public void testAsyncBK4() throws IOException{
-       LOG.info("#### BK4 ####");
-        auxTestReadWriteAsyncSingleClient(bs4);
-   }
     
-    void auxTestReadWriteAsyncSingleClient(BookieServer bs) throws IOException{
+    @Test
+    public void testBookieRecovery() throws Exception{
+        bkc = new BookKeeper("127.0.0.1");
+        
+        //Shutdown all but 1 bookie
+        bs.get(0).shutdown();
+        bs.get(1).shutdown();
+        bs.get(2).shutdown();
+        
+        byte[] passwd = "blah".getBytes();
+        LedgerHandle lh = bkc.createLedger(1, 1,digestType, passwd);
+        
+        int numEntries = 100;
+        for (int i=0; i< numEntries; i++){
+            byte[] data = (""+i).getBytes();
+            lh.addEntry(data);
+        }
+        
+        bs.get(3).shutdown();
+        BookieServer server = new BookieServer(initialPort + 3, tmpDirs.get(3), new File[] { tmpDirs.get(3)});
+        server.start();
+        bs.set(3, server);
+
+        assertEquals(numEntries - 1 , lh.getLastAddConfirmed());
+        Enumeration<LedgerEntry> entries = lh.readEntries(0, lh.getLastAddConfirmed());
+        
+        int numScanned = 0;
+        while (entries.hasMoreElements()){
+            assertEquals((""+numScanned), new String(entries.nextElement().getEntry()));
+            numScanned++;
+        }
+        assertEquals(numEntries, numScanned);
+        
+        
+    }
+
+    void auxTestReadWriteAsyncSingleClient(BookieServer bs) throws IOException {
         try {
             // Create a BookKeeper client and a ledger
-            bkc = new BookKeeper("127.0.0.1");
-            lh = bkc.createLedger(4, 2, QMode.VERIFIABLE, ledgerPassword);
-        
+            lh = bkc.createLedger(3, 2, digestType, ledgerPassword);
+
             ledgerId = lh.getId();
             LOG.info("Ledger ID: " + lh.getId());
-            for(int i = 0; i < numEntriesToWrite; i++){
+            for (int i = 0; i < numEntriesToWrite; i++) {
                 ByteBuffer entry = ByteBuffer.allocate(4);
                 entry.putInt(rng.nextInt(maxInt));
                 entry.position(0);
-                
+
                 entries.add(entry.array());
                 entriesSize.add(entry.array().length);
                 lh.asyncAddEntry(entry.array(), this, sync);
-                if(i == 5000){
-                    //Bookie fail
-                    bs.shutdown();
-                }
+                
             }
             
+            LOG.info("Wrote " + numEntriesToWrite + " and now going to fail bookie.");
+            // Bookie fail
+            bs.shutdown();
+
             // wait for all entries to be acknowledged
             synchronized (sync) {
-                while (sync.counter < numEntriesToWrite){
+                while (sync.counter < numEntriesToWrite) {
                     LOG.debug("Entries counter = " + sync.counter);
                     sync.wait();
                 }
             }
-            
+
             LOG.debug("*** WRITE COMPLETE ***");
-            // close ledger 
+            // close ledger
             lh.close();
-            
-            //*** WRITING PART COMPLETE // READ PART BEGINS ***
-            
+
+            // *** WRITING PART COMPLETE // READ PART BEGINS ***
+
             // open ledger
+            bkc.halt();
             bkc = new BookKeeper("127.0.0.1");
-            lh = bkc.openLedger(ledgerId, ledgerPassword);
-            LOG.debug("Number of entries written: " + lh.getLast());
-            assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));     
-            
-            //read entries
-            
-            lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
-            
+            lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+            LOG.debug("Number of entries written: " + (lh.getLastAddConfirmed() + 1));
+            assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() == (numEntriesToWrite - 1));
+
+            // read entries
+
+            lh.asyncReadEntries(0, numEntriesToWrite - 1, this, sync);
+
             synchronized (sync) {
-                while(sync.value == false){
+                while (sync.value == false) {
                     sync.wait(10000);
                     assertTrue("Haven't received entries", sync.value);
-                }               
+                }
             }
-            
-            assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
-            
+
             LOG.debug("*** READ COMPLETE ***");
-            
-            // at this point, LedgerSequence ls is filled with the returned values
+
+            // at this point, LedgerSequence ls is filled with the returned
+            // values
             int i = 0;
-            LOG.info("Size of ledger sequence: " + ls.size());
-            while(ls.hasMoreElements()){
+            while (ls.hasMoreElements()) {
                 ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
                 Integer origEntry = origbb.getInt();
                 byte[] entry = ls.nextElement().getEntry();
@@ -215,24 +226,26 @@
                 assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
                 i++;
             }
-            
+
+            assertTrue("Checking number of read entries", i == numEntriesToWrite);
+
             LOG.info("Verified that entries are ok, and now closing ledger");
             lh.close();
         } catch (KeeperException e) {
+            LOG.error("Caught KeeperException", e);
             fail(e.toString());
         } catch (BKException e) {
+            LOG.error("Caught BKException", e);
             fail(e.toString());
         } catch (InterruptedException e) {
+            LOG.error("Caught InterruptedException", e);
             fail(e.toString());
-        } 
-        
+        }
+
     }
-    
-    public void addComplete(int rc, 
-            LedgerHandle lh, 
-            long entryId, 
-            Object ctx) {
-        if(rc != 0)
+
+    public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+        if (rc != 0)
             fail("Failed to write entry: " + entryId);
         SyncObj x = (SyncObj) ctx;
         synchronized (x) {
@@ -241,146 +254,52 @@
         }
     }
 
-    public void readComplete(int rc, 
-            LedgerHandle lh, 
-            LedgerSequence seq,
-            Object ctx) {
-        if(rc != 0)
+    public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+        if (rc != 0)
             fail("Failed to write entry");
-        ls = seq;               
+        ls = seq;
         synchronized (sync) {
             sync.value = true;
             sync.notify();
         }
-        
+
     }
-    
-    protected void setUp() throws IOException, InterruptedException {
-        LOG.addAppender(ca);
-        LOG.setLevel((Level) Level.DEBUG);
-        
-        // create a ZooKeeper server(dataDir, dataLogDir, port)
-        LOG.debug("Running ZK server (setup)");
-        //ServerStats.registerAsConcrete();
-        ClientBase.setupTestEnv();
-        ZkTmpDir = File.createTempFile("zookeeper", "test");
-        ZkTmpDir.delete();
-        ZkTmpDir.mkdir();
-            
-        try {
-            zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
-            serverFactory =  new NIOServerCnxn.Factory(ZooKeeperDefaultPort);
-            serverFactory.startup(zks);
-        } catch (IOException e1) {
-            // TODO Auto-generated catch block
-            e1.printStackTrace();
-        } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-        boolean b = ClientBase.waitForServerUp(HOSTPORT, ClientBase.CONNECTION_TIMEOUT);
-        
-        LOG.debug("Server up: " + b);
-        
-        // create a zookeeper client
-        LOG.debug("Instantiate ZK Client");
-        zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher());
-        
-        //initialize the zk client with values
-        try {
-            zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 1), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 2), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 3), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        } catch (KeeperException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-        
-        // Create Bookie Servers (B1, B2, B3)
-        tmpDirB1 = File.createTempFile("bookie1", "test");
-        tmpDirB1.delete();
-        tmpDirB1.mkdir();
-         
-        bs1 = new BookieServer(initialPort, tmpDirB1, new File[]{tmpDirB1});
-        bs1.start();
-        
-        tmpDirB2 = File.createTempFile("bookie2", "test");
-        tmpDirB2.delete();
-        tmpDirB2.mkdir();
-            
-        bs2 = new BookieServer(initialPort + 1, tmpDirB2, new File[]{tmpDirB2});
-        bs2.start();
 
-        tmpDirB3 = File.createTempFile("bookie3", "test");
-        tmpDirB3.delete();
-        tmpDirB3.mkdir();
-        
-        bs3 = new BookieServer(initialPort + 2, tmpDirB3, new File[]{tmpDirB3});
-        bs3.start();
-        
-        tmpDirB4 = File.createTempFile("bookie4", "test");
-        tmpDirB4.delete();
-        tmpDirB4.mkdir();
-        
-        bs4 = new BookieServer(initialPort + 3, tmpDirB4, new File[]{tmpDirB4});
-        bs4.start();
-        
-        rng = new Random(System.currentTimeMillis());   // Initialize the Random Number Generator 
-        entries = new ArrayList<byte[]>(); // initialize the  entries list
-        entriesSize = new ArrayList<Integer>(); 
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+
+        rng = new Random(System.currentTimeMillis()); // Initialize the Random
+                                                      // Number Generator
+        entries = new ArrayList<byte[]>(); // initialize the entries list
+        entriesSize = new ArrayList<Integer>();
         sync = new SyncObj(); // initialize the synchronization data structure
-        
+
         zkc.close();
     }
-    
-    protected void tearDown() throws InterruptedException {
-        LOG.info("TearDown");
-        bkc.halt();
-        
-        //shutdown bookie servers 
-        if(!bs1.isDown()) bs1.shutdown();
-        if(!bs2.isDown()) bs2.shutdown();
-        if(!bs3.isDown()) bs3.shutdown();
-        if(!bs4.isDown()) bs4.shutdown();
-             
-        cleanUpDir(tmpDirB1);
-        cleanUpDir(tmpDirB2);
-        cleanUpDir(tmpDirB3);
-        cleanUpDir(tmpDirB4);
-        //shutdown ZK server
-        serverFactory.shutdown();
-        assertTrue("waiting for server down",
-                ClientBase.waitForServerDown(HOSTPORT,
-                                             ClientBase.CONNECTION_TIMEOUT));
-        //ServerStats.unregister();
-        cleanUpDir(ZkTmpDir);
-        
-    }
 
-    /*  Clean up a directory recursively */
-    protected boolean cleanUpDir(File dir){
+
+    /* Clean up a directory recursively */
+    @Override
+    protected boolean cleanUpDir(File dir) {
         if (dir.isDirectory()) {
             LOG.info("Cleaning up " + dir.getName());
             String[] children = dir.list();
             for (String string : children) {
                 boolean success = cleanUpDir(new File(dir, string));
-                if (!success) return false;
+                if (!success)
+                    return false;
             }
         }
         // The directory is now empty so delete it
-        return dir.delete();        
+        return dir.delete();
     }
 
-    /*  User for testing purposes, void */
-    class emptyWatcher implements Watcher{
-        public void process(WatchedEvent event) {}
+    /* User for testing purposes, void */
+    class emptyWatcher implements Watcher {
+        public void process(WatchedEvent event) {
+        }
     }
-    
 
 }
\ No newline at end of file

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
 package org.apache.bookkeeper.test;
+
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,157 +21,132 @@
  * 
  */
 
-
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Enumeration;
 import java.util.Random;
 import java.util.Set;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.LedgerSequence;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
-import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.streaming.LedgerInputStream;
 import org.apache.bookkeeper.streaming.LedgerOutputStream;
-
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.server.NIOServerCnxn;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.test.ClientBase;
-
+import org.junit.Before;
 import org.junit.Test;
 
 /**
- * This test tests read and write, synchronous and 
- * asynchronous, strings and integers for a BookKeeper client. 
- * The test deployment uses a ZooKeeper server 
- * and three BookKeepers. 
+ * This test tests read and write, synchronous and asynchronous, strings and
+ * integers for a BookKeeper client. The test deployment uses a ZooKeeper server
+ * and three BookKeepers.
  * 
  */
 
-public class BookieReadWriteTest 
-    extends junit.framework.TestCase 
-    implements AddCallback, ReadCallback{
+public class BookieReadWriteTest extends BaseTestCase implements AddCallback, ReadCallback {
 
-    //Depending on the taste, select the amount of logging
+    // Depending on the taste, select the amount of logging
     // by decommenting one of the two lines below
-    //static Logger LOG = Logger.getRootLogger();
+    // static Logger LOG = Logger.getRootLogger();
     static Logger LOG = Logger.getLogger(BookieReadWriteTest.class);
 
-    static ConsoleAppender ca = new ConsoleAppender(new PatternLayout());
-
-    // ZooKeeper related variables
-    private static final String HOSTPORT = "127.0.0.1:2181";
-    static Integer ZooKeeperDefaultPort = 2181;
-    ZooKeeperServer zks;
-    ZooKeeper zkc; //zookeeper client
-    NIOServerCnxn.Factory serverFactory;
-    File ZkTmpDir;
-    
-    //BookKeeper 
-    File tmpDirB1, tmpDirB2, tmpDirB3;
-    BookieServer bs1, bs2, bs3;
-    Integer initialPort = 5000;
-    BookKeeper bkc; // bookkeeper client
     byte[] ledgerPassword = "aaa".getBytes();
     LedgerHandle lh, lh2;
     long ledgerId;
-    LedgerSequence ls;
-    
-    //test related variables 
+    Enumeration<LedgerEntry> ls;
+
+    // test related variables
     int numEntriesToWrite = 200;
     int maxInt = 2147483647;
-    Random rng; // Random Number Generator 
+    Random rng; // Random Number Generator
     ArrayList<byte[]> entries; // generated entries
     ArrayList<Integer> entriesSize;
     
+    DigestType digestType;
+    
+    public BookieReadWriteTest(DigestType digestType){
+        super(3);
+        this.digestType = digestType;
+    }
     // Synchronization
     SyncObj sync;
     Set<Object> syncObjs;
-    
+
     class SyncObj {
         int counter;
-        boolean value;      
+        boolean value;
+
         public SyncObj() {
             counter = 0;
             value = false;
-        }       
+        }
     }
-    
+
     @Test
-    public void testOpenException() 
-    throws KeeperException, IOException, InterruptedException {
-        bkc = new BookKeeper("127.0.0.1");
-        try{
-            lh = bkc.openLedger(0, ledgerPassword);
+    public void testOpenException() throws KeeperException, IOException, InterruptedException {
+        try {
+            lh = bkc.openLedger(0, digestType, ledgerPassword);
             fail("Haven't thrown exception");
         } catch (BKException e) {
             LOG.warn("Successfully thrown and caught exception:", e);
         }
     }
-    
+
     /**
-     * test the streaming api for reading
-     * and writing
+     * test the streaming api for reading and writing
+     * 
      * @throws {@link IOException}, {@link KeeperException}
      */
     @Test
-    public void testStreamingClients() throws IOException,
-        KeeperException, BKException, InterruptedException {
+    public void testStreamingClients() throws IOException, KeeperException, BKException, InterruptedException {
         bkc = new BookKeeper("127.0.0.1");
-        lh = bkc.createLedger(ledgerPassword);
-        //write a string so that we cna
+        lh = bkc.createLedger(digestType, ledgerPassword);
+        // write a string so that we cna
         // create a buffer of a single bytes
         // and check for corner cases
-        String toWrite = "we need to check for this string to match " +
-                "and for the record mahadev is the best";
-        LedgerOutputStream lout = new LedgerOutputStream(lh , 1);
+        String toWrite = "we need to check for this string to match " + "and for the record mahadev is the best";
+        LedgerOutputStream lout = new LedgerOutputStream(lh, 1);
         byte[] b = toWrite.getBytes();
         lout.write(b);
         lout.close();
         long lId = lh.getId();
         lh.close();
-        //check for sanity
-        lh = bkc.openLedger(lId, ledgerPassword);
-        LedgerInputStream lin = new LedgerInputStream(lh,  1);
+        // check for sanity
+        lh = bkc.openLedger(lId, digestType, ledgerPassword);
+        LedgerInputStream lin = new LedgerInputStream(lh, 1);
         byte[] bread = new byte[b.length];
         int read = 0;
-        while (read < b.length) { 
+        while (read < b.length) {
             read = read + lin.read(bread, read, b.length);
         }
-        
+
         String newString = new String(bread);
         assertTrue("these two should same", toWrite.equals(newString));
         lin.close();
         lh.close();
-        //create another ledger to write one byte at a time
-        lh = bkc.createLedger(ledgerPassword);
+        // create another ledger to write one byte at a time
+        lh = bkc.createLedger(digestType, ledgerPassword);
         lout = new LedgerOutputStream(lh);
-        for (int i=0; i < b.length;i++) {
+        for (int i = 0; i < b.length; i++) {
             lout.write(b[i]);
         }
         lout.close();
         lId = lh.getId();
         lh.close();
-        lh = bkc.openLedger(lId, ledgerPassword);
+        lh = bkc.openLedger(lId, digestType, ledgerPassword);
         lin = new LedgerInputStream(lh);
         bread = new byte[b.length];
-        read= 0;
+        read = 0;
         while (read < b.length) {
             read = read + lin.read(bread, read, b.length);
         }
@@ -179,62 +155,60 @@
         lin.close();
         lh.close();
     }
-        
-    
+
     @Test
-    public void testReadWriteAsyncSingleClient() throws IOException{
+    public void testReadWriteAsyncSingleClient() throws IOException {
         try {
             // Create a BookKeeper client and a ledger
             bkc = new BookKeeper("127.0.0.1");
-            lh = bkc.createLedger(ledgerPassword);
-            //bkc.initMessageDigest("SHA1");
+            lh = bkc.createLedger(digestType, ledgerPassword);
+            // bkc.initMessageDigest("SHA1");
             ledgerId = lh.getId();
             LOG.info("Ledger ID: " + lh.getId());
-            for(int i = 0; i < numEntriesToWrite; i++){
+            for (int i = 0; i < numEntriesToWrite; i++) {
                 ByteBuffer entry = ByteBuffer.allocate(4);
                 entry.putInt(rng.nextInt(maxInt));
                 entry.position(0);
-                
+
                 entries.add(entry.array());
                 entriesSize.add(entry.array().length);
                 lh.asyncAddEntry(entry.array(), this, sync);
             }
-            
+
             // wait for all entries to be acknowledged
             synchronized (sync) {
-                while (sync.counter < numEntriesToWrite){
+                while (sync.counter < numEntriesToWrite) {
                     LOG.debug("Entries counter = " + sync.counter);
                     sync.wait();
                 }
             }
-            
+
             LOG.debug("*** WRITE COMPLETE ***");
-            // close ledger 
+            // close ledger
             lh.close();
-            
-            //*** WRITING PART COMPLETE // READ PART BEGINS ***
-            
+
+            // *** WRITING PART COMPLETE // READ PART BEGINS ***
+
             // open ledger
-            lh = bkc.openLedger(ledgerId, ledgerPassword);
-            LOG.debug("Number of entries written: " + lh.getLast());
-            assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));     
-            
-            //read entries
+            lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+            LOG.debug("Number of entries written: " + (lh.getLastAddConfirmed() + 1));
+            assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() == (numEntriesToWrite - 1));
+
+            // read entries
             lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
-            
+
             synchronized (sync) {
-                while(sync.value == false){
+                while (sync.value == false) {
                     sync.wait();
-                }               
+                }
             }
-            
-            assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
-            
+
             LOG.debug("*** READ COMPLETE ***");
-            
-            // at this point, LedgerSequence ls is filled with the returned values
+
+            // at this point, LedgerSequence ls is filled with the returned
+            // values
             int i = 0;
-            while(ls.hasMoreElements()){
+            while (ls.hasMoreElements()) {
                 ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
                 Integer origEntry = origbb.getInt();
                 byte[] entry = ls.nextElement().getEntry();
@@ -248,6 +222,8 @@
                 assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
                 i++;
             }
+            assertTrue("Checking number of read entries", i == numEntriesToWrite);
+
             lh.close();
         } catch (KeeperException e) {
             LOG.error("Test failed", e);
@@ -258,71 +234,72 @@
         } catch (InterruptedException e) {
             LOG.error("Test failed", e);
             fail("Test failed due to interruption");
-        } 
+        }
     }
-    
+
     @Test
-    public void testSyncReadAsyncWriteStringsSingleClient() throws IOException{
+    public void testSyncReadAsyncWriteStringsSingleClient() throws IOException {
         LOG.info("TEST READ WRITE STRINGS MIXED SINGLE CLIENT");
         String charset = "utf-8";
-        LOG.debug("Default charset: "  + Charset.defaultCharset());
+        LOG.debug("Default charset: " + Charset.defaultCharset());
         try {
             // Create a BookKeeper client and a ledger
             bkc = new BookKeeper("127.0.0.1");
-            lh = bkc.createLedger(ledgerPassword);
-            //bkc.initMessageDigest("SHA1");
+            lh = bkc.createLedger(digestType, ledgerPassword);
+            // bkc.initMessageDigest("SHA1");
             ledgerId = lh.getId();
             LOG.info("Ledger ID: " + lh.getId());
-            for(int i = 0; i < numEntriesToWrite; i++){
+            for (int i = 0; i < numEntriesToWrite; i++) {
                 int randomInt = rng.nextInt(maxInt);
                 byte[] entry = new String(Integer.toString(randomInt)).getBytes(charset);
                 entries.add(entry);
                 lh.asyncAddEntry(entry, this, sync);
             }
-            
+
             // wait for all entries to be acknowledged
             synchronized (sync) {
-                while (sync.counter < numEntriesToWrite){
+                while (sync.counter < numEntriesToWrite) {
                     LOG.debug("Entries counter = " + sync.counter);
                     sync.wait();
                 }
             }
-            
+
             LOG.debug("*** ASYNC WRITE COMPLETE ***");
-            // close ledger 
+            // close ledger
             lh.close();
-            
-            //*** WRITING PART COMPLETED // READ PART BEGINS ***
-            
+
+            // *** WRITING PART COMPLETED // READ PART BEGINS ***
+
             // open ledger
-            lh = bkc.openLedger(ledgerId, ledgerPassword);
-            LOG.debug("Number of entries written: " + lh.getLast());
-            assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));     
-            
-            //read entries          
+            lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+            LOG.debug("Number of entries written: " + (lh.getLastAddConfirmed() + 1));
+            assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() == (numEntriesToWrite - 1));
+
+            // read entries
             ls = lh.readEntries(0, numEntriesToWrite - 1);
-            
-            assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
-            
+
             LOG.debug("*** SYNC READ COMPLETE ***");
-            
-            // at this point, LedgerSequence ls is filled with the returned values
+
+            // at this point, LedgerSequence ls is filled with the returned
+            // values
             int i = 0;
-            while(ls.hasMoreElements()){
+            while (ls.hasMoreElements()) {
                 byte[] origEntryBytes = entries.get(i++);
                 byte[] retrEntryBytes = ls.nextElement().getEntry();
-                
+
                 LOG.debug("Original byte entry size: " + origEntryBytes.length);
                 LOG.debug("Saved byte entry size: " + retrEntryBytes.length);
-                
+
                 String origEntry = new String(origEntryBytes, charset);
                 String retrEntry = new String(retrEntryBytes, charset);
-                
+
                 LOG.debug("Original entry: " + origEntry);
                 LOG.debug("Retrieved entry: " + retrEntry);
-                
+
                 assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
             }
+            assertTrue("Checking number of read entries", i == numEntriesToWrite);
+
             lh.close();
         } catch (KeeperException e) {
             LOG.error("Test failed", e);
@@ -333,34 +310,34 @@
         } catch (InterruptedException e) {
             LOG.error("Test failed", e);
             fail("Test failed due to interruption");
-        } 
-        
+        }
+
     }
-    
+
     @Test
     public void testReadWriteSyncSingleClient() throws IOException {
         try {
             // Create a BookKeeper client and a ledger
             bkc = new BookKeeper("127.0.0.1");
-            lh = bkc.createLedger(ledgerPassword);
-            //bkc.initMessageDigest("SHA1");
+            lh = bkc.createLedger(digestType, ledgerPassword);
+            // bkc.initMessageDigest("SHA1");
             ledgerId = lh.getId();
             LOG.info("Ledger ID: " + lh.getId());
-            for(int i = 0; i < numEntriesToWrite; i++){
+            for (int i = 0; i < numEntriesToWrite; i++) {
                 ByteBuffer entry = ByteBuffer.allocate(4);
                 entry.putInt(rng.nextInt(maxInt));
                 entry.position(0);
-                entries.add(entry.array());             
+                entries.add(entry.array());
                 lh.addEntry(entry.array());
             }
             lh.close();
-            lh = bkc.openLedger(ledgerId, ledgerPassword);
-            LOG.debug("Number of entries written: " + lh.getLast());
-            assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));     
-            
+            lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+            LOG.debug("Number of entries written: " + lh.getLastAddConfirmed());
+            assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() == (numEntriesToWrite - 1));
+
             ls = lh.readEntries(0, numEntriesToWrite - 1);
             int i = 0;
-            while(ls.hasMoreElements()){
+            while (ls.hasMoreElements()) {
                 ByteBuffer origbb = ByteBuffer.wrap(entries.get(i++));
                 Integer origEntry = origbb.getInt();
                 ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
@@ -381,42 +358,42 @@
         } catch (InterruptedException e) {
             LOG.error("Test failed", e);
             fail("Test failed due to interruption");
-        } 
+        }
     }
-    
+
     @Test
     public void testReadWriteZero() throws IOException {
         try {
             // Create a BookKeeper client and a ledger
             bkc = new BookKeeper("127.0.0.1");
-            lh = bkc.createLedger(ledgerPassword);
-            //bkc.initMessageDigest("SHA1");
+            lh = bkc.createLedger(digestType, ledgerPassword);
+            // bkc.initMessageDigest("SHA1");
             ledgerId = lh.getId();
             LOG.info("Ledger ID: " + lh.getId());
-            for(int i = 0; i < numEntriesToWrite; i++){             
-            lh.addEntry(new byte[0]);
+            for (int i = 0; i < numEntriesToWrite; i++) {
+                lh.addEntry(new byte[0]);
             }
-            
+
             /*
              * Write a non-zero entry
              */
             ByteBuffer entry = ByteBuffer.allocate(4);
             entry.putInt(rng.nextInt(maxInt));
             entry.position(0);
-            entries.add(entry.array());             
-            lh.addEntry( entry.array());
-            
+            entries.add(entry.array());
+            lh.addEntry(entry.array());
+
             lh.close();
-            lh = bkc.openLedger(ledgerId, ledgerPassword);
-            LOG.debug("Number of entries written: " + lh.getLast());
-            assertTrue("Verifying number of entries written", lh.getLast() == numEntriesToWrite);       
-            
+            lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+            LOG.debug("Number of entries written: " + lh.getLastAddConfirmed());
+            assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() == numEntriesToWrite);
+
             ls = lh.readEntries(0, numEntriesToWrite - 1);
             int i = 0;
-            while(ls.hasMoreElements()){
+            while (ls.hasMoreElements()) {
                 ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
                 LOG.debug("Length of result: " + result.capacity());
-                
+
                 assertTrue("Checking if entry " + i + " has zero bytes", result.capacity() == 0);
             }
             lh.close();
@@ -429,52 +406,54 @@
         } catch (InterruptedException e) {
             LOG.error("Test failed", e);
             fail("Test failed due to interruption");
-        } 
+        }
     }
-    
+
     @Test
     public void testMultiLedger() throws IOException {
         try {
             // Create a BookKeeper client and a ledger
             bkc = new BookKeeper("127.0.0.1");
-            lh = bkc.createLedger(ledgerPassword);
-            lh2 = bkc.createLedger(ledgerPassword);
-            
+            lh = bkc.createLedger(digestType, ledgerPassword);
+            lh2 = bkc.createLedger(digestType, ledgerPassword);
+
             long ledgerId = lh.getId();
             long ledgerId2 = lh2.getId();
-            
-            //bkc.initMessageDigest("SHA1");
+
+            // bkc.initMessageDigest("SHA1");
             LOG.info("Ledger ID 1: " + lh.getId() + ", Ledger ID 2: " + lh2.getId());
-            for(int i = 0; i < numEntriesToWrite; i++){             
-                lh.addEntry( new byte[0]);
+            for (int i = 0; i < numEntriesToWrite; i++) {
+                lh.addEntry(new byte[0]);
                 lh2.addEntry(new byte[0]);
             }
-            
+
             lh.close();
             lh2.close();
-                
-            lh = bkc.openLedger(ledgerId, ledgerPassword);
-            lh2 = bkc.openLedger(ledgerId2, ledgerPassword);
-            
-            LOG.debug("Number of entries written: " + lh.getLast() + ", " + lh2.getLast());
-            assertTrue("Verifying number of entries written lh (" + lh.getLast() + ")" , lh.getLast() == (numEntriesToWrite - 1));
-            assertTrue("Verifying number of entries written lh2 (" + lh2.getLast() + ")", lh2.getLast() == (numEntriesToWrite - 1));
-            
+
+            lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+            lh2 = bkc.openLedger(ledgerId2, digestType, ledgerPassword);
+
+            LOG.debug("Number of entries written: " + lh.getLastAddConfirmed() + ", " + lh2.getLastAddConfirmed());
+            assertTrue("Verifying number of entries written lh (" + lh.getLastAddConfirmed() + ")", lh
+                    .getLastAddConfirmed() == (numEntriesToWrite - 1));
+            assertTrue("Verifying number of entries written lh2 (" + lh2.getLastAddConfirmed() + ")", lh2
+                    .getLastAddConfirmed() == (numEntriesToWrite - 1));
+
             ls = lh.readEntries(0, numEntriesToWrite - 1);
             int i = 0;
-            while(ls.hasMoreElements()){
+            while (ls.hasMoreElements()) {
                 ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
                 LOG.debug("Length of result: " + result.capacity());
-                
+
                 assertTrue("Checking if entry " + i + " has zero bytes", result.capacity() == 0);
             }
             lh.close();
-            ls = lh2.readEntries( 0, numEntriesToWrite - 1);
+            ls = lh2.readEntries(0, numEntriesToWrite - 1);
             i = 0;
-            while(ls.hasMoreElements()){
+            while (ls.hasMoreElements()) {
                 ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
                 LOG.debug("Length of result: " + result.capacity());
-                
+
                 assertTrue("Checking if entry " + i + " has zero bytes", result.capacity() == 0);
             }
             lh2.close();
@@ -487,14 +466,10 @@
         } catch (InterruptedException e) {
             LOG.error("Test failed", e);
             fail("Test failed due to interruption");
-        } 
+        }
     }
-    
-    
-    public void addComplete(int rc, 
-            LedgerHandle lh, 
-            long entryId, 
-            Object ctx) {
+
+    public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
         SyncObj x = (SyncObj) ctx;
         synchronized (x) {
             x.counter++;
@@ -502,135 +477,45 @@
         }
     }
 
-    public void readComplete(int rc, 
-            LedgerHandle lh, 
-            LedgerSequence seq,
-            Object ctx) {
-        ls = seq;               
+    public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+        ls = seq;
         synchronized (sync) {
             sync.value = true;
             sync.notify();
         }
-        
-    }
-     
-    protected void setUp() throws IOException, InterruptedException {
-        LOG.addAppender(ca);
-        LOG.setLevel((Level) Level.DEBUG);
-        
-        // create a ZooKeeper server(dataDir, dataLogDir, port)
-        LOG.debug("Running ZK server");
-        //ServerStats.registerAsConcrete();
-        ClientBase.setupTestEnv();
-        ZkTmpDir = File.createTempFile("zookeeper", "test");
-        ZkTmpDir.delete();
-        ZkTmpDir.mkdir();
-            
-        try {
-            zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
-            serverFactory =  new NIOServerCnxn.Factory(ZooKeeperDefaultPort);
-            serverFactory.startup(zks);
-        } catch (IOException e1) {
-            // TODO Auto-generated catch block
-            e1.printStackTrace();
-        } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-        boolean b = ClientBase.waitForServerUp(HOSTPORT, ClientBase.CONNECTION_TIMEOUT);
-        
-        LOG.debug("Server up: " + b);
-        
-        // create a zookeeper client
-        LOG.debug("Instantiate ZK Client");
-        zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher());
-        
-        //initialize the zk client with values
-        try {
-            zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 1), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 2), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        } catch (KeeperException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-        
-        // Create Bookie Servers (B1, B2, B3)
-        tmpDirB1 = File.createTempFile("bookie1", "test");
-        tmpDirB1.delete();
-        tmpDirB1.mkdir();
-         
-        bs1 = new BookieServer(initialPort, tmpDirB1, new File[]{tmpDirB1});
-        bs1.start();
-        
-        tmpDirB2 = File.createTempFile("bookie2", "test");
-        tmpDirB2.delete();
-        tmpDirB2.mkdir();
-            
-        bs2 = new BookieServer(initialPort + 1, tmpDirB2, new File[]{tmpDirB2});
-        bs2.start();
-
-        tmpDirB3 = File.createTempFile("bookie3", "test");
-        tmpDirB3.delete();
-        tmpDirB3.mkdir();
-        
-        bs3 = new BookieServer(initialPort + 2, tmpDirB3, new File[]{tmpDirB3});
-        bs3.start();
-        
-        rng = new Random(System.currentTimeMillis());   // Initialize the Random Number Generator 
-        entries = new ArrayList<byte[]>(); // initialize the  entries list
-        entriesSize = new ArrayList<Integer>(); 
-        sync = new SyncObj(); // initialize the synchronization data structure
-        zkc.close();
+
     }
-    
-    protected void tearDown(){
-        LOG.info("TearDown");
 
-        //shutdown bookie servers 
-        try {
-            bs1.shutdown();
-            bs2.shutdown();
-            bs3.shutdown();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-        cleanUpDir(tmpDirB1);
-        cleanUpDir(tmpDirB2);
-        cleanUpDir(tmpDirB3);
-        
-        //shutdown ZK server
-        serverFactory.shutdown();
-        assertTrue("waiting for server down",
-                ClientBase.waitForServerDown(HOSTPORT,
-                                             ClientBase.CONNECTION_TIMEOUT));
-        //ServerStats.unregister();
-        cleanUpDir(ZkTmpDir);
+    @Before
+    public void setUp() throws Exception{
+        super.setUp();
+        rng = new Random(System.currentTimeMillis()); // Initialize the Random
+                                                      // Number Generator
+        entries = new ArrayList<byte[]>(); // initialize the entries list
+        entriesSize = new ArrayList<Integer>();
+        sync = new SyncObj(); // initialize the synchronization data structure
         
     }
 
-    /*  Clean up a directory recursively */
-    protected boolean cleanUpDir(File dir){
+    /* Clean up a directory recursively */
+    protected boolean cleanUpDir(File dir) {
         if (dir.isDirectory()) {
             LOG.info("Cleaning up " + dir.getName());
             String[] children = dir.list();
             for (String string : children) {
                 boolean success = cleanUpDir(new File(dir, string));
-                if (!success) return false;
+                if (!success)
+                    return false;
             }
         }
         // The directory is now empty so delete it
-        return dir.delete();        
+        return dir.delete();
     }
 
-    /*  User for testing purposes, void */
-    class emptyWatcher implements Watcher{
-        public void process(WatchedEvent event) {}
+    /* User for testing purposes, void */
+    class emptyWatcher implements Watcher {
+        public void process(WatchedEvent event) {
+        }
     }
 
 }