You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2009/06/24 07:07:24 UTC
svn commit: r787907 [2/2] - in /hadoop/zookeeper/trunk: ./
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ src/cont...
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java Wed Jun 24 05:07:23 2009
@@ -106,18 +106,18 @@
BookieClient bc = new BookieClient("127.0.0.1", port, 50000);
ByteBuffer bb;
- bb = createByteBuffer(1);
+ bb = createByteBuffer(1,1,1);
bc.addEntry(1, passwd, 1, bb, wrcb, null);
- bb = createByteBuffer(2);
+ bb = createByteBuffer(2,1,2);
bc.addEntry(1, passwd, 2, bb, wrcb, null);
- bb = createByteBuffer(3);
+ bb = createByteBuffer(3,1,3);
bc.addEntry(1, passwd, 3, bb, wrcb, null);
- bb = createByteBuffer(5);
+ bb = createByteBuffer(5,1,5);
bc.addEntry(1, passwd, 5, bb, wrcb, null);
- bb = createByteBuffer(7);
+ bb = createByteBuffer(7,1,7);
bc.addEntry(1, passwd, 7, bb, wrcb, null);
synchronized(notifyObject) {
- bb = createByteBuffer(11);
+ bb = createByteBuffer(11,1,11);
bc.addEntry(1, passwd, 11, bb, wrcb, notifyObject);
notifyObject.wait();
}
@@ -184,10 +184,12 @@
assertEquals(BookieProtocol.ENOENTRY, arc.rc);
}
}
- private ByteBuffer createByteBuffer(int i) {
+ private ByteBuffer createByteBuffer(int i, long lid, long eid) {
ByteBuffer bb;
- bb = ByteBuffer.allocate(4);
+ bb = ByteBuffer.allocate(4+16);
bb.putInt(i);
+ bb.putLong(lid);
+ bb.putLong(eid);
bb.flip();
return bb;
}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieFailureTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieFailureTest.java?rev=787907&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieFailureTest.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieFailureTest.java Wed Jun 24 05:07:23 2009
@@ -0,0 +1,385 @@
+package org.apache.bookkeeper.test;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerHandle.QMode;
+import org.apache.bookkeeper.client.LedgerSequence;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.streaming.LedgerInputStream;
+import org.apache.bookkeeper.streaming.LedgerOutputStream;
+import org.apache.bookkeeper.util.ClientBase;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Test;
+
+/**
+ * This test tests read and write, synchronous and
+ * asynchronous, strings and integers for a BookKeeper client.
+ * The test deployment uses a ZooKeeper server
+ * and three BookKeepers.
+ *
+ */
+
+public class BookieFailureTest
+ extends junit.framework.TestCase
+ implements AddCallback, ReadCallback{
+
+
+ //Depending on the taste, select the amount of logging
+ // by decommenting one of the two lines below
+ //static Logger LOG = Logger.getRootLogger();
+ static Logger LOG = Logger.getLogger(BookieReadWriteTest.class);
+
+ static ConsoleAppender ca = new ConsoleAppender(new PatternLayout());
+
+ // ZooKeeper related variables
+ private static final String HOSTPORT = "127.0.0.1:2181";
+ static Integer ZooKeeperDefaultPort = 2181;
+ ZooKeeperServer zks;
+ ZooKeeper zkc; //zookeeper client
+ NIOServerCnxn.Factory serverFactory;
+ File ZkTmpDir;
+
+ //BookKeeper
+ File tmpDirB1, tmpDirB2, tmpDirB3, tmpDirB4;
+ BookieServer bs1, bs2, bs3, bs4;
+ Integer initialPort = 5000;
+ BookKeeper bkc; // bookkeeper client
+ byte[] ledgerPassword = "aaa".getBytes();
+ LedgerHandle lh, lh2;
+ long ledgerId;
+ LedgerSequence ls;
+
+ //test related variables
+ int numEntriesToWrite = 20000;
+ int maxInt = 2147483647;
+ Random rng; // Random Number Generator
+ ArrayList<byte[]> entries; // generated entries
+ ArrayList<Integer> entriesSize;
+
+ // Synchronization
+ SyncObj sync;
+ Set<Object> syncObjs;
+
+ class SyncObj {
+ int counter;
+ boolean value;
+ public SyncObj() {
+ counter = 0;
+ value = false;
+ }
+ }
+
+ /**
+ * Tests writes and reads when a bookie fails.
+ *
+ * @throws {@link IOException}
+ */
+ @Test
+ public void testAsyncBK1() throws IOException{
+ LOG.info("#### BK1 ####");
+ auxTestReadWriteAsyncSingleClient(bs1);
+ }
+
+ @Test
+ public void testAsyncBK2() throws IOException{
+ LOG.info("#### BK2 ####");
+ auxTestReadWriteAsyncSingleClient(bs2);
+ }
+
+ @Test
+ public void testAsyncBK3() throws IOException{
+ LOG.info("#### BK3 ####");
+ auxTestReadWriteAsyncSingleClient(bs3);
+ }
+
+ @Test
+ public void testAsyncBK4() throws IOException{
+ LOG.info("#### BK4 ####");
+ auxTestReadWriteAsyncSingleClient(bs4);
+ }
+
+ void auxTestReadWriteAsyncSingleClient(BookieServer bs) throws IOException{
+ try {
+ // Create a BookKeeper client and a ledger
+ bkc = new BookKeeper("127.0.0.1");
+ lh = bkc.createLedger(4, 2, QMode.VERIFIABLE, ledgerPassword);
+
+ ledgerId = lh.getId();
+ LOG.info("Ledger ID: " + lh.getId());
+ for(int i = 0; i < numEntriesToWrite; i++){
+ ByteBuffer entry = ByteBuffer.allocate(4);
+ entry.putInt(rng.nextInt(maxInt));
+ entry.position(0);
+
+ entries.add(entry.array());
+ entriesSize.add(entry.array().length);
+ lh.asyncAddEntry(entry.array(), this, sync);
+ if(i == 5000){
+ //Bookie fail
+ bs.shutdown();
+ }
+ }
+
+ // wait for all entries to be acknowledged
+ synchronized (sync) {
+ while (sync.counter < numEntriesToWrite){
+ LOG.debug("Entries counter = " + sync.counter);
+ sync.wait();
+ }
+ }
+
+ LOG.debug("*** WRITE COMPLETE ***");
+ // close ledger
+ lh.close();
+
+ //*** WRITING PART COMPLETE // READ PART BEGINS ***
+
+ // open ledger
+ bkc = new BookKeeper("127.0.0.1");
+ lh = bkc.openLedger(ledgerId, ledgerPassword);
+ LOG.debug("Number of entries written: " + lh.getLast());
+ assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));
+
+ //read entries
+
+ lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
+
+ synchronized (sync) {
+ while(sync.value == false){
+ sync.wait(10000);
+ assertTrue("Haven't received entries", sync.value);
+ }
+ }
+
+ assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
+
+ LOG.debug("*** READ COMPLETE ***");
+
+ // at this point, LedgerSequence ls is filled with the returned values
+ int i = 0;
+ LOG.info("Size of ledger sequence: " + ls.size());
+ while(ls.hasMoreElements()){
+ ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
+ Integer origEntry = origbb.getInt();
+ byte[] entry = ls.nextElement().getEntry();
+ ByteBuffer result = ByteBuffer.wrap(entry);
+
+ Integer retrEntry = result.getInt();
+ LOG.debug("Retrieved entry: " + i);
+ assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
+ assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
+ i++;
+ }
+
+ LOG.info("Verified that entries are ok, and now closing ledger");
+ lh.close();
+ } catch (KeeperException e) {
+ fail(e.toString());
+ } catch (BKException e) {
+ fail(e.toString());
+ } catch (InterruptedException e) {
+ fail(e.toString());
+ }
+
+ }
+
+ public void addComplete(int rc,
+ LedgerHandle lh,
+ long entryId,
+ Object ctx) {
+ if(rc != 0)
+ fail("Failed to write entry: " + entryId);
+ SyncObj x = (SyncObj) ctx;
+ synchronized (x) {
+ x.counter++;
+ x.notify();
+ }
+ }
+
+ public void readComplete(int rc,
+ LedgerHandle lh,
+ LedgerSequence seq,
+ Object ctx) {
+ if(rc != 0)
+ fail("Failed to write entry");
+ ls = seq;
+ synchronized (sync) {
+ sync.value = true;
+ sync.notify();
+ }
+
+ }
+
+ protected void setUp() throws IOException, InterruptedException {
+ LOG.addAppender(ca);
+ LOG.setLevel((Level) Level.DEBUG);
+
+ // create a ZooKeeper server(dataDir, dataLogDir, port)
+ LOG.debug("Running ZK server (setup)");
+ //ServerStats.registerAsConcrete();
+ ClientBase.setupTestEnv();
+ ZkTmpDir = File.createTempFile("zookeeper", "test");
+ ZkTmpDir.delete();
+ ZkTmpDir.mkdir();
+
+ try {
+ zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
+ serverFactory = new NIOServerCnxn.Factory(ZooKeeperDefaultPort);
+ serverFactory.startup(zks);
+ } catch (IOException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ boolean b = ClientBase.waitForServerUp(HOSTPORT, ClientBase.CONNECTION_TIMEOUT);
+
+ LOG.debug("Server up: " + b);
+
+ // create a zookeeper client
+ LOG.debug("Instantiate ZK Client");
+ zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher());
+
+ //initialize the zk client with values
+ try {
+ zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 1), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 2), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 3), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (KeeperException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ // Create Bookie Servers (B1, B2, B3)
+ tmpDirB1 = File.createTempFile("bookie1", "test");
+ tmpDirB1.delete();
+ tmpDirB1.mkdir();
+
+ bs1 = new BookieServer(initialPort, tmpDirB1, new File[]{tmpDirB1});
+ bs1.start();
+
+ tmpDirB2 = File.createTempFile("bookie2", "test");
+ tmpDirB2.delete();
+ tmpDirB2.mkdir();
+
+ bs2 = new BookieServer(initialPort + 1, tmpDirB2, new File[]{tmpDirB2});
+ bs2.start();
+
+ tmpDirB3 = File.createTempFile("bookie3", "test");
+ tmpDirB3.delete();
+ tmpDirB3.mkdir();
+
+ bs3 = new BookieServer(initialPort + 2, tmpDirB3, new File[]{tmpDirB3});
+ bs3.start();
+
+ tmpDirB4 = File.createTempFile("bookie4", "test");
+ tmpDirB4.delete();
+ tmpDirB4.mkdir();
+
+ bs4 = new BookieServer(initialPort + 3, tmpDirB4, new File[]{tmpDirB4});
+ bs4.start();
+
+ rng = new Random(System.currentTimeMillis()); // Initialize the Random Number Generator
+ entries = new ArrayList<byte[]>(); // initialize the entries list
+ entriesSize = new ArrayList<Integer>();
+ sync = new SyncObj(); // initialize the synchronization data structure
+
+ zkc.close();
+ }
+
+ protected void tearDown() throws InterruptedException {
+ LOG.info("TearDown");
+ bkc.halt();
+
+ //shutdown bookie servers
+ if(!bs1.isDown()) bs1.shutdown();
+ if(!bs2.isDown()) bs2.shutdown();
+ if(!bs3.isDown()) bs3.shutdown();
+ if(!bs4.isDown()) bs4.shutdown();
+
+ cleanUpDir(tmpDirB1);
+ cleanUpDir(tmpDirB2);
+ cleanUpDir(tmpDirB3);
+ cleanUpDir(tmpDirB4);
+ //shutdown ZK server
+ serverFactory.shutdown();
+ assertTrue("waiting for server down",
+ ClientBase.waitForServerDown(HOSTPORT,
+ ClientBase.CONNECTION_TIMEOUT));
+ //ServerStats.unregister();
+ cleanUpDir(ZkTmpDir);
+
+ }
+
+ /* Clean up a directory recursively */
+ protected boolean cleanUpDir(File dir){
+ if (dir.isDirectory()) {
+ LOG.info("Cleaning up " + dir.getName());
+ String[] children = dir.list();
+ for (String string : children) {
+ boolean success = cleanUpDir(new File(dir, string));
+ if (!success) return false;
+ }
+ }
+ // The directory is now empty so delete it
+ return dir.delete();
+ }
+
+ /* User for testing purposes, void */
+ class emptyWatcher implements Watcher{
+ public void process(WatchedEvent event) {}
+ }
+
+
+}
\ No newline at end of file
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java Wed Jun 24 05:07:23 2009
@@ -63,52 +63,52 @@
*/
public class BookieReadWriteTest
- extends junit.framework.TestCase
- implements AddCallback, ReadCallback{
+ extends junit.framework.TestCase
+ implements AddCallback, ReadCallback{
- //Depending on the taste, select the amount of logging
- // by decommenting one of the two lines below
- //static Logger LOG = Logger.getRootLogger();
- static Logger LOG = Logger.getLogger(BookieClientTest.class);
+ //Depending on the taste, select the amount of logging
+ // by decommenting one of the two lines below
+ //static Logger LOG = Logger.getRootLogger();
+ static Logger LOG = Logger.getLogger(BookieReadWriteTest.class);
- static ConsoleAppender ca = new ConsoleAppender(new PatternLayout());
+ static ConsoleAppender ca = new ConsoleAppender(new PatternLayout());
- // ZooKeeper related variables
+ // ZooKeeper related variables
private static final String HOSTPORT = "127.0.0.1:2181";
- static Integer ZooKeeperDefaultPort = 2181;
- ZooKeeperServer zks;
- ZooKeeper zkc; //zookeeper client
- NIOServerCnxn.Factory serverFactory;
- File ZkTmpDir;
-
- //BookKeeper
- File tmpDirB1, tmpDirB2, tmpDirB3;
- BookieServer bs1, bs2, bs3;
- Integer initialPort = 5000;
- BookKeeper bkc; // bookkeeper client
- byte[] ledgerPassword = "aaa".getBytes();
- LedgerHandle lh, lh2;
- long ledgerId;
- LedgerSequence ls;
-
- //test related variables
- int numEntriesToWrite = 20;
- int maxInt = 2147483647;
- Random rng; // Random Number Generator
- ArrayList<byte[]> entries; // generated entries
- ArrayList<Integer> entriesSize;
-
- // Synchronization
- SyncObj sync;
- Set<Object> syncObjs;
-
+ static Integer ZooKeeperDefaultPort = 2181;
+ ZooKeeperServer zks;
+ ZooKeeper zkc; //zookeeper client
+ NIOServerCnxn.Factory serverFactory;
+ File ZkTmpDir;
+
+ //BookKeeper
+ File tmpDirB1, tmpDirB2, tmpDirB3;
+ BookieServer bs1, bs2, bs3;
+ Integer initialPort = 5000;
+ BookKeeper bkc; // bookkeeper client
+ byte[] ledgerPassword = "aaa".getBytes();
+ LedgerHandle lh, lh2;
+ long ledgerId;
+ LedgerSequence ls;
+
+ //test related variables
+ int numEntriesToWrite = 200;
+ int maxInt = 2147483647;
+ Random rng; // Random Number Generator
+ ArrayList<byte[]> entries; // generated entries
+ ArrayList<Integer> entriesSize;
+
+ // Synchronization
+ SyncObj sync;
+ Set<Object> syncObjs;
+
class SyncObj {
- int counter;
- boolean value;
- public SyncObj() {
- counter = 0;
- value = false;
- }
+ int counter;
+ boolean value;
+ public SyncObj() {
+ counter = 0;
+ value = false;
+ }
}
@Test
@@ -137,7 +137,7 @@
// create a buffer of a single bytes
// and check for corner cases
String toWrite = "we need to check for this string to match " +
- "and for the record mahadev is the best";
+ "and for the record mahadev is the best";
LedgerOutputStream lout = new LedgerOutputStream(lh , 1);
byte[] b = toWrite.getBytes();
lout.write(b);
@@ -181,252 +181,255 @@
@Test
- public void testReadWriteAsyncSingleClient() throws IOException{
- try {
- // Create a BookKeeper client and a ledger
- bkc = new BookKeeper("127.0.0.1");
- lh = bkc.createLedger(ledgerPassword);
- //bkc.initMessageDigest("SHA1");
- ledgerId = lh.getId();
- LOG.info("Ledger ID: " + lh.getId());
- for(int i = 0; i < numEntriesToWrite; i++){
- ByteBuffer entry = ByteBuffer.allocate(4);
- entry.putInt(rng.nextInt(maxInt));
- entry.position(0);
-
- entries.add(entry.array());
- entriesSize.add(entry.array().length);
- lh.asyncAddEntry(entry.array(), this, sync);
- }
-
- // wait for all entries to be acknowledged
- synchronized (sync) {
- while (sync.counter < numEntriesToWrite){
- LOG.debug("Entries counter = " + sync.counter);
- sync.wait();
- }
- }
-
- LOG.debug("*** WRITE COMPLETE ***");
- // close ledger
- lh.close();
-
- //*** WRITING PART COMPLETE // READ PART BEGINS ***
-
- // open ledger
- lh = bkc.openLedger(ledgerId, ledgerPassword);
- LOG.debug("Number of entries written: " + lh.getLast());
- assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));
-
- //read entries
- lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
-
- synchronized (sync) {
- while(sync.value == false){
- sync.wait();
- }
- }
-
- assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
-
- LOG.debug("*** READ COMPLETE ***");
-
- // at this point, LedgerSequence ls is filled with the returned values
- int i = 0;
- while(ls.hasMoreElements()){
- ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
- Integer origEntry = origbb.getInt();
- byte[] entry = ls.nextElement().getEntry();
- ByteBuffer result = ByteBuffer.wrap(entry);
- LOG.debug("Length of result: " + result.capacity());
- LOG.debug("Original entry: " + origEntry);
-
- Integer retrEntry = result.getInt();
- LOG.debug("Retrieved entry: " + retrEntry);
- assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
- assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
- i++;
- }
- lh.close();
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (BKException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } //catch (NoSuchAlgorithmException e) {
- // e.printStackTrace();
- //}
-
- }
-
+ public void testReadWriteAsyncSingleClient() throws IOException{
+ try {
+ // Create a BookKeeper client and a ledger
+ bkc = new BookKeeper("127.0.0.1");
+ lh = bkc.createLedger(ledgerPassword);
+ //bkc.initMessageDigest("SHA1");
+ ledgerId = lh.getId();
+ LOG.info("Ledger ID: " + lh.getId());
+ for(int i = 0; i < numEntriesToWrite; i++){
+ ByteBuffer entry = ByteBuffer.allocate(4);
+ entry.putInt(rng.nextInt(maxInt));
+ entry.position(0);
+
+ entries.add(entry.array());
+ entriesSize.add(entry.array().length);
+ lh.asyncAddEntry(entry.array(), this, sync);
+ }
+
+ // wait for all entries to be acknowledged
+ synchronized (sync) {
+ while (sync.counter < numEntriesToWrite){
+ LOG.debug("Entries counter = " + sync.counter);
+ sync.wait();
+ }
+ }
+
+ LOG.debug("*** WRITE COMPLETE ***");
+ // close ledger
+ lh.close();
+
+ //*** WRITING PART COMPLETE // READ PART BEGINS ***
+
+ // open ledger
+ lh = bkc.openLedger(ledgerId, ledgerPassword);
+ LOG.debug("Number of entries written: " + lh.getLast());
+ assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));
+
+ //read entries
+ lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
+
+ synchronized (sync) {
+ while(sync.value == false){
+ sync.wait();
+ }
+ }
+
+ assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
+
+ LOG.debug("*** READ COMPLETE ***");
+
+ // at this point, LedgerSequence ls is filled with the returned values
+ int i = 0;
+ while(ls.hasMoreElements()){
+ ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
+ Integer origEntry = origbb.getInt();
+ byte[] entry = ls.nextElement().getEntry();
+ ByteBuffer result = ByteBuffer.wrap(entry);
+ LOG.debug("Length of result: " + result.capacity());
+ LOG.debug("Original entry: " + origEntry);
+
+ Integer retrEntry = result.getInt();
+ LOG.debug("Retrieved entry: " + retrEntry);
+ assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
+ assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
+ i++;
+ }
+ lh.close();
+ } catch (KeeperException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to ZooKeeper exception");
+ } catch (BKException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to BookKeeper exception");
+ } catch (InterruptedException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to interruption");
+ }
+ }
+
@Test
- public void testSyncReadAsyncWriteStringsSingleClient() throws IOException{
- LOG.info("TEST READ WRITE STRINGS MIXED SINGLE CLIENT");
- String charset = "utf-8";
- LOG.debug("Default charset: " + Charset.defaultCharset());
- try {
- // Create a BookKeeper client and a ledger
- bkc = new BookKeeper("127.0.0.1");
- lh = bkc.createLedger(ledgerPassword);
- //bkc.initMessageDigest("SHA1");
- ledgerId = lh.getId();
- LOG.info("Ledger ID: " + lh.getId());
- for(int i = 0; i < numEntriesToWrite; i++){
- int randomInt = rng.nextInt(maxInt);
- byte[] entry = new String(Integer.toString(randomInt)).getBytes(charset);
- entries.add(entry);
- lh.asyncAddEntry(entry, this, sync);
- }
-
- // wait for all entries to be acknowledged
- synchronized (sync) {
- while (sync.counter < numEntriesToWrite){
- LOG.debug("Entries counter = " + sync.counter);
- sync.wait();
- }
- }
-
- LOG.debug("*** ASYNC WRITE COMPLETE ***");
- // close ledger
- lh.close();
-
- //*** WRITING PART COMPLETED // READ PART BEGINS ***
-
- // open ledger
- lh = bkc.openLedger(ledgerId, ledgerPassword);
- LOG.debug("Number of entries written: " + lh.getLast());
- assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));
-
- //read entries
- ls = lh.readEntries(0, numEntriesToWrite - 1);
-
- assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
-
- LOG.debug("*** SYNC READ COMPLETE ***");
-
- // at this point, LedgerSequence ls is filled with the returned values
- int i = 0;
- while(ls.hasMoreElements()){
- byte[] origEntryBytes = entries.get(i++);
- byte[] retrEntryBytes = ls.nextElement().getEntry();
-
- LOG.debug("Original byte entry size: " + origEntryBytes.length);
- LOG.debug("Saved byte entry size: " + retrEntryBytes.length);
-
- String origEntry = new String(origEntryBytes, charset);
- String retrEntry = new String(retrEntryBytes, charset);
-
- LOG.debug("Original entry: " + origEntry);
- LOG.debug("Retrieved entry: " + retrEntry);
-
- assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
- }
- lh.close();
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (BKException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } //catch (NoSuchAlgorithmException e) {
- // e.printStackTrace();
- //}
-
- }
+ public void testSyncReadAsyncWriteStringsSingleClient() throws IOException{
+ LOG.info("TEST READ WRITE STRINGS MIXED SINGLE CLIENT");
+ String charset = "utf-8";
+ LOG.debug("Default charset: " + Charset.defaultCharset());
+ try {
+ // Create a BookKeeper client and a ledger
+ bkc = new BookKeeper("127.0.0.1");
+ lh = bkc.createLedger(ledgerPassword);
+ //bkc.initMessageDigest("SHA1");
+ ledgerId = lh.getId();
+ LOG.info("Ledger ID: " + lh.getId());
+ for(int i = 0; i < numEntriesToWrite; i++){
+ int randomInt = rng.nextInt(maxInt);
+ byte[] entry = new String(Integer.toString(randomInt)).getBytes(charset);
+ entries.add(entry);
+ lh.asyncAddEntry(entry, this, sync);
+ }
+
+ // wait for all entries to be acknowledged
+ synchronized (sync) {
+ while (sync.counter < numEntriesToWrite){
+ LOG.debug("Entries counter = " + sync.counter);
+ sync.wait();
+ }
+ }
+
+ LOG.debug("*** ASYNC WRITE COMPLETE ***");
+ // close ledger
+ lh.close();
+
+ //*** WRITING PART COMPLETED // READ PART BEGINS ***
+
+ // open ledger
+ lh = bkc.openLedger(ledgerId, ledgerPassword);
+ LOG.debug("Number of entries written: " + lh.getLast());
+ assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));
+
+ //read entries
+ ls = lh.readEntries(0, numEntriesToWrite - 1);
+
+ assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
+
+ LOG.debug("*** SYNC READ COMPLETE ***");
+
+ // at this point, LedgerSequence ls is filled with the returned values
+ int i = 0;
+ while(ls.hasMoreElements()){
+ byte[] origEntryBytes = entries.get(i++);
+ byte[] retrEntryBytes = ls.nextElement().getEntry();
+
+ LOG.debug("Original byte entry size: " + origEntryBytes.length);
+ LOG.debug("Saved byte entry size: " + retrEntryBytes.length);
+
+ String origEntry = new String(origEntryBytes, charset);
+ String retrEntry = new String(retrEntryBytes, charset);
+
+ LOG.debug("Original entry: " + origEntry);
+ LOG.debug("Retrieved entry: " + retrEntry);
+
+ assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
+ }
+ lh.close();
+ } catch (KeeperException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to ZooKeeper exception");
+ } catch (BKException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to BookKeeper exception");
+ } catch (InterruptedException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to interruption");
+ }
+
+ }
@Test
public void testReadWriteSyncSingleClient() throws IOException {
- try {
- // Create a BookKeeper client and a ledger
- bkc = new BookKeeper("127.0.0.1");
- lh = bkc.createLedger(ledgerPassword);
- //bkc.initMessageDigest("SHA1");
- ledgerId = lh.getId();
- LOG.info("Ledger ID: " + lh.getId());
- for(int i = 0; i < numEntriesToWrite; i++){
- ByteBuffer entry = ByteBuffer.allocate(4);
- entry.putInt(rng.nextInt(maxInt));
- entry.position(0);
- entries.add(entry.array());
- lh.addEntry(entry.array());
- }
- lh.close();
- lh = bkc.openLedger(ledgerId, ledgerPassword);
- LOG.debug("Number of entries written: " + lh.getLast());
- assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));
-
- ls = lh.readEntries(0, numEntriesToWrite - 1);
- int i = 0;
- while(ls.hasMoreElements()){
- ByteBuffer origbb = ByteBuffer.wrap(entries.get(i++));
- Integer origEntry = origbb.getInt();
- ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
- LOG.debug("Length of result: " + result.capacity());
- LOG.debug("Original entry: " + origEntry);
-
- Integer retrEntry = result.getInt();
- LOG.debug("Retrieved entry: " + retrEntry);
- assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
- }
- lh.close();
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (BKException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } //catch (NoSuchAlgorithmException e) {
- // e.printStackTrace();
- //}
- }
-
+ try {
+ // Create a BookKeeper client and a ledger
+ bkc = new BookKeeper("127.0.0.1");
+ lh = bkc.createLedger(ledgerPassword);
+ //bkc.initMessageDigest("SHA1");
+ ledgerId = lh.getId();
+ LOG.info("Ledger ID: " + lh.getId());
+ for(int i = 0; i < numEntriesToWrite; i++){
+ ByteBuffer entry = ByteBuffer.allocate(4);
+ entry.putInt(rng.nextInt(maxInt));
+ entry.position(0);
+ entries.add(entry.array());
+ lh.addEntry(entry.array());
+ }
+ lh.close();
+ lh = bkc.openLedger(ledgerId, ledgerPassword);
+ LOG.debug("Number of entries written: " + lh.getLast());
+ assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));
+
+ ls = lh.readEntries(0, numEntriesToWrite - 1);
+ int i = 0;
+ while(ls.hasMoreElements()){
+ ByteBuffer origbb = ByteBuffer.wrap(entries.get(i++));
+ Integer origEntry = origbb.getInt();
+ ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
+ LOG.debug("Length of result: " + result.capacity());
+ LOG.debug("Original entry: " + origEntry);
+
+ Integer retrEntry = result.getInt();
+ LOG.debug("Retrieved entry: " + retrEntry);
+ assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
+ }
+ lh.close();
+ } catch (KeeperException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to ZooKeeper exception");
+ } catch (BKException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to BookKeeper exception");
+ } catch (InterruptedException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to interruption");
+ }
+ }
+
@Test
public void testReadWriteZero() throws IOException {
- try {
- // Create a BookKeeper client and a ledger
- bkc = new BookKeeper("127.0.0.1");
- lh = bkc.createLedger(ledgerPassword);
- //bkc.initMessageDigest("SHA1");
- ledgerId = lh.getId();
- LOG.info("Ledger ID: " + lh.getId());
- for(int i = 0; i < numEntriesToWrite; i++){
- lh.addEntry(new byte[0]);
- }
-
- /*
- * Write a non-zero entry
- */
- ByteBuffer entry = ByteBuffer.allocate(4);
- entry.putInt(rng.nextInt(maxInt));
- entry.position(0);
- entries.add(entry.array());
- lh.addEntry( entry.array());
-
- lh.close();
- lh = bkc.openLedger(ledgerId, ledgerPassword);
- LOG.debug("Number of entries written: " + lh.getLast());
- assertTrue("Verifying number of entries written", lh.getLast() == numEntriesToWrite);
-
- ls = lh.readEntries(0, numEntriesToWrite - 1);
- int i = 0;
- while(ls.hasMoreElements()){
- ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
- LOG.debug("Length of result: " + result.capacity());
-
- assertTrue("Checking if entry " + i + " has zero bytes", result.capacity() == 0);
- }
- lh.close();
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (BKException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } //catch (NoSuchAlgorithmException e) {
- // e.printStackTrace();
- //}
- }
+ try {
+ // Create a BookKeeper client and a ledger
+ bkc = new BookKeeper("127.0.0.1");
+ lh = bkc.createLedger(ledgerPassword);
+ //bkc.initMessageDigest("SHA1");
+ ledgerId = lh.getId();
+ LOG.info("Ledger ID: " + lh.getId());
+ for(int i = 0; i < numEntriesToWrite; i++){
+ lh.addEntry(new byte[0]);
+ }
+
+ /*
+ * Write a non-zero entry
+ */
+ ByteBuffer entry = ByteBuffer.allocate(4);
+ entry.putInt(rng.nextInt(maxInt));
+ entry.position(0);
+ entries.add(entry.array());
+ lh.addEntry( entry.array());
+
+ lh.close();
+ lh = bkc.openLedger(ledgerId, ledgerPassword);
+ LOG.debug("Number of entries written: " + lh.getLast());
+ assertTrue("Verifying number of entries written", lh.getLast() == numEntriesToWrite);
+
+ ls = lh.readEntries(0, numEntriesToWrite - 1);
+ int i = 0;
+ while(ls.hasMoreElements()){
+ ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
+ LOG.debug("Length of result: " + result.capacity());
+
+ assertTrue("Checking if entry " + i + " has zero bytes", result.capacity() == 0);
+ }
+ lh.close();
+ } catch (KeeperException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to ZooKeeper exception");
+ } catch (BKException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to BookKeeper exception");
+ } catch (InterruptedException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to interruption");
+ }
+ }
@Test
public void testMultiLedger() throws IOException {
@@ -473,159 +476,160 @@
assertTrue("Checking if entry " + i + " has zero bytes", result.capacity() == 0);
}
-
lh2.close();
} catch (KeeperException e) {
- e.printStackTrace();
+ LOG.error("Test failed", e);
+ fail("Test failed due to ZooKeeper exception");
} catch (BKException e) {
- e.printStackTrace();
+ LOG.error("Test failed", e);
+ fail("Test failed due to BookKeeper exception");
} catch (InterruptedException e) {
- e.printStackTrace();
- } //catch (NoSuchAlgorithmException e) {
- // e.printStackTrace();
- //}
+ LOG.error("Test failed", e);
+ fail("Test failed due to interruption");
+ }
}
- public void addComplete(int rc,
- LedgerHandle lh,
- long entryId,
- Object ctx) {
- SyncObj x = (SyncObj) ctx;
- synchronized (x) {
- x.counter++;
- x.notify();
- }
- }
-
- public void readComplete(int rc,
- LedgerHandle lh,
- LedgerSequence seq,
- Object ctx) {
- ls = seq;
- synchronized (sync) {
- sync.value = true;
- sync.notify();
- }
-
- }
-
- protected void setUp() throws IOException {
- LOG.addAppender(ca);
- LOG.setLevel((Level) Level.DEBUG);
-
- // create a ZooKeeper server(dataDir, dataLogDir, port)
- LOG.debug("Running ZK server");
- //ServerStats.registerAsConcrete();
- ClientBase.setupTestEnv();
- ZkTmpDir = File.createTempFile("zookeeper", "test");
+ public void addComplete(int rc,
+ LedgerHandle lh,
+ long entryId,
+ Object ctx) {
+ SyncObj x = (SyncObj) ctx;
+ synchronized (x) {
+ x.counter++;
+ x.notify();
+ }
+ }
+
+ public void readComplete(int rc,
+ LedgerHandle lh,
+ LedgerSequence seq,
+ Object ctx) {
+ ls = seq;
+ synchronized (sync) {
+ sync.value = true;
+ sync.notify();
+ }
+
+ }
+
+ protected void setUp() throws IOException, InterruptedException {
+ LOG.addAppender(ca);
+ LOG.setLevel((Level) Level.DEBUG);
+
+ // create a ZooKeeper server(dataDir, dataLogDir, port)
+ LOG.debug("Running ZK server");
+ //ServerStats.registerAsConcrete();
+ ClientBase.setupTestEnv();
+ ZkTmpDir = File.createTempFile("zookeeper", "test");
ZkTmpDir.delete();
ZkTmpDir.mkdir();
-
- try {
- zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
- serverFactory = new NIOServerCnxn.Factory(ZooKeeperDefaultPort);
- serverFactory.startup(zks);
- } catch (IOException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+
+ try {
+ zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
+ serverFactory = new NIOServerCnxn.Factory(ZooKeeperDefaultPort);
+ serverFactory.startup(zks);
+ } catch (IOException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
boolean b = ClientBase.waitForServerUp(HOSTPORT, ClientBase.CONNECTION_TIMEOUT);
-
+
LOG.debug("Server up: " + b);
- // create a zookeeper client
- LOG.debug("Instantiate ZK Client");
- zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher());
-
- //initialize the zk client with values
- try {
- zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 1), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 2), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- } catch (KeeperException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- // Create Bookie Servers (B1, B2, B3)
- tmpDirB1 = File.createTempFile("bookie1", "test");
+ // create a zookeeper client
+ LOG.debug("Instantiate ZK Client");
+ zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher());
+
+ //initialize the zk client with values
+ try {
+ zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 1), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 2), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (KeeperException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ // Create Bookie Servers (B1, B2, B3)
+ tmpDirB1 = File.createTempFile("bookie1", "test");
tmpDirB1.delete();
tmpDirB1.mkdir();
-
- bs1 = new BookieServer(initialPort, tmpDirB1, new File[]{tmpDirB1});
- bs1.start();
-
- tmpDirB2 = File.createTempFile("bookie2", "test");
+
+ bs1 = new BookieServer(initialPort, tmpDirB1, new File[]{tmpDirB1});
+ bs1.start();
+
+ tmpDirB2 = File.createTempFile("bookie2", "test");
tmpDirB2.delete();
tmpDirB2.mkdir();
-
- bs2 = new BookieServer(initialPort + 1, tmpDirB2, new File[]{tmpDirB2});
- bs2.start();
+
+ bs2 = new BookieServer(initialPort + 1, tmpDirB2, new File[]{tmpDirB2});
+ bs2.start();
- tmpDirB3 = File.createTempFile("bookie3", "test");
+ tmpDirB3 = File.createTempFile("bookie3", "test");
tmpDirB3.delete();
tmpDirB3.mkdir();
- bs3 = new BookieServer(initialPort + 2, tmpDirB3, new File[]{tmpDirB3});
- bs3.start();
-
- rng = new Random(System.currentTimeMillis()); // Initialize the Random Number Generator
- entries = new ArrayList<byte[]>(); // initialize the entries list
- entriesSize = new ArrayList<Integer>();
- sync = new SyncObj(); // initialize the synchronization data structure
- }
-
- protected void tearDown(){
- LOG.info("TearDown");
-
- //shutdown bookie servers
- try {
- bs1.shutdown();
- bs2.shutdown();
- bs3.shutdown();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- cleanUpDir(tmpDirB1);
- cleanUpDir(tmpDirB2);
- cleanUpDir(tmpDirB3);
-
- //shutdown ZK server
- serverFactory.shutdown();
- assertTrue("waiting for server down",
+ bs3 = new BookieServer(initialPort + 2, tmpDirB3, new File[]{tmpDirB3});
+ bs3.start();
+
+ rng = new Random(System.currentTimeMillis()); // Initialize the Random Number Generator
+ entries = new ArrayList<byte[]>(); // initialize the entries list
+ entriesSize = new ArrayList<Integer>();
+ sync = new SyncObj(); // initialize the synchronization data structure
+ zkc.close();
+ }
+
+ protected void tearDown(){
+ LOG.info("TearDown");
+
+ //shutdown bookie servers
+ try {
+ bs1.shutdown();
+ bs2.shutdown();
+ bs3.shutdown();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ cleanUpDir(tmpDirB1);
+ cleanUpDir(tmpDirB2);
+ cleanUpDir(tmpDirB3);
+
+ //shutdown ZK server
+ serverFactory.shutdown();
+ assertTrue("waiting for server down",
ClientBase.waitForServerDown(HOSTPORT,
ClientBase.CONNECTION_TIMEOUT));
- //ServerStats.unregister();
- cleanUpDir(ZkTmpDir);
-
- }
-
- /* Clean up a directory recursively */
- protected boolean cleanUpDir(File dir){
- if (dir.isDirectory()) {
- LOG.info("Cleaning up " + dir.getName());
+ //ServerStats.unregister();
+ cleanUpDir(ZkTmpDir);
+
+ }
+
+ /* Clean up a directory recursively */
+ protected boolean cleanUpDir(File dir){
+ if (dir.isDirectory()) {
+ LOG.info("Cleaning up " + dir.getName());
String[] children = dir.list();
for (String string : children) {
- boolean success = cleanUpDir(new File(dir, string));
- if (!success) return false;
- }
+ boolean success = cleanUpDir(new File(dir, string));
+ if (!success) return false;
+ }
}
// The directory is now empty so delete it
- return dir.delete();
- }
+ return dir.delete();
+ }
- /* User for testing purposes, void */
- class emptyWatcher implements Watcher{
- public void process(WatchedEvent event) {}
- }
+ /* User for testing purposes, void */
+ class emptyWatcher implements Watcher{
+ public void process(WatchedEvent event) {}
+ }
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java Wed Jun 24 05:07:23 2009
@@ -141,6 +141,7 @@
zk.create("/ledgers/available/" + BOOKIEADDR1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
zk.create("/ledgers/available/" + BOOKIEADDR2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
zk.create("/ledgers/available/" + BOOKIEADDR3, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
+ zk.close();
} catch (KeeperException ke) {
LOG.error(ke);
fail("Couldn't execute ZooKeeper start procedure");
@@ -230,6 +231,9 @@
} catch(InterruptedException e){
LOG.error("Interrupted when adding entry", e);
fail("Couldn't finish adding entries");
+ } catch(BKException e){
+ LOG.error("BookKeeper exception", e);
+ fail("BookKeeper exception when adding entries");
}
try{
@@ -249,6 +253,9 @@
} catch(InterruptedException e){
LOG.error("Interrupted when adding entry", e);
fail("Couldn't finish adding entries");
+ } catch(BKException e){
+ LOG.error("BookKeeper exception", e);
+ fail("CBookKeeper exception while adding entries");
}
try{
@@ -270,6 +277,9 @@
} catch(InterruptedException e){
LOG.error("Interrupted when adding entry", e);
fail("Couldn't finish adding entries");
+ } catch(BKException e){
+ LOG.error("BookKeeper exception", e);
+ fail("BookKeeper exception when adding entries");
}
try{
@@ -279,22 +289,6 @@
LOG.error(e);
fail("Exception while closing ledger 4");
}
- /*
- LedgerHandle afterlh = bk.openLedger(beforelh.getId(), "".getBytes());
-
- } catch (KeeperException e) {
- LOG.error("Error when opening ledger", e);
- fail("Couldn't open ledger");
- } /* catch (InterruptedException ie) {
- LOG.error("Interrupted exception", ie);
- fail("Failure due to interrupted exception");
- } catch (IOException ioe) {
- LOG.error("IO Exception", ioe);
- fail("Failure due to IO exception");
- } catch (BKException bke){
- LOG.error("BookKeeper error", bke);
- fail("BookKeeper error");
- }*/
}
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java Wed Jun 24 05:07:23 2009
@@ -140,6 +140,7 @@
zk.create("/ledgers/available/" + BOOKIEADDR1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
zk.create("/ledgers/available/" + BOOKIEADDR2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
zk.create("/ledgers/available/" + BOOKIEADDR3, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
+ zk.close();
} catch (KeeperException ke) {
LOG.error(ke);
fail("Couldn't execute ZooKeeper start procedure");
@@ -219,21 +220,16 @@
for(int i = 0; i < 1000; i++){
beforelh.addEntry(tmp.getBytes());
}
+
+ //bk.resetLedger(beforelh);
} catch(InterruptedException e){
LOG.error("Interrupted when adding entry", e);
fail("Couldn't finish adding entries");
+ } catch(BKException e){
+ LOG.error("BookKeeper exception", e);
+ fail("BookKeeper exception while adding entries");
}
- ///*
- // * Sleep.
- // */
- //try{
- // Thread.sleep(2000);
- //} catch(InterruptedException e){
- // LOG.error("Interrupted while sleeping", e);
- // fail("Couldn't finish sleeping");
- //}
-
/*
* Try to open ledger.
*/
@@ -307,17 +303,11 @@
} catch(InterruptedException e){
LOG.error("Interrupted when adding entry", e);
fail("Couldn't finish adding entries");
+ } catch(BKException e){
+ LOG.error("BookKeeper exception", e);
+ fail("BookKeeper exception while adding entries");
}
- ///*
- // * Sleep.
- // */
- //try{
- // Thread.sleep(2000);
- //} catch(InterruptedException e){
- // LOG.error("Interrupted while sleeping", e);
- // fail("Couldn't finish sleeping");
- //}
/*
* Try to open ledger.