You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2010/01/27 00:16:49 UTC
svn commit: r903483 [5/6] - in /hadoop/zookeeper/trunk: ./
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ src/cont...
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/SafeRunnable.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/SafeRunnable.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/SafeRunnable.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/SafeRunnable.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,38 @@
+package org.apache.bookkeeper.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.log4j.Logger;
+
+public abstract class SafeRunnable implements Runnable{
+
+ static final Logger logger = Logger.getLogger(SafeRunnable.class);
+
+@Override
+ public void run() {
+ try{
+ safeRun();
+ }catch(Throwable t){
+ logger.fatal("Unexpected throwable caught ", t);
+ }
+ }
+
+ public abstract void safeRun();
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/StringUtils.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/StringUtils.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/StringUtils.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/StringUtils.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,94 @@
+package org.apache.bookkeeper.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * Provides utilities for parsing network addresses, extracting ledger IDs
+ * from node paths, etc.
+ *
+ */
+public class StringUtils {
+
+ /*
+ * Path to ledger metadata. ZooKeeper appends a sequence number to L.
+ */
+ static public final String prefix = "/ledgers/L";
+
+ /**
+ * Parses an address of the form host:port into an InetSocketAddress.
+ *
+ * @param s
+ * address string of the form host:port
+ */
+
+ public static InetSocketAddress parseAddr(String s) throws IOException {
+
+ String parts[] = s.split(":");
+ if (parts.length != 2) {
+ throw new IOException(s + " does not have the form host:port");
+ }
+ int port;
+ try {
+ port = Integer.parseInt(parts[1]);
+ } catch (NumberFormatException e) {
+ throw new IOException(s + " does not have the form host:port");
+ }
+
+ InetSocketAddress addr = new InetSocketAddress(parts[0], port);
+ return addr;
+ }
+
+ public static StringBuilder addrToString(StringBuilder sb, InetSocketAddress addr) {
+ return sb.append(addr.getAddress().getHostAddress()).append(":").append(addr.getPort());
+ }
+
+ /**
+ * Formats ledger ID according to ZooKeeper rules
+ *
+ * @param id
+ * znode id
+ */
+ public static String getZKStringId(long id) {
+ return String.format("%010d", id);
+ }
+
+ /**
+ * Get the path for the ledger metadata node
+ *
+ * @return the ledger path prefix followed by the formatted ledger ID
+ */
+ public static String getLedgerNodePath(long ledgerId) {
+ return prefix + StringUtils.getZKStringId(ledgerId);
+ }
+
+ public static long getLedgerId(String nodeName) throws IOException {
+ long ledgerId;
+ try {
+ String parts[] = nodeName.split(prefix);
+ ledgerId = Long.parseLong(parts[parts.length - 1]);
+ } catch (NumberFormatException e) {
+ throw new IOException(e);
+ }
+ return ledgerId;
+ }
+
+}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/AsyncLedgerOpsTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/AsyncLedgerOpsTest.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/AsyncLedgerOpsTest.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/AsyncLedgerOpsTest.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
package org.apache.bookkeeper.test;
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,192 +21,158 @@
*
*/
-
-import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
import java.util.Random;
import java.util.Set;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.LedgerSequence;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
-import org.apache.bookkeeper.client.LedgerHandle.QMode;
-import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.streaming.LedgerInputStream;
-import org.apache.bookkeeper.streaming.LedgerOutputStream;
-
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.server.NIOServerCnxn;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.test.ClientBase;
-
+import org.junit.Before;
import org.junit.Test;
-
-//import BookieReadWriteTest.SyncObj;
-//import BookieReadWriteTest.emptyWatcher;
+import org.junit.runners.Parameterized.Parameters;
/**
- * This test tests read and write, synchronous and
- * asynchronous, strings and integers for a BookKeeper client.
- * The test deployment uses a ZooKeeper server
- * and three BookKeepers.
+ * This test tests read and write, synchronous and asynchronous, strings and
+ * integers for a BookKeeper client. The test deployment uses a ZooKeeper server
+ * and three BookKeepers.
*
*/
-
-public class AsyncLedgerOpsTest
- extends junit.framework.TestCase
- implements AddCallback,
- ReadCallback,
- CreateCallback,
- CloseCallback,
- OpenCallback{
+public class AsyncLedgerOpsTest extends BaseTestCase implements AddCallback, ReadCallback, CreateCallback,
+ CloseCallback, OpenCallback {
static Logger LOG = Logger.getLogger(BookieClientTest.class);
- static ConsoleAppender ca = new ConsoleAppender(new PatternLayout());
-
- // ZooKeeper related variables
- private static final String HOSTPORT = "127.0.0.1:2181";
- static Integer ZooKeeperDefaultPort = 2181;
- ZooKeeperServer zks;
- ZooKeeper zkc; //zookeeper client
- NIOServerCnxn.Factory serverFactory;
- File ZkTmpDir;
+ DigestType digestType;
+
+ public AsyncLedgerOpsTest(DigestType digestType) {
+ super(3);
+ this.digestType = digestType;
+ }
+
+ @Parameters
+ public static Collection<Object[]> configs(){
+ return Arrays.asList(new Object[][]{ {DigestType.MAC }, {DigestType.CRC32}});
+ }
- //BookKeeper
- File tmpDirB1, tmpDirB2, tmpDirB3;
- BookieServer bs1, bs2, bs3;
- Integer initialPort = 5000;
- BookKeeper bkc; // bookkeeper client
+
byte[] ledgerPassword = "aaa".getBytes();
LedgerHandle lh, lh2;
long ledgerId;
- LedgerSequence ls;
-
- //test related variables
+ Enumeration<LedgerEntry> ls;
+
+ // test related variables
int numEntriesToWrite = 20;
int maxInt = 2147483647;
- Random rng; // Random Number Generator
+ Random rng; // Random Number Generator
ArrayList<byte[]> entries; // generated entries
ArrayList<Integer> entriesSize;
-
+
// Synchronization
SyncObj sync;
Set<Object> syncObjs;
-
+
class SyncObj {
int counter;
- boolean value;
+ boolean value;
+
public SyncObj() {
counter = 0;
value = false;
- }
+ }
}
-
- class ControlObj{
+
+ class ControlObj {
LedgerHandle lh;
-
- void setLh(LedgerHandle lh){
+
+ void setLh(LedgerHandle lh) {
this.lh = lh;
}
-
- LedgerHandle getLh(){
+
+ LedgerHandle getLh() {
return lh;
}
}
-
+
@Test
- public void testAsyncCreateClose() throws IOException{
+ public void testAsyncCreateClose() throws IOException {
try {
- // Create a BookKeeper client and a ledger
- bkc = new BookKeeper("127.0.0.1");
-
- ControlObj ctx = new ControlObj();
- synchronized(ctx){
- bkc.asyncCreateLedger(3, 2,
- QMode.VERIFIABLE,
- ledgerPassword,
- this,
- ctx);
+ ControlObj ctx = new ControlObj();
+
+ synchronized (ctx) {
+ LOG.info("Going to create ledger asynchronously");
+ bkc.asyncCreateLedger(3, 2, digestType, ledgerPassword, this, ctx);
+
ctx.wait();
}
-
-
- //bkc.initMessageDigest("SHA1");
+
+ // bkc.initMessageDigest("SHA1");
LedgerHandle lh = ctx.getLh();
ledgerId = lh.getId();
LOG.info("Ledger ID: " + lh.getId());
- for(int i = 0; i < numEntriesToWrite; i++){
+ for (int i = 0; i < numEntriesToWrite; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
-
+
entries.add(entry.array());
entriesSize.add(entry.array().length);
lh.asyncAddEntry(entry.array(), this, sync);
}
-
+
// wait for all entries to be acknowledged
synchronized (sync) {
- if (sync.counter < numEntriesToWrite){
+ while (sync.counter < numEntriesToWrite) {
LOG.debug("Entries counter = " + sync.counter);
sync.wait();
}
}
-
+
LOG.debug("*** WRITE COMPLETE ***");
- // close ledger
- synchronized(ctx){
+ // close ledger
+ synchronized (ctx) {
lh.asyncClose(this, ctx);
ctx.wait();
}
-
- //*** WRITING PART COMPLETE // READ PART BEGINS ***
-
+
+ // *** WRITING PART COMPLETE // READ PART BEGINS ***
+
// open ledger
- synchronized(ctx){
- bkc.asyncOpenLedger(ledgerId, ledgerPassword, this, ctx);
+ synchronized (ctx) {
+ bkc.asyncOpenLedger(ledgerId, digestType, ledgerPassword, this, ctx);
ctx.wait();
}
lh = ctx.getLh();
-
- LOG.debug("Number of entries written: " + lh.getLast());
- assertTrue("Verifying number of entries written", lh.getLast() == numEntriesToWrite);
-
- //read entries
- lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
-
+
+ LOG.debug("Number of entries written: " + lh.getLastAddConfirmed());
+ assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() == (numEntriesToWrite - 1));
+
+ // read entries
+ lh.asyncReadEntries(0, numEntriesToWrite - 1, this, sync);
+
synchronized (sync) {
- while(sync.value == false){
+ while (sync.value == false) {
sync.wait();
- }
+ }
}
-
- assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
-
+
LOG.debug("*** READ COMPLETE ***");
-
- // at this point, LedgerSequence ls is filled with the returned values
+
+ // at this point, the Enumeration ls is filled with the returned
+ // values
int i = 0;
- while(ls.hasMoreElements()){
+ while (ls.hasMoreElements()) {
ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
Integer origEntry = origbb.getInt();
byte[] entry = ls.nextElement().getEntry();
@@ -219,20 +186,17 @@
assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
i++;
}
+ assertTrue("Checking number of read entries", i == numEntriesToWrite);
lh.close();
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (BKException e) {
- e.printStackTrace();
} catch (InterruptedException e) {
- e.printStackTrace();
- } //catch (NoSuchAlgorithmException e) {
- // e.printStackTrace();
- //}
-
+ LOG.error(e);
+ fail("InterruptedException");
+ } // catch (NoSuchAlgorithmException e) {
+ // e.printStackTrace();
+ // }
+
}
-
-
+
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
SyncObj x = (SyncObj) ctx;
synchronized (x) {
@@ -241,154 +205,52 @@
}
}
- public void readComplete(int rc, LedgerHandle lh, LedgerSequence seq,
- Object ctx) {
- ls = seq;
+ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+ ls = seq;
synchronized (sync) {
sync.value = true;
sync.notify();
}
-
+
}
-
- public void createComplete(int rc, LedgerHandle lh, Object ctx){
- synchronized(ctx){
+
+ public void createComplete(int rc, LedgerHandle lh, Object ctx) {
+ synchronized (ctx) {
ControlObj cobj = (ControlObj) ctx;
cobj.setLh(lh);
cobj.notify();
- }
+ }
}
-
- public void openComplete(int rc, LedgerHandle lh, Object ctx){
- synchronized(ctx){
+
+ public void openComplete(int rc, LedgerHandle lh, Object ctx) {
+ synchronized (ctx) {
ControlObj cobj = (ControlObj) ctx;
cobj.setLh(lh);
cobj.notify();
- }
+ }
}
-
- public void closeComplete(int rc, LedgerHandle lh, Object ctx){
- synchronized(ctx){
+
+ public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
+ synchronized (ctx) {
ControlObj cobj = (ControlObj) ctx;
cobj.notify();
}
}
-
- protected void setUp() throws IOException {
- LOG.addAppender(ca);
- LOG.setLevel((Level) Level.DEBUG);
-
- // create a ZooKeeper server(dataDir, dataLogDir, port)
- LOG.debug("Running ZK server");
- //ServerStats.registerAsConcrete();
- ClientBase.setupTestEnv();
- ZkTmpDir = File.createTempFile("zookeeper", "test");
- ZkTmpDir.delete();
- ZkTmpDir.mkdir();
-
- try {
- zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
- serverFactory = new NIOServerCnxn.Factory(ZooKeeperDefaultPort);
- serverFactory.startup(zks);
- } catch (IOException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- boolean b = ClientBase.waitForServerUp(HOSTPORT, ClientBase.CONNECTION_TIMEOUT);
-
- LOG.debug("Server up: " + b);
-
- // create a zookeeper client
- LOG.debug("Instantiate ZK Client");
- zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher());
-
- //initialize the zk client with values
- try {
- zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 1), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 2), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- } catch (KeeperException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- // Create Bookie Servers (B1, B2, B3)
- tmpDirB1 = File.createTempFile("bookie1", "test");
- tmpDirB1.delete();
- tmpDirB1.mkdir();
-
- bs1 = new BookieServer(initialPort, tmpDirB1, new File[]{tmpDirB1});
- bs1.start();
-
- tmpDirB2 = File.createTempFile("bookie2", "test");
- tmpDirB2.delete();
- tmpDirB2.mkdir();
-
- bs2 = new BookieServer(initialPort + 1, tmpDirB2, new File[]{tmpDirB2});
- bs2.start();
- tmpDirB3 = File.createTempFile("bookie3", "test");
- tmpDirB3.delete();
- tmpDirB3.mkdir();
-
- bs3 = new BookieServer(initialPort + 2, tmpDirB3, new File[]{tmpDirB3});
- bs3.start();
-
- rng = new Random(System.currentTimeMillis()); // Initialize the Random Number Generator
- entries = new ArrayList<byte[]>(); // initialize the entries list
- entriesSize = new ArrayList<Integer>();
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ rng = new Random(System.currentTimeMillis()); // Initialize the Random
+ // Number Generator
+ entries = new ArrayList<byte[]>(); // initialize the entries list
+ entriesSize = new ArrayList<Integer>();
sync = new SyncObj(); // initialize the synchronization data structure
}
+
- protected void tearDown(){
- LOG.info("TearDown");
- //shutdown bookie servers
- try {
- bs1.shutdown();
- bs2.shutdown();
- bs3.shutdown();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- cleanUpDir(tmpDirB1);
- cleanUpDir(tmpDirB2);
- cleanUpDir(tmpDirB3);
-
- //shutdown ZK server
- serverFactory.shutdown();
- assertTrue("waiting for server down",
- ClientBase.waitForServerDown(HOSTPORT,
- ClientBase.CONNECTION_TIMEOUT));
- //ServerStats.unregister();
- cleanUpDir(ZkTmpDir);
-
- }
-
- /* Clean up a directory recursively */
- protected boolean cleanUpDir(File dir){
- if (dir.isDirectory()) {
- LOG.info("Cleaning up " + dir.getName());
- String[] children = dir.list();
- for (String string : children) {
- boolean success = cleanUpDir(new File(dir, string));
- if (!success) return false;
- }
- }
- // The directory is now empty so delete it
- return dir.delete();
- }
- /* User for testing purposes, void */
- class emptyWatcher implements Watcher{
- public void process(WatchedEvent event) {}
- }
+
}
\ No newline at end of file
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BaseTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BaseTestCase.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BaseTestCase.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BaseTestCase.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import junit.framework.TestCase;
+
+@RunWith(Parameterized.class)
+public abstract class BaseTestCase extends TestCase {
+ static final Logger LOG = Logger.getLogger(BaseTestCase.class);
+ // ZooKeeper related variables
+ private static final String HOSTPORT = "127.0.0.1:2181";
+ static Integer ZooKeeperDefaultPort = 2181;
+ ZooKeeperServer zks;
+ ZooKeeper zkc; // zookeeper client
+ NIOServerCnxn.Factory serverFactory;
+ File ZkTmpDir;
+
+ // BookKeeper
+ List<File> tmpDirs = new ArrayList<File>();
+ List<BookieServer> bs = new ArrayList<BookieServer>();
+ Integer initialPort = 5000;
+ int numBookies;
+ BookKeeper bkc;
+
+ public BaseTestCase(int numBookies) {
+ this.numBookies = numBookies;
+ }
+
+ @Parameters
+ public static Collection<Object[]> configs(){
+ return Arrays.asList(new Object[][]{ {DigestType.MAC }, {DigestType.CRC32}});
+ }
+
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ try {
+ // create a ZooKeeper server(dataDir, dataLogDir, port)
+ LOG.debug("Running ZK server");
+ // ServerStats.registerAsConcrete();
+ ClientBase.setupTestEnv();
+ ZkTmpDir = File.createTempFile("zookeeper", "test");
+ ZkTmpDir.delete();
+ ZkTmpDir.mkdir();
+
+ zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
+ serverFactory = new NIOServerCnxn.Factory(ZooKeeperDefaultPort);
+ serverFactory.startup(zks);
+
+ boolean b = ClientBase.waitForServerUp(HOSTPORT, ClientBase.CONNECTION_TIMEOUT);
+
+ LOG.debug("Server up: " + b);
+
+ // create a zookeeper client
+ LOG.debug("Instantiate ZK Client");
+ zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher());
+
+ // initialize the zk client with values
+ zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ for (int i = 0; i < numBookies; i++) {
+ zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + i), new byte[0],
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ // Create numBookies Bookie Servers
+ for (int i = 0; i < numBookies; i++) {
+ File f = File.createTempFile("bookie", "test");
+ tmpDirs.add(f);
+ f.delete();
+ f.mkdir();
+
+ BookieServer server = new BookieServer(initialPort + i, f, new File[] { f });
+ server.start();
+ bs.add(server);
+ }
+ zkc.close();
+ bkc = new BookKeeper("127.0.0.1");
+ } catch(Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ @After
+ @Override
+ public void tearDown() throws Exception {
+ LOG.info("TearDown");
+
+ if (bkc != null) {
+ bkc.halt();;
+ }
+
+ for (BookieServer server : bs) {
+ server.shutdown();
+ }
+
+ for (File f : tmpDirs) {
+ cleanUpDir(f);
+ }
+
+ // shutdown ZK server
+ if (serverFactory != null) {
+ serverFactory.shutdown();
+ assertTrue("waiting for server down", ClientBase.waitForServerDown(HOSTPORT, ClientBase.CONNECTION_TIMEOUT));
+ }
+ // ServerStats.unregister();
+ cleanUpDir(ZkTmpDir);
+
+
+ }
+
+ /* Clean up a directory recursively */
+ protected boolean cleanUpDir(File dir) {
+ if (dir.isDirectory()) {
+ LOG.info("Cleaning up " + dir.getName());
+ String[] children = dir.list();
+ for (String string : children) {
+ boolean success = cleanUpDir(new File(dir, string));
+ if (!success)
+ return false;
+ }
+ }
+ // The directory is now empty so delete it
+ return dir.delete();
+ }
+
+ /* Used for testing purposes; ignores all events */
+ class emptyWatcher implements Watcher {
+ public void process(WatchedEvent event) {
+ }
+ }
+
+}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
package org.apache.bookkeeper.test;
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,20 +21,25 @@
*
*/
-
import java.io.File;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.concurrent.Executors;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.junit.Test;
+import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.proto.BookieClient;
-import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.proto.ReadEntryCallback;
-import org.apache.bookkeeper.proto.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.log4j.Logger;
-
import junit.framework.TestCase;
public class BookieClientTest extends TestCase {
@@ -41,57 +47,64 @@
BookieServer bs;
File tmpDir;
int port = 13645;
+ ClientSocketChannelFactory channelFactory;
+ OrderedSafeExecutor executor;
+
+ @Override
protected void setUp() throws Exception {
tmpDir = File.createTempFile("bookie", "test");
tmpDir.delete();
tmpDir.mkdir();
bs = new BookieServer(port, tmpDir, new File[] { tmpDir });
bs.start();
+ channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors
+ .newCachedThreadPool());
+ executor = new OrderedSafeExecutor(2);
}
+
+ @Override
protected void tearDown() throws Exception {
bs.shutdown();
recursiveDelete(tmpDir);
+ channelFactory.releaseExternalResources();
+ executor.shutdown();
}
+
private static void recursiveDelete(File dir) {
File children[] = dir.listFiles();
if (children != null) {
- for(File child: children) {
+ for (File child : children) {
recursiveDelete(child);
}
}
dir.delete();
}
-
+
static class ResultStruct {
int rc;
ByteBuffer entry;
}
+
ReadEntryCallback recb = new ReadEntryCallback() {
- public void readEntryComplete(int rc, long ledgerId, long entryId,
- ByteBuffer bb, Object ctx) {
- ResultStruct rs = (ResultStruct)ctx;
- synchronized(rs) {
- LOG.info("Capacity " + bb.capacity() + ", " + bb.position());
+ public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer bb, Object ctx) {
+ ResultStruct rs = (ResultStruct) ctx;
+ synchronized (rs) {
rs.rc = rc;
- bb.position(bb.position()+16);
- //if (bb.remaining() >=4) {
- // // Skip the len
- // bb.position(bb.position()+4);
- //}
- rs.entry = bb.slice();
- LOG.info("Received " + bb.remaining());
- rs.notifyAll();
+ if (bb != null) {
+ bb.readerIndex(16);
+ rs.entry = bb.toByteBuffer();
+ rs.notifyAll();
+ }
}
}
-
+
};
WriteCallback wrcb = new WriteCallback() {
- public void writeComplete(int rc, long ledgerId, long entryId,
- Object ctx) {
+ public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
if (ctx != null) {
- synchronized(ctx) {
+ synchronized (ctx) {
ctx.notifyAll();
}
}
@@ -103,104 +116,114 @@
final Object notifyObject = new Object();
byte[] passwd = new byte[20];
Arrays.fill(passwd, (byte) 'a');
-
- BookieClient bc = new BookieClient("127.0.0.1", port, 50000);
- ByteBuffer bb;
- bb = createByteBuffer(1,1,1);
- bc.addEntry(1, passwd, 1, bb, wrcb, null);
- bb = createByteBuffer(2,1,2);
- bc.addEntry(1, passwd, 2, bb, wrcb, null);
- bb = createByteBuffer(3,1,3);
- bc.addEntry(1, passwd, 3, bb, wrcb, null);
- bb = createByteBuffer(5,1,5);
- bc.addEntry(1, passwd, 5, bb, wrcb, null);
- bb = createByteBuffer(7,1,7);
- bc.addEntry(1, passwd, 7, bb, wrcb, null);
- synchronized(notifyObject) {
- bb = createByteBuffer(11,1,11);
- bc.addEntry(1, passwd, 11, bb, wrcb, notifyObject);
+ InetSocketAddress addr = new InetSocketAddress("127.0.0.1", port);
+ ResultStruct arc = new ResultStruct();
+
+ BookieClient bc = new BookieClient(channelFactory, executor);
+ ChannelBuffer bb;
+ bb = createByteBuffer(1, 1, 1);
+ bc.addEntry(addr, 1, passwd, 1, bb, wrcb, null);
+ synchronized (arc) {
+ bc.readEntry(addr, 1, 1, recb, arc);
+ arc.wait(1000);
+ assertEquals(0, arc.rc);
+ assertEquals(1, arc.entry.getInt());
+ }
+ bb = createByteBuffer(2, 1, 2);
+ bc.addEntry(addr, 1, passwd, 2, bb, wrcb, null);
+ bb = createByteBuffer(3, 1, 3);
+ bc.addEntry(addr, 1, passwd, 3, bb, wrcb, null);
+ bb = createByteBuffer(5, 1, 5);
+ bc.addEntry(addr, 1, passwd, 5, bb, wrcb, null);
+ bb = createByteBuffer(7, 1, 7);
+ bc.addEntry(addr, 1, passwd, 7, bb, wrcb, null);
+ synchronized (notifyObject) {
+ bb = createByteBuffer(11, 1, 11);
+ bc.addEntry(addr, 1, passwd, 11, bb, wrcb, notifyObject);
notifyObject.wait();
}
- ResultStruct arc = new ResultStruct();
- synchronized(arc) {
- bc.readEntry(1, 6, recb, arc);
+ synchronized (arc) {
+ bc.readEntry(addr, 1, 6, recb, arc);
arc.wait(1000);
- assertEquals(BookieProtocol.ENOENTRY, arc.rc);
+ assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
}
- synchronized(arc) {
- bc.readEntry(1, 7, recb, arc);
+ synchronized (arc) {
+ bc.readEntry(addr, 1, 7, recb, arc);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(7, arc.entry.getInt());
}
- synchronized(arc) {
- bc.readEntry(1, 1, recb, arc);
+ synchronized (arc) {
+ bc.readEntry(addr, 1, 1, recb, arc);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(1, arc.entry.getInt());
}
- synchronized(arc) {
- bc.readEntry(1, 2, recb, arc);
+ synchronized (arc) {
+ bc.readEntry(addr, 1, 2, recb, arc);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(2, arc.entry.getInt());
}
- synchronized(arc) {
- bc.readEntry(1, 3, recb, arc);
+ synchronized (arc) {
+ bc.readEntry(addr, 1, 3, recb, arc);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(3, arc.entry.getInt());
}
- synchronized(arc) {
- bc.readEntry(1, 4, recb, arc);
+ synchronized (arc) {
+ bc.readEntry(addr, 1, 4, recb, arc);
arc.wait(1000);
- assertEquals(BookieProtocol.ENOENTRY, arc.rc);
+ assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
}
- synchronized(arc) {
- bc.readEntry(1, 11, recb, arc);
+ synchronized (arc) {
+ bc.readEntry(addr, 1, 11, recb, arc);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(11, arc.entry.getInt());
}
- synchronized(arc) {
- bc.readEntry(1, 5, recb, arc);
+ synchronized (arc) {
+ bc.readEntry(addr, 1, 5, recb, arc);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(5, arc.entry.getInt());
}
- synchronized(arc) {
- bc.readEntry(1, 10, recb, arc);
+ synchronized (arc) {
+ bc.readEntry(addr, 1, 10, recb, arc);
arc.wait(1000);
- assertEquals(BookieProtocol.ENOENTRY, arc.rc);
+ assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
}
- synchronized(arc) {
- bc.readEntry(1, 12, recb, arc);
+ synchronized (arc) {
+ bc.readEntry(addr, 1, 12, recb, arc);
arc.wait(1000);
- assertEquals(BookieProtocol.ENOENTRY, arc.rc);
+ assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
}
- synchronized(arc) {
- bc.readEntry(1, 13, recb, arc);
+ synchronized (arc) {
+ bc.readEntry(addr, 1, 13, recb, arc);
arc.wait(1000);
- assertEquals(BookieProtocol.ENOENTRY, arc.rc);
+ assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
}
}
- private ByteBuffer createByteBuffer(int i, long lid, long eid) {
+
+ private ChannelBuffer createByteBuffer(int i, long lid, long eid) {
ByteBuffer bb;
- bb = ByteBuffer.allocate(4+16);
- bb.putInt(i);
+ bb = ByteBuffer.allocate(4 + 16);
bb.putLong(lid);
bb.putLong(eid);
+ bb.putInt(i);
bb.flip();
- return bb;
+ return ChannelBuffers.wrappedBuffer(bb);
}
+
@Test
public void testNoLedger() throws Exception {
ResultStruct arc = new ResultStruct();
- BookieClient bc = new BookieClient("127.0.0.1", port, 50000);
- synchronized(arc) {
- bc.readEntry(2, 13, recb, arc);
+ InetSocketAddress addr = new InetSocketAddress("127.0.0.1", port);
+ BookieClient bc = new BookieClient(channelFactory, executor);
+ synchronized (arc) {
+ bc.readEntry(addr, 2, 13, recb, arc);
arc.wait(1000);
- assertEquals(BookieProtocol.ENOLEDGER, arc.rc);
+ assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
}
}
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieFailureTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieFailureTest.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieFailureTest.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieFailureTest.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
package org.apache.bookkeeper.test;
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,190 +21,200 @@
*
*/
-
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.Enumeration;
import java.util.Random;
import java.util.Set;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.LedgerHandle.QMode;
-import org.apache.bookkeeper.client.LedgerSequence;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.streaming.LedgerInputStream;
-import org.apache.bookkeeper.streaming.LedgerOutputStream;
-
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.server.NIOServerCnxn;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.test.ClientBase;
-
+import org.junit.Before;
import org.junit.Test;
/**
- * This test tests read and write, synchronous and
- * asynchronous, strings and integers for a BookKeeper client.
- * The test deployment uses a ZooKeeper server
- * and three BookKeepers.
+ * This test tests writes and reads for a BookKeeper client when a bookie
+ * fails. The test deployment uses a ZooKeeper server
+ * and four bookies.
*
*/
-public class BookieFailureTest
- extends junit.framework.TestCase
- implements AddCallback, ReadCallback{
-
+public class BookieFailureTest extends BaseTestCase implements AddCallback, ReadCallback {
- //Depending on the taste, select the amount of logging
+ // Depending on the taste, select the amount of logging
// by decommenting one of the two lines below
- //static Logger LOG = Logger.getRootLogger();
- static Logger LOG = Logger.getLogger(BookieReadWriteTest.class);
-
- static ConsoleAppender ca = new ConsoleAppender(new PatternLayout());
+ // static Logger LOG = Logger.getRootLogger();
+ static Logger LOG = Logger.getLogger(BookieFailureTest.class);
- // ZooKeeper related variables
- private static final String HOSTPORT = "127.0.0.1:2181";
- static Integer ZooKeeperDefaultPort = 2181;
- ZooKeeperServer zks;
- ZooKeeper zkc; //zookeeper client
- NIOServerCnxn.Factory serverFactory;
- File ZkTmpDir;
-
- //BookKeeper
- File tmpDirB1, tmpDirB2, tmpDirB3, tmpDirB4;
- BookieServer bs1, bs2, bs3, bs4;
- Integer initialPort = 5000;
- BookKeeper bkc; // bookkeeper client
byte[] ledgerPassword = "aaa".getBytes();
LedgerHandle lh, lh2;
long ledgerId;
- LedgerSequence ls;
-
- //test related variables
- int numEntriesToWrite = 20000;
+ Enumeration<LedgerEntry> ls;
+
+ // test related variables
+ int numEntriesToWrite = 200;
int maxInt = 2147483647;
- Random rng; // Random Number Generator
+ Random rng; // Random Number Generator
ArrayList<byte[]> entries; // generated entries
ArrayList<Integer> entriesSize;
+ DigestType digestType;
// Synchronization
SyncObj sync;
Set<Object> syncObjs;
-
+
class SyncObj {
int counter;
- boolean value;
+ boolean value;
+
public SyncObj() {
counter = 0;
value = false;
- }
+ }
+ }
+
+ public BookieFailureTest(DigestType digestType) {
+ super(4);
+ this.digestType = digestType;
}
- /**
- * Tests writes and reads when a bookie fails.
- *
- * @throws {@link IOException}
- */
+ /**
+ * Tests writes and reads when a bookie fails.
+ *
+ * @throws IOException
+ */
@Test
- public void testAsyncBK1() throws IOException{
+ public void testAsyncBK1() throws IOException {
LOG.info("#### BK1 ####");
- auxTestReadWriteAsyncSingleClient(bs1);
+ auxTestReadWriteAsyncSingleClient(bs.get(0));
+ }
+
+ @Test
+ public void testAsyncBK2() throws IOException {
+ LOG.info("#### BK2 ####");
+ auxTestReadWriteAsyncSingleClient(bs.get(1));
+ }
+
+ @Test
+ public void testAsyncBK3() throws IOException {
+ LOG.info("#### BK3 ####");
+ auxTestReadWriteAsyncSingleClient(bs.get(2));
+ }
+
+ @Test
+ public void testAsyncBK4() throws IOException {
+ LOG.info("#### BK4 ####");
+ auxTestReadWriteAsyncSingleClient(bs.get(3));
}
-
- @Test
- public void testAsyncBK2() throws IOException{
- LOG.info("#### BK2 ####");
- auxTestReadWriteAsyncSingleClient(bs2);
- }
-
- @Test
- public void testAsyncBK3() throws IOException{
- LOG.info("#### BK3 ####");
- auxTestReadWriteAsyncSingleClient(bs3);
- }
-
- @Test
- public void testAsyncBK4() throws IOException{
- LOG.info("#### BK4 ####");
- auxTestReadWriteAsyncSingleClient(bs4);
- }
- void auxTestReadWriteAsyncSingleClient(BookieServer bs) throws IOException{
+ @Test
+ public void testBookieRecovery() throws Exception{
+ bkc = new BookKeeper("127.0.0.1");
+
+ //Shutdown all but 1 bookie
+ bs.get(0).shutdown();
+ bs.get(1).shutdown();
+ bs.get(2).shutdown();
+
+ byte[] passwd = "blah".getBytes();
+ LedgerHandle lh = bkc.createLedger(1, 1,digestType, passwd);
+
+ int numEntries = 100;
+ for (int i=0; i< numEntries; i++){
+ byte[] data = (""+i).getBytes();
+ lh.addEntry(data);
+ }
+
+ bs.get(3).shutdown();
+ BookieServer server = new BookieServer(initialPort + 3, tmpDirs.get(3), new File[] { tmpDirs.get(3)});
+ server.start();
+ bs.set(3, server);
+
+ assertEquals(numEntries - 1 , lh.getLastAddConfirmed());
+ Enumeration<LedgerEntry> entries = lh.readEntries(0, lh.getLastAddConfirmed());
+
+ int numScanned = 0;
+ while (entries.hasMoreElements()){
+ assertEquals((""+numScanned), new String(entries.nextElement().getEntry()));
+ numScanned++;
+ }
+ assertEquals(numEntries, numScanned);
+
+
+ }
+
+ void auxTestReadWriteAsyncSingleClient(BookieServer bs) throws IOException {
try {
// Create a BookKeeper client and a ledger
- bkc = new BookKeeper("127.0.0.1");
- lh = bkc.createLedger(4, 2, QMode.VERIFIABLE, ledgerPassword);
-
+ lh = bkc.createLedger(3, 2, digestType, ledgerPassword);
+
ledgerId = lh.getId();
LOG.info("Ledger ID: " + lh.getId());
- for(int i = 0; i < numEntriesToWrite; i++){
+ for (int i = 0; i < numEntriesToWrite; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
-
+
entries.add(entry.array());
entriesSize.add(entry.array().length);
lh.asyncAddEntry(entry.array(), this, sync);
- if(i == 5000){
- //Bookie fail
- bs.shutdown();
- }
+
}
+ LOG.info("Wrote " + numEntriesToWrite + " and now going to fail bookie.");
+ // Bookie fail
+ bs.shutdown();
+
// wait for all entries to be acknowledged
synchronized (sync) {
- while (sync.counter < numEntriesToWrite){
+ while (sync.counter < numEntriesToWrite) {
LOG.debug("Entries counter = " + sync.counter);
sync.wait();
}
}
-
+
LOG.debug("*** WRITE COMPLETE ***");
- // close ledger
+ // close ledger
lh.close();
-
- //*** WRITING PART COMPLETE // READ PART BEGINS ***
-
+
+ // *** WRITING PART COMPLETE // READ PART BEGINS ***
+
// open ledger
+ bkc.halt();
bkc = new BookKeeper("127.0.0.1");
- lh = bkc.openLedger(ledgerId, ledgerPassword);
- LOG.debug("Number of entries written: " + lh.getLast());
- assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));
-
- //read entries
-
- lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
-
+ lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+ LOG.debug("Number of entries written: " + (lh.getLastAddConfirmed() + 1));
+ assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() == (numEntriesToWrite - 1));
+
+ // read entries
+
+ lh.asyncReadEntries(0, numEntriesToWrite - 1, this, sync);
+
synchronized (sync) {
- while(sync.value == false){
+ while (sync.value == false) {
sync.wait(10000);
assertTrue("Haven't received entries", sync.value);
- }
+ }
}
-
- assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
-
+
LOG.debug("*** READ COMPLETE ***");
-
- // at this point, LedgerSequence ls is filled with the returned values
+
+ // at this point, the Enumeration ls holds the entries passed to
+ // readComplete
int i = 0;
- LOG.info("Size of ledger sequence: " + ls.size());
- while(ls.hasMoreElements()){
+ while (ls.hasMoreElements()) {
ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
Integer origEntry = origbb.getInt();
byte[] entry = ls.nextElement().getEntry();
@@ -215,24 +226,26 @@
assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
i++;
}
-
+
+ assertTrue("Checking number of read entries", i == numEntriesToWrite);
+
LOG.info("Verified that entries are ok, and now closing ledger");
lh.close();
} catch (KeeperException e) {
+ LOG.error("Caught KeeperException", e);
fail(e.toString());
} catch (BKException e) {
+ LOG.error("Caught BKException", e);
fail(e.toString());
} catch (InterruptedException e) {
+ LOG.error("Caught InterruptedException", e);
fail(e.toString());
- }
-
+ }
+
}
-
- public void addComplete(int rc,
- LedgerHandle lh,
- long entryId,
- Object ctx) {
- if(rc != 0)
+
+ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+ if (rc != 0)
fail("Failed to write entry: " + entryId);
SyncObj x = (SyncObj) ctx;
synchronized (x) {
@@ -241,146 +254,52 @@
}
}
- public void readComplete(int rc,
- LedgerHandle lh,
- LedgerSequence seq,
- Object ctx) {
- if(rc != 0)
+ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+ if (rc != 0)
fail("Failed to write entry");
- ls = seq;
+ ls = seq;
synchronized (sync) {
sync.value = true;
sync.notify();
}
-
+
}
-
- protected void setUp() throws IOException, InterruptedException {
- LOG.addAppender(ca);
- LOG.setLevel((Level) Level.DEBUG);
-
- // create a ZooKeeper server(dataDir, dataLogDir, port)
- LOG.debug("Running ZK server (setup)");
- //ServerStats.registerAsConcrete();
- ClientBase.setupTestEnv();
- ZkTmpDir = File.createTempFile("zookeeper", "test");
- ZkTmpDir.delete();
- ZkTmpDir.mkdir();
-
- try {
- zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
- serverFactory = new NIOServerCnxn.Factory(ZooKeeperDefaultPort);
- serverFactory.startup(zks);
- } catch (IOException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- boolean b = ClientBase.waitForServerUp(HOSTPORT, ClientBase.CONNECTION_TIMEOUT);
-
- LOG.debug("Server up: " + b);
-
- // create a zookeeper client
- LOG.debug("Instantiate ZK Client");
- zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher());
-
- //initialize the zk client with values
- try {
- zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 1), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 2), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 3), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- } catch (KeeperException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- // Create Bookie Servers (B1, B2, B3)
- tmpDirB1 = File.createTempFile("bookie1", "test");
- tmpDirB1.delete();
- tmpDirB1.mkdir();
-
- bs1 = new BookieServer(initialPort, tmpDirB1, new File[]{tmpDirB1});
- bs1.start();
-
- tmpDirB2 = File.createTempFile("bookie2", "test");
- tmpDirB2.delete();
- tmpDirB2.mkdir();
-
- bs2 = new BookieServer(initialPort + 1, tmpDirB2, new File[]{tmpDirB2});
- bs2.start();
- tmpDirB3 = File.createTempFile("bookie3", "test");
- tmpDirB3.delete();
- tmpDirB3.mkdir();
-
- bs3 = new BookieServer(initialPort + 2, tmpDirB3, new File[]{tmpDirB3});
- bs3.start();
-
- tmpDirB4 = File.createTempFile("bookie4", "test");
- tmpDirB4.delete();
- tmpDirB4.mkdir();
-
- bs4 = new BookieServer(initialPort + 3, tmpDirB4, new File[]{tmpDirB4});
- bs4.start();
-
- rng = new Random(System.currentTimeMillis()); // Initialize the Random Number Generator
- entries = new ArrayList<byte[]>(); // initialize the entries list
- entriesSize = new ArrayList<Integer>();
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+
+ rng = new Random(System.currentTimeMillis()); // Initialize the Random
+ // Number Generator
+ entries = new ArrayList<byte[]>(); // initialize the entries list
+ entriesSize = new ArrayList<Integer>();
sync = new SyncObj(); // initialize the synchronization data structure
-
+
zkc.close();
}
-
- protected void tearDown() throws InterruptedException {
- LOG.info("TearDown");
- bkc.halt();
-
- //shutdown bookie servers
- if(!bs1.isDown()) bs1.shutdown();
- if(!bs2.isDown()) bs2.shutdown();
- if(!bs3.isDown()) bs3.shutdown();
- if(!bs4.isDown()) bs4.shutdown();
-
- cleanUpDir(tmpDirB1);
- cleanUpDir(tmpDirB2);
- cleanUpDir(tmpDirB3);
- cleanUpDir(tmpDirB4);
- //shutdown ZK server
- serverFactory.shutdown();
- assertTrue("waiting for server down",
- ClientBase.waitForServerDown(HOSTPORT,
- ClientBase.CONNECTION_TIMEOUT));
- //ServerStats.unregister();
- cleanUpDir(ZkTmpDir);
-
- }
- /* Clean up a directory recursively */
- protected boolean cleanUpDir(File dir){
+
+ /* Clean up a directory recursively */
+ @Override
+ protected boolean cleanUpDir(File dir) {
if (dir.isDirectory()) {
LOG.info("Cleaning up " + dir.getName());
String[] children = dir.list();
for (String string : children) {
boolean success = cleanUpDir(new File(dir, string));
- if (!success) return false;
+ if (!success)
+ return false;
}
}
// The directory is now empty so delete it
- return dir.delete();
+ return dir.delete();
}
- /* User for testing purposes, void */
- class emptyWatcher implements Watcher{
- public void process(WatchedEvent event) {}
+ /* Used for testing purposes; ignores all events */
+ class emptyWatcher implements Watcher {
+ public void process(WatchedEvent event) {
+ }
}
-
}
\ No newline at end of file
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
package org.apache.bookkeeper.test;
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,157 +21,132 @@
*
*/
-
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.Enumeration;
import java.util.Random;
import java.util.Set;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.LedgerSequence;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
-import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.streaming.LedgerInputStream;
import org.apache.bookkeeper.streaming.LedgerOutputStream;
-
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.server.NIOServerCnxn;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.test.ClientBase;
-
+import org.junit.Before;
import org.junit.Test;
/**
- * This test tests read and write, synchronous and
- * asynchronous, strings and integers for a BookKeeper client.
- * The test deployment uses a ZooKeeper server
- * and three BookKeepers.
+ * This test tests read and write, synchronous and asynchronous, strings and
+ * integers for a BookKeeper client. The test deployment uses a ZooKeeper server
+ * and three BookKeepers.
*
*/
-public class BookieReadWriteTest
- extends junit.framework.TestCase
- implements AddCallback, ReadCallback{
+public class BookieReadWriteTest extends BaseTestCase implements AddCallback, ReadCallback {
- //Depending on the taste, select the amount of logging
+ // Depending on the taste, select the amount of logging
// by decommenting one of the two lines below
- //static Logger LOG = Logger.getRootLogger();
+ // static Logger LOG = Logger.getRootLogger();
static Logger LOG = Logger.getLogger(BookieReadWriteTest.class);
- static ConsoleAppender ca = new ConsoleAppender(new PatternLayout());
-
- // ZooKeeper related variables
- private static final String HOSTPORT = "127.0.0.1:2181";
- static Integer ZooKeeperDefaultPort = 2181;
- ZooKeeperServer zks;
- ZooKeeper zkc; //zookeeper client
- NIOServerCnxn.Factory serverFactory;
- File ZkTmpDir;
-
- //BookKeeper
- File tmpDirB1, tmpDirB2, tmpDirB3;
- BookieServer bs1, bs2, bs3;
- Integer initialPort = 5000;
- BookKeeper bkc; // bookkeeper client
byte[] ledgerPassword = "aaa".getBytes();
LedgerHandle lh, lh2;
long ledgerId;
- LedgerSequence ls;
-
- //test related variables
+ Enumeration<LedgerEntry> ls;
+
+ // test related variables
int numEntriesToWrite = 200;
int maxInt = 2147483647;
- Random rng; // Random Number Generator
+ Random rng; // Random Number Generator
ArrayList<byte[]> entries; // generated entries
ArrayList<Integer> entriesSize;
+ DigestType digestType;
+
+ public BookieReadWriteTest(DigestType digestType){
+ super(3);
+ this.digestType = digestType;
+ }
// Synchronization
SyncObj sync;
Set<Object> syncObjs;
-
+
class SyncObj {
int counter;
- boolean value;
+ boolean value;
+
public SyncObj() {
counter = 0;
value = false;
- }
+ }
}
-
+
@Test
- public void testOpenException()
- throws KeeperException, IOException, InterruptedException {
- bkc = new BookKeeper("127.0.0.1");
- try{
- lh = bkc.openLedger(0, ledgerPassword);
+ public void testOpenException() throws KeeperException, IOException, InterruptedException {
+ try {
+ lh = bkc.openLedger(0, digestType, ledgerPassword);
fail("Haven't thrown exception");
} catch (BKException e) {
LOG.warn("Successfully thrown and caught exception:", e);
}
}
-
+
/**
- * test the streaming api for reading
- * and writing
+ * test the streaming api for reading and writing
+ *
* @throws {@link IOException}, {@link KeeperException}
*/
@Test
- public void testStreamingClients() throws IOException,
- KeeperException, BKException, InterruptedException {
+ public void testStreamingClients() throws IOException, KeeperException, BKException, InterruptedException {
bkc = new BookKeeper("127.0.0.1");
- lh = bkc.createLedger(ledgerPassword);
- //write a string so that we cna
+ lh = bkc.createLedger(digestType, ledgerPassword);
+ // write a string so that we can
// create a buffer of a single bytes
// and check for corner cases
- String toWrite = "we need to check for this string to match " +
- "and for the record mahadev is the best";
- LedgerOutputStream lout = new LedgerOutputStream(lh , 1);
+ String toWrite = "we need to check for this string to match " + "and for the record mahadev is the best";
+ LedgerOutputStream lout = new LedgerOutputStream(lh, 1);
byte[] b = toWrite.getBytes();
lout.write(b);
lout.close();
long lId = lh.getId();
lh.close();
- //check for sanity
- lh = bkc.openLedger(lId, ledgerPassword);
- LedgerInputStream lin = new LedgerInputStream(lh, 1);
+ // check for sanity
+ lh = bkc.openLedger(lId, digestType, ledgerPassword);
+ LedgerInputStream lin = new LedgerInputStream(lh, 1);
byte[] bread = new byte[b.length];
int read = 0;
- while (read < b.length) {
+ while (read < b.length) {
read = read + lin.read(bread, read, b.length);
}
-
+
String newString = new String(bread);
assertTrue("these two should same", toWrite.equals(newString));
lin.close();
lh.close();
- //create another ledger to write one byte at a time
- lh = bkc.createLedger(ledgerPassword);
+ // create another ledger to write one byte at a time
+ lh = bkc.createLedger(digestType, ledgerPassword);
lout = new LedgerOutputStream(lh);
- for (int i=0; i < b.length;i++) {
+ for (int i = 0; i < b.length; i++) {
lout.write(b[i]);
}
lout.close();
lId = lh.getId();
lh.close();
- lh = bkc.openLedger(lId, ledgerPassword);
+ lh = bkc.openLedger(lId, digestType, ledgerPassword);
lin = new LedgerInputStream(lh);
bread = new byte[b.length];
- read= 0;
+ read = 0;
while (read < b.length) {
read = read + lin.read(bread, read, b.length);
}
@@ -179,62 +155,60 @@
lin.close();
lh.close();
}
-
-
+
@Test
- public void testReadWriteAsyncSingleClient() throws IOException{
+ public void testReadWriteAsyncSingleClient() throws IOException {
try {
// Create a BookKeeper client and a ledger
bkc = new BookKeeper("127.0.0.1");
- lh = bkc.createLedger(ledgerPassword);
- //bkc.initMessageDigest("SHA1");
+ lh = bkc.createLedger(digestType, ledgerPassword);
+ // bkc.initMessageDigest("SHA1");
ledgerId = lh.getId();
LOG.info("Ledger ID: " + lh.getId());
- for(int i = 0; i < numEntriesToWrite; i++){
+ for (int i = 0; i < numEntriesToWrite; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
-
+
entries.add(entry.array());
entriesSize.add(entry.array().length);
lh.asyncAddEntry(entry.array(), this, sync);
}
-
+
// wait for all entries to be acknowledged
synchronized (sync) {
- while (sync.counter < numEntriesToWrite){
+ while (sync.counter < numEntriesToWrite) {
LOG.debug("Entries counter = " + sync.counter);
sync.wait();
}
}
-
+
LOG.debug("*** WRITE COMPLETE ***");
- // close ledger
+ // close ledger
lh.close();
-
- //*** WRITING PART COMPLETE // READ PART BEGINS ***
-
+
+ // *** WRITING PART COMPLETE // READ PART BEGINS ***
+
// open ledger
- lh = bkc.openLedger(ledgerId, ledgerPassword);
- LOG.debug("Number of entries written: " + lh.getLast());
- assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));
-
- //read entries
+ lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+ LOG.debug("Number of entries written: " + (lh.getLastAddConfirmed() + 1));
+ assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() == (numEntriesToWrite - 1));
+
+ // read entries
lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
-
+
synchronized (sync) {
- while(sync.value == false){
+ while (sync.value == false) {
sync.wait();
- }
+ }
}
-
- assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
-
+
LOG.debug("*** READ COMPLETE ***");
-
- // at this point, LedgerSequence ls is filled with the returned values
+
+ // at this point, the Enumeration ls is filled with the returned
+ // values
int i = 0;
- while(ls.hasMoreElements()){
+ while (ls.hasMoreElements()) {
ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
Integer origEntry = origbb.getInt();
byte[] entry = ls.nextElement().getEntry();
@@ -248,6 +222,8 @@
assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
i++;
}
+ assertTrue("Checking number of read entries", i == numEntriesToWrite);
+
lh.close();
} catch (KeeperException e) {
LOG.error("Test failed", e);
@@ -258,71 +234,72 @@
} catch (InterruptedException e) {
LOG.error("Test failed", e);
fail("Test failed due to interruption");
- }
+ }
}
-
+
@Test
- public void testSyncReadAsyncWriteStringsSingleClient() throws IOException{
+ public void testSyncReadAsyncWriteStringsSingleClient() throws IOException {
LOG.info("TEST READ WRITE STRINGS MIXED SINGLE CLIENT");
String charset = "utf-8";
- LOG.debug("Default charset: " + Charset.defaultCharset());
+ LOG.debug("Default charset: " + Charset.defaultCharset());
try {
// Create a BookKeeper client and a ledger
bkc = new BookKeeper("127.0.0.1");
- lh = bkc.createLedger(ledgerPassword);
- //bkc.initMessageDigest("SHA1");
+ lh = bkc.createLedger(digestType, ledgerPassword);
+ // bkc.initMessageDigest("SHA1");
ledgerId = lh.getId();
LOG.info("Ledger ID: " + lh.getId());
- for(int i = 0; i < numEntriesToWrite; i++){
+ for (int i = 0; i < numEntriesToWrite; i++) {
int randomInt = rng.nextInt(maxInt);
byte[] entry = new String(Integer.toString(randomInt)).getBytes(charset);
entries.add(entry);
lh.asyncAddEntry(entry, this, sync);
}
-
+
// wait for all entries to be acknowledged
synchronized (sync) {
- while (sync.counter < numEntriesToWrite){
+ while (sync.counter < numEntriesToWrite) {
LOG.debug("Entries counter = " + sync.counter);
sync.wait();
}
}
-
+
LOG.debug("*** ASYNC WRITE COMPLETE ***");
- // close ledger
+ // close ledger
lh.close();
-
- //*** WRITING PART COMPLETED // READ PART BEGINS ***
-
+
+ // *** WRITING PART COMPLETED // READ PART BEGINS ***
+
// open ledger
- lh = bkc.openLedger(ledgerId, ledgerPassword);
- LOG.debug("Number of entries written: " + lh.getLast());
- assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));
-
- //read entries
+ lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+ LOG.debug("Number of entries written: " + (lh.getLastAddConfirmed() + 1));
+ assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() == (numEntriesToWrite - 1));
+
+ // read entries
ls = lh.readEntries(0, numEntriesToWrite - 1);
-
- assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
-
+
LOG.debug("*** SYNC READ COMPLETE ***");
-
- // at this point, LedgerSequence ls is filled with the returned values
+
+ // at this point, the Enumeration ls holds the entries returned
+ // by readEntries
int i = 0;
- while(ls.hasMoreElements()){
+ while (ls.hasMoreElements()) {
byte[] origEntryBytes = entries.get(i++);
byte[] retrEntryBytes = ls.nextElement().getEntry();
-
+
LOG.debug("Original byte entry size: " + origEntryBytes.length);
LOG.debug("Saved byte entry size: " + retrEntryBytes.length);
-
+
String origEntry = new String(origEntryBytes, charset);
String retrEntry = new String(retrEntryBytes, charset);
-
+
LOG.debug("Original entry: " + origEntry);
LOG.debug("Retrieved entry: " + retrEntry);
-
+
assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
}
+ assertTrue("Checking number of read entries", i == numEntriesToWrite);
+
lh.close();
} catch (KeeperException e) {
LOG.error("Test failed", e);
@@ -333,34 +310,34 @@
} catch (InterruptedException e) {
LOG.error("Test failed", e);
fail("Test failed due to interruption");
- }
-
+ }
+
}
-
+
@Test
public void testReadWriteSyncSingleClient() throws IOException {
try {
// Create a BookKeeper client and a ledger
bkc = new BookKeeper("127.0.0.1");
- lh = bkc.createLedger(ledgerPassword);
- //bkc.initMessageDigest("SHA1");
+ lh = bkc.createLedger(digestType, ledgerPassword);
+ // bkc.initMessageDigest("SHA1");
ledgerId = lh.getId();
LOG.info("Ledger ID: " + lh.getId());
- for(int i = 0; i < numEntriesToWrite; i++){
+ for (int i = 0; i < numEntriesToWrite; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
- entries.add(entry.array());
+ entries.add(entry.array());
lh.addEntry(entry.array());
}
lh.close();
- lh = bkc.openLedger(ledgerId, ledgerPassword);
- LOG.debug("Number of entries written: " + lh.getLast());
- assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite - 1));
-
+ lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+ LOG.debug("Number of entries written: " + lh.getLastAddConfirmed());
+ assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() == (numEntriesToWrite - 1));
+
ls = lh.readEntries(0, numEntriesToWrite - 1);
int i = 0;
- while(ls.hasMoreElements()){
+ while (ls.hasMoreElements()) {
ByteBuffer origbb = ByteBuffer.wrap(entries.get(i++));
Integer origEntry = origbb.getInt();
ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
@@ -381,42 +358,42 @@
} catch (InterruptedException e) {
LOG.error("Test failed", e);
fail("Test failed due to interruption");
- }
+ }
}
-
+
@Test
public void testReadWriteZero() throws IOException {
try {
// Create a BookKeeper client and a ledger
bkc = new BookKeeper("127.0.0.1");
- lh = bkc.createLedger(ledgerPassword);
- //bkc.initMessageDigest("SHA1");
+ lh = bkc.createLedger(digestType, ledgerPassword);
+ // bkc.initMessageDigest("SHA1");
ledgerId = lh.getId();
LOG.info("Ledger ID: " + lh.getId());
- for(int i = 0; i < numEntriesToWrite; i++){
- lh.addEntry(new byte[0]);
+ for (int i = 0; i < numEntriesToWrite; i++) {
+ lh.addEntry(new byte[0]);
}
-
+
/*
* Write a non-zero entry
*/
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
- entries.add(entry.array());
- lh.addEntry( entry.array());
-
+ entries.add(entry.array());
+ lh.addEntry(entry.array());
+
lh.close();
- lh = bkc.openLedger(ledgerId, ledgerPassword);
- LOG.debug("Number of entries written: " + lh.getLast());
- assertTrue("Verifying number of entries written", lh.getLast() == numEntriesToWrite);
-
+ lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+ LOG.debug("Number of entries written: " + lh.getLastAddConfirmed());
+ assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() == numEntriesToWrite);
+
ls = lh.readEntries(0, numEntriesToWrite - 1);
int i = 0;
- while(ls.hasMoreElements()){
+ while (ls.hasMoreElements()) {
ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
LOG.debug("Length of result: " + result.capacity());
-
+
assertTrue("Checking if entry " + i + " has zero bytes", result.capacity() == 0);
}
lh.close();
@@ -429,52 +406,54 @@
} catch (InterruptedException e) {
LOG.error("Test failed", e);
fail("Test failed due to interruption");
- }
+ }
}
-
+
@Test
public void testMultiLedger() throws IOException {
try {
// Create a BookKeeper client and a ledger
bkc = new BookKeeper("127.0.0.1");
- lh = bkc.createLedger(ledgerPassword);
- lh2 = bkc.createLedger(ledgerPassword);
-
+ lh = bkc.createLedger(digestType, ledgerPassword);
+ lh2 = bkc.createLedger(digestType, ledgerPassword);
+
long ledgerId = lh.getId();
long ledgerId2 = lh2.getId();
-
- //bkc.initMessageDigest("SHA1");
+
+ // bkc.initMessageDigest("SHA1");
LOG.info("Ledger ID 1: " + lh.getId() + ", Ledger ID 2: " + lh2.getId());
- for(int i = 0; i < numEntriesToWrite; i++){
- lh.addEntry( new byte[0]);
+ for (int i = 0; i < numEntriesToWrite; i++) {
+ lh.addEntry(new byte[0]);
lh2.addEntry(new byte[0]);
}
-
+
lh.close();
lh2.close();
-
- lh = bkc.openLedger(ledgerId, ledgerPassword);
- lh2 = bkc.openLedger(ledgerId2, ledgerPassword);
-
- LOG.debug("Number of entries written: " + lh.getLast() + ", " + lh2.getLast());
- assertTrue("Verifying number of entries written lh (" + lh.getLast() + ")" , lh.getLast() == (numEntriesToWrite - 1));
- assertTrue("Verifying number of entries written lh2 (" + lh2.getLast() + ")", lh2.getLast() == (numEntriesToWrite - 1));
-
+
+ lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+ lh2 = bkc.openLedger(ledgerId2, digestType, ledgerPassword);
+
+ LOG.debug("Number of entries written: " + lh.getLastAddConfirmed() + ", " + lh2.getLastAddConfirmed());
+ assertTrue("Verifying number of entries written lh (" + lh.getLastAddConfirmed() + ")", lh
+ .getLastAddConfirmed() == (numEntriesToWrite - 1));
+ assertTrue("Verifying number of entries written lh2 (" + lh2.getLastAddConfirmed() + ")", lh2
+ .getLastAddConfirmed() == (numEntriesToWrite - 1));
+
ls = lh.readEntries(0, numEntriesToWrite - 1);
int i = 0;
- while(ls.hasMoreElements()){
+ while (ls.hasMoreElements()) {
ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
LOG.debug("Length of result: " + result.capacity());
-
+
assertTrue("Checking if entry " + i + " has zero bytes", result.capacity() == 0);
}
lh.close();
- ls = lh2.readEntries( 0, numEntriesToWrite - 1);
+ ls = lh2.readEntries(0, numEntriesToWrite - 1);
i = 0;
- while(ls.hasMoreElements()){
+ while (ls.hasMoreElements()) {
ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
LOG.debug("Length of result: " + result.capacity());
-
+
assertTrue("Checking if entry " + i + " has zero bytes", result.capacity() == 0);
}
lh2.close();
@@ -487,14 +466,10 @@
} catch (InterruptedException e) {
LOG.error("Test failed", e);
fail("Test failed due to interruption");
- }
+ }
}
-
-
- public void addComplete(int rc,
- LedgerHandle lh,
- long entryId,
- Object ctx) {
+
+ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
SyncObj x = (SyncObj) ctx;
synchronized (x) {
x.counter++;
@@ -502,135 +477,45 @@
}
}
- public void readComplete(int rc,
- LedgerHandle lh,
- LedgerSequence seq,
- Object ctx) {
- ls = seq;
+ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+ ls = seq;
synchronized (sync) {
sync.value = true;
sync.notify();
}
-
- }
-
- protected void setUp() throws IOException, InterruptedException {
- LOG.addAppender(ca);
- LOG.setLevel((Level) Level.DEBUG);
-
- // create a ZooKeeper server(dataDir, dataLogDir, port)
- LOG.debug("Running ZK server");
- //ServerStats.registerAsConcrete();
- ClientBase.setupTestEnv();
- ZkTmpDir = File.createTempFile("zookeeper", "test");
- ZkTmpDir.delete();
- ZkTmpDir.mkdir();
-
- try {
- zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
- serverFactory = new NIOServerCnxn.Factory(ZooKeeperDefaultPort);
- serverFactory.startup(zks);
- } catch (IOException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- boolean b = ClientBase.waitForServerUp(HOSTPORT, ClientBase.CONNECTION_TIMEOUT);
-
- LOG.debug("Server up: " + b);
-
- // create a zookeeper client
- LOG.debug("Instantiate ZK Client");
- zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher());
-
- //initialize the zk client with values
- try {
- zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 1), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 2), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- } catch (KeeperException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- // Create Bookie Servers (B1, B2, B3)
- tmpDirB1 = File.createTempFile("bookie1", "test");
- tmpDirB1.delete();
- tmpDirB1.mkdir();
-
- bs1 = new BookieServer(initialPort, tmpDirB1, new File[]{tmpDirB1});
- bs1.start();
-
- tmpDirB2 = File.createTempFile("bookie2", "test");
- tmpDirB2.delete();
- tmpDirB2.mkdir();
-
- bs2 = new BookieServer(initialPort + 1, tmpDirB2, new File[]{tmpDirB2});
- bs2.start();
-
- tmpDirB3 = File.createTempFile("bookie3", "test");
- tmpDirB3.delete();
- tmpDirB3.mkdir();
-
- bs3 = new BookieServer(initialPort + 2, tmpDirB3, new File[]{tmpDirB3});
- bs3.start();
-
- rng = new Random(System.currentTimeMillis()); // Initialize the Random Number Generator
- entries = new ArrayList<byte[]>(); // initialize the entries list
- entriesSize = new ArrayList<Integer>();
- sync = new SyncObj(); // initialize the synchronization data structure
- zkc.close();
+
}
-
- protected void tearDown(){
- LOG.info("TearDown");
- //shutdown bookie servers
- try {
- bs1.shutdown();
- bs2.shutdown();
- bs3.shutdown();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- cleanUpDir(tmpDirB1);
- cleanUpDir(tmpDirB2);
- cleanUpDir(tmpDirB3);
-
- //shutdown ZK server
- serverFactory.shutdown();
- assertTrue("waiting for server down",
- ClientBase.waitForServerDown(HOSTPORT,
- ClientBase.CONNECTION_TIMEOUT));
- //ServerStats.unregister();
- cleanUpDir(ZkTmpDir);
+ @Before
+ public void setUp() throws Exception{
+ super.setUp();
+ rng = new Random(System.currentTimeMillis()); // Initialize the Random
+ // Number Generator
+ entries = new ArrayList<byte[]>(); // initialize the entries list
+ entriesSize = new ArrayList<Integer>();
+ sync = new SyncObj(); // initialize the synchronization data structure
}
- /* Clean up a directory recursively */
- protected boolean cleanUpDir(File dir){
+ /* Clean up a directory recursively */
+ protected boolean cleanUpDir(File dir) {
if (dir.isDirectory()) {
LOG.info("Cleaning up " + dir.getName());
String[] children = dir.list();
for (String string : children) {
boolean success = cleanUpDir(new File(dir, string));
- if (!success) return false;
+ if (!success)
+ return false;
}
}
// The directory is now empty so delete it
- return dir.delete();
+ return dir.delete();
}
- /* User for testing purposes, void */
- class emptyWatcher implements Watcher{
- public void process(WatchedEvent event) {}
+ /* Used for testing purposes; does nothing */
+ class emptyWatcher implements Watcher {
+ public void process(WatchedEvent event) {
+ }
}
}