You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2010/01/27 00:16:49 UTC
svn commit: r903483 [2/6] - in /hadoop/zookeeper/trunk: ./
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ src/cont...
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
package org.apache.bookkeeper.client;
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,11 +21,10 @@
*
*/
-
import java.lang.Exception;
/**
- * Implements BookKeeper exceptions.
+ * Class that enumerates all the possible error conditions
*
*/
@@ -32,12 +32,18 @@
public abstract class BKException extends Exception {
private int code;
- public BKException(int code){
+
+ BKException(int code) {
this.code = code;
}
-
- public static BKException create(int code){
- switch(code){
+
+ /**
+ * Create an exception from an error code
+ * @param code return error code
+ * @return corresponding exception
+ */
+ public static BKException create(int code) {
+ switch (code) {
case Code.ReadException:
return new BKReadException();
case Code.QuorumException:
@@ -54,11 +60,25 @@
return new BKNoSuchLedgerExistsException();
case Code.BookieHandleNotAvailableException:
return new BKBookieHandleNotAvailableException();
+ case Code.ZKException:
+ return new ZKException();
+ case Code.LedgerRecoveryException:
+ return new BKLedgerRecoveryException();
+ case Code.LedgerClosedException:
+ return new BKLedgerClosedException();
+ case Code.WriteException:
+ return new BKWriteException();
+ case Code.NoSuchEntryException:
+ return new BKNoSuchEntryException();
default:
return new BKIllegalOpException();
}
}
-
+
+ /**
+ * List of return codes
+ *
+ */
public interface Code {
int OK = 0;
int ReadException = -1;
@@ -69,20 +89,25 @@
int NotEnoughBookiesException = -6;
int NoSuchLedgerExistsException = -7;
int BookieHandleNotAvailableException = -8;
-
+ int ZKException = -9;
+ int LedgerRecoveryException = -10;
+ int LedgerClosedException = -11;
+ int WriteException = -12;
+ int NoSuchEntryException = -13;
+
int IllegalOpException = -100;
}
-
- public void setCode(int code){
+
+ public void setCode(int code) {
this.code = code;
}
-
- public int getCode(){
+
+ public int getCode() {
return this.code;
}
-
- public String getMessage(int code){
- switch(code){
+
+ public static String getMessage(int code) {
+ switch (code) {
case Code.OK:
return "No problem";
case Code.ReadException:
@@ -101,63 +126,102 @@
return "No such ledger exists";
case Code.BookieHandleNotAvailableException:
return "Bookie handle is not available";
+ case Code.ZKException:
+ return "Error while using ZooKeeper";
+ case Code.LedgerRecoveryException:
+ return "Error while recovering ledger";
+ case Code.LedgerClosedException:
+ return "Attempt to write to a closed ledger";
+ case Code.WriteException:
+ return "Write failed on bookie";
+ case Code.NoSuchEntryException:
+ return "No such entry";
default:
return "Invalid operation";
}
}
-
+
public static class BKReadException extends BKException {
- public BKReadException(){
+ public BKReadException() {
super(Code.ReadException);
- }
+ }
}
-
+
+ public static class BKNoSuchEntryException extends BKException {
+ public BKNoSuchEntryException() {
+ super(Code.NoSuchEntryException);
+ }
+ }
+
public static class BKQuorumException extends BKException {
- public BKQuorumException(){
+ public BKQuorumException() {
super(Code.QuorumException);
- }
+ }
}
-
+
public static class BKBookieException extends BKException {
- public BKBookieException(){
+ public BKBookieException() {
super(Code.NoBookieAvailableException);
- }
+ }
}
-
+
public static class BKDigestNotInitializedException extends BKException {
- public BKDigestNotInitializedException(){
+ public BKDigestNotInitializedException() {
super(Code.DigestNotInitializedException);
- }
+ }
}
-
+
public static class BKDigestMatchException extends BKException {
- public BKDigestMatchException(){
+ public BKDigestMatchException() {
super(Code.DigestMatchException);
- }
+ }
}
-
+
public static class BKIllegalOpException extends BKException {
- public BKIllegalOpException(){
+ public BKIllegalOpException() {
super(Code.IllegalOpException);
- }
+ }
}
-
+
public static class BKNotEnoughBookiesException extends BKException {
- public BKNotEnoughBookiesException(){
+ public BKNotEnoughBookiesException() {
super(Code.NotEnoughBookiesException);
}
}
+ public static class BKWriteException extends BKException {
+ public BKWriteException() {
+ super(Code.WriteException);
+ }
+ }
+
public static class BKNoSuchLedgerExistsException extends BKException {
- public BKNoSuchLedgerExistsException(){
+ public BKNoSuchLedgerExistsException() {
super(Code.NoSuchLedgerExistsException);
- }
+ }
}
-
+
public static class BKBookieHandleNotAvailableException extends BKException {
- public BKBookieHandleNotAvailableException(){
+ public BKBookieHandleNotAvailableException() {
super(Code.BookieHandleNotAvailableException);
- }
+ }
+ }
+
+ public static class ZKException extends BKException {
+ public ZKException() {
+ super(Code.ZKException);
+ }
+ }
+
+ public static class BKLedgerRecoveryException extends BKException {
+ public BKLedgerRecoveryException() {
+ super(Code.LedgerRecoveryException);
+ }
+ }
+
+ public static class BKLedgerClosedException extends BKException {
+ public BKLedgerClosedException() {
+ super(Code.LedgerClosedException);
+ }
}
}
-
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
-package org. apache.bookkeeper.client;
+package org.apache.bookkeeper.client;
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,597 +21,332 @@
*
*/
-
import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.ByteBuffer;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Random;
-import java.net.InetSocketAddress;
-
+import java.util.concurrent.Executors;
import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookieHandle;
-import org.apache.bookkeeper.client.LedgerSequence;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException.Code;
-import org.apache.bookkeeper.client.LedgerHandle.QMode;
-import org.apache.bookkeeper.client.LedgerManagementProcessor.CreateLedgerOp;
-import org.apache.bookkeeper.client.LedgerManagementProcessor.OpenLedgerOp;
+import org.apache.bookkeeper.client.SyncCounter;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.WatchedEvent;
-
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
/**
- * BookKeeper client. We assume there is one single writer
- * to a ledger at any time.
+ * BookKeeper client. We assume there is one single writer to a ledger at any
+ * time.
*
- * There are three possible operations: start a new ledger,
- * write to a ledger, and read from a ledger.
+ * There are three possible operations: start a new ledger, write to a ledger,
+ * and read from a ledger.
+ *
+ * The exceptions resulting from synchronous calls and error code resulting from
+ * asynchronous calls can be found in the class {@link BKException}.
*
- * For the ZooKeeper layout, please refer to BKDefs.java.
*
*/
-public class BookKeeper
-implements Watcher {
-
- Logger LOG = Logger.getLogger(BookKeeper.class);
-
- ZooKeeper zk = null;
-
+public class BookKeeper implements OpenCallback, CreateCallback {
+
+ static final Logger LOG = Logger.getLogger(BookKeeper.class);
+
+ ZooKeeper zk = null;
+ // whether the zk handle is one we created, or is owned by whoever
+ // instantiated us
+ boolean ownZKHandle = false;
+
+ ClientSocketChannelFactory channelFactory;
+ // whether the socket factory is one we created, or is owned by whoever
+ // instantiated us
+ boolean ownChannelFactory = false;
+
+ BookieClient bookieClient;
+ BookieWatcher bookieWatcher;
+
+ OrderedSafeExecutor callbackWorker = new OrderedSafeExecutor(Runtime
+ .getRuntime().availableProcessors());
+ OrderedSafeExecutor mainWorkerPool = new OrderedSafeExecutor(Runtime
+ .getRuntime().availableProcessors());
+
+ /**
+ * Create a bookkeeper client. A zookeeper client and a client socket factory
+ * will be instantiated as part of this constructor.
+ *
+ * @param servers
+ * A list of one or more servers on which zookeeper is running. The
+ * client assumes that the running bookies have been registered with
+ * zookeeper under the path
+ * {@link BookieWatcher#BOOKIE_REGISTRATION_PATH}
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ public BookKeeper(String servers) throws IOException, InterruptedException,
+ KeeperException {
+ this(new ZooKeeper(servers, 10000, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ // TODO: handle session disconnects and expires
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Process: " + event.getType() + " " + event.getPath());
+ }
+ }
+ }), new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool()));
+
+ ownZKHandle = true;
+ ownChannelFactory = true;
+ }
+
+ /**
+ * Create a bookkeeper client but use the passed in zookeeper client instead
+ * of instantiating one.
+ *
+ * @param zk
+ * Zookeeper client instance connected to the zookeeper with which
+ * the bookies have registered
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ public BookKeeper(ZooKeeper zk) throws InterruptedException, KeeperException {
+ this(zk, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool()));
+ ownChannelFactory = true;
+ }
+
+ /**
+ * Create a bookkeeper client but use the passed in zookeeper client and
+ * client socket channel factory instead of instantiating those.
+ *
+ * @param zk
+ * Zookeeper client instance connected to the zookeeper with which
+ * the bookies have registered
+ * @param channelFactory
+ * A factory that will be used to create connections to the bookies
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ public BookKeeper(ZooKeeper zk, ClientSocketChannelFactory channelFactory)
+ throws InterruptedException, KeeperException {
+ if (zk == null || channelFactory == null) {
+ throw new NullPointerException();
+ }
+ this.zk = zk;
+ this.channelFactory = channelFactory;
+ bookieWatcher = new BookieWatcher(this);
+ bookieWatcher.readBookiesBlocking();
+ bookieClient = new BookieClient(channelFactory, mainWorkerPool);
+ }
+
+ /**
+ * There are 2 digest types that can be used for verification. The CRC32 is
+ * cheap to compute but does not protect against byzantine bookies (i.e., a
+ * bookie might report fake bytes and a matching CRC32). The MAC code is more
+ * expensive to compute, but is protected by a password, i.e., a bookie can't
+ * report fake bytes with a matching MAC unless it knows the password
+ */
+ public enum DigestType {
+ MAC, CRC32
+ };
+
+ public ZooKeeper getZkHandle() {
+ return zk;
+ }
+
+ /**
+ * Creates a new ledger asynchronously. To create a ledger, we need to specify
+ * the ensemble size, the quorum size, the digest type, a password, a callback
+ * implementation, and an optional control object. The ensemble size is how
+ * many bookies the entries should be striped among and the quorum size is the
+ * degree of replication of each entry. The digest type is either a MAC or a
+ * CRC. Note that the CRC option is not able to protect a client against a
+ * bookie that replaces an entry. The password is used not only to
+ * authenticate access to a ledger, but also to verify entries in ledgers.
+ *
+ * @param ensSize
+ * ensemble size
+ * @param qSize
+ * quorum size
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ * @param cb
+ * createCallback implementation
+ * @param ctx
+ * optional control object
+ */
+ public void asyncCreateLedger(int ensSize, int qSize, DigestType digestType,
+ byte[] passwd, CreateCallback cb, Object ctx) {
+
+ new LedgerCreateOp(this, ensSize, qSize, digestType, passwd, cb, ctx)
+ .initiate();
+
+ }
+
+ /**
+ * Create callback implementation for synchronous create call.
+ *
+ * @param rc
+ * return code
+ * @param lh
+ * ledger handle object
+ * @param ctx
+ * optional control object
+ */
+ public void createComplete(int rc, LedgerHandle lh, Object ctx) {
+ SyncCounter counter = (SyncCounter) ctx;
+ counter.setLh(lh);
+ counter.setrc(rc);
+ counter.dec();
+ }
+
+ /**
+ * Creates a new ledger. Default of 3 servers, and quorum of 2 servers.
+ *
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ * @throws BKException
+ */
+ public LedgerHandle createLedger(DigestType digestType, byte passwd[])
+ throws KeeperException, BKException, InterruptedException, IOException {
+ return createLedger(3, 2, digestType, passwd);
+ }
+
+ /**
+ * Synchronous call to create ledger. Parameters match those of
+ * {@link #asyncCreateLedger(int, int, DigestType, byte[], CreateCallback, Object)}
+ *
+ * @param ensSize
+ * @param qSize
+ * @param digestType
+ * @param passwd
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ * @throws IOException
+ * @throws BKException
+ */
+ public LedgerHandle createLedger(int ensSize, int qSize,
+ DigestType digestType, byte passwd[]) throws KeeperException,
+ InterruptedException, IOException, BKException {
+ SyncCounter counter = new SyncCounter();
+ counter.inc();
/*
- * The ledgerMngProcessor is a thread that processes
- * asynchronously requests that handle ledgers, such
- * as create, open, and close.
+ * Calls asynchronous version
*/
- private static LedgerManagementProcessor ledgerMngProcessor;
-
+ asyncCreateLedger(ensSize, qSize, digestType, passwd, this, counter);
+
/*
- * Blacklist of bookies
- */
- HashSet<InetSocketAddress> bookieBlackList;
-
- LedgerSequence responseRead;
- Long responseLong;
-
- public BookKeeper(String servers)
- throws KeeperException, IOException{
- LOG.debug("Creating BookKeeper for servers " + servers);
- //Create ZooKeeper object
- this.zk = new ZooKeeper(servers, 10000, this);
-
- //List to enable clients to blacklist bookies
- this.bookieBlackList = new HashSet<InetSocketAddress>();
- }
-
- /**
- * Watcher method.
- */
- synchronized public void process(WatchedEvent event) {
- LOG.debug("Process: " + event.getType() + " " + event.getPath());
- }
-
- /**
- * Formats ledger ID according to ZooKeeper rules
- *
- * @param id znode id
- */
- String getZKStringId(long id){
- return String.format("%010d", id);
- }
-
- /**
- * return the zookeeper instance
- * @return return the zookeeper instance
+ * Wait
*/
- ZooKeeper getZooKeeper() {
- return zk;
- }
-
- LedgerManagementProcessor getMngProcessor(){
- if (ledgerMngProcessor == null){
- ledgerMngProcessor = new LedgerManagementProcessor(this);
- ledgerMngProcessor.start();
- }
- return ledgerMngProcessor;
- }
-
- /**
- * Creates a new ledger. To create a ledger, we need to specify the ensemble
- * size, the quorum size, the operation mode, and a password. The ensemble size
- * and the quorum size depend upon the operation mode. The operation mode can be
- * GENERIC, VERIFIABLE, or FREEFORM (debugging). The password is used not only
- * to authenticate access to a ledger, but also to verify entries in verifiable
- * ledgers.
- *
- * @param ensSize ensemble size
- * @param qSize quorum size
- * @param mode quorum mode: VERIFIABLE (default), GENERIC, or FREEFORM
- * @param passwd password
- */
- public LedgerHandle createLedger(int ensSize, int qSize, QMode mode, byte passwd[])
- throws KeeperException, InterruptedException,
- IOException, BKException {
- // Check that quorum size follows the minimum
- long t;
- LedgerHandle lh = null;
-
- switch(mode){
- case VERIFIABLE:
- t = java.lang.Math.round(java.lang.Math.floor((ensSize - 1)/2));
- if(t == 0){
- LOG.error("Tolerates 0 bookie failures");
- throw BKException.create(Code.QuorumException);
- }
- break;
- case GENERIC:
- t = java.lang.Math.round(java.lang.Math.floor((ensSize - 1)/3));
- if(t == 0){
- LOG.error("Tolerates 0 bookie failures");
- throw BKException.create(Code.QuorumException);
- }
- break;
- case FREEFORM:
- break;
- }
- /*
- * Create ledger node on ZK.
- * We get the id from the sequence number on the node.
- */
- String path = zk.create(BKDefs.prefix, new byte[0],
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
- /*
- * Extract ledger id.
- */
- String parts[] = path.split("/");
- String subparts[] = parts[2].split("L");
- try{
- long lId = Long.parseLong(subparts[1]);
-
- /*
- * Get children from "/ledgers/available" on zk
- */
- List<String> list =
- zk.getChildren("/ledgers/available", false);
- ArrayList<InetSocketAddress> lBookies = new ArrayList<InetSocketAddress>();
- /*
- * Select ensSize servers to form the ensemble
- */
- path = zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, new byte[0],
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
- /*
- * Add quorum size to ZK metadata
- */
- ByteBuffer bb = ByteBuffer.allocate(4);
- bb.putInt(qSize);
- zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumSize, bb.array(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- /*
- * Quorum mode
- */
- bb = ByteBuffer.allocate(4);
- bb.putInt(mode.ordinal());
- zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, bb.array(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- /*
- * Create QuorumEngine
- */
- lh = new LedgerHandle(this, lId, 0, qSize, mode, passwd);
-
- /*
- * Adding bookies to ledger handle
- */
- Random r = new Random();
-
- for(int i = 0; i < ensSize; i++){
- int index = 0;
- if(list.size() > 1)
- index = r.nextInt(list.size() - 1);
- else if(list.size() == 1)
- index = 0;
- else {
- LOG.error("Not enough bookies available");
-
- return null;
- }
-
- try{
- String bookie = list.remove(index);
- LOG.info("Bookie: " + bookie);
- InetSocketAddress tAddr = parseAddr(bookie);
- int bindex = lh.addBookieForWriting(tAddr);
- ByteBuffer bindexBuf = ByteBuffer.allocate(4);
- bindexBuf.putInt(bindex);
-
- String pBookie = "/" + bookie;
- zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble + pBookie, bindexBuf.array(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- } catch (IOException e) {
- LOG.error(e);
- i--;
- }
- }
- LOG.debug("Created new ledger");
- } catch (NumberFormatException e) {
- LOG.error("Error when parsing the ledger identifier", e);
- }
- // Return ledger handler
- return lh;
- }
+ counter.block(0);
+ if (counter.getLh() == null) {
+ LOG.error("ZooKeeper error: " + counter.getrc());
+ throw BKException.create(Code.ZKException);
+ }
+
+ return counter.getLh();
+ }
+
+ /**
+ * Open existing ledger asynchronously for reading.
+ *
+ * @param lId
+ * ledger identifier
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ * @param ctx
+ * optional control object
+ */
+ public void asyncOpenLedger(long lId, DigestType digestType, byte passwd[],
+ OpenCallback cb, Object ctx) {
+
+ new LedgerOpenOp(this, lId, digestType, passwd, cb, ctx).initiate();
+
+ }
+
+ /**
+ * Callback method for synchronous open operation
+ *
+ * @param rc
+ * return code
+ * @param lh
+ * ledger handle
+ * @param ctx
+ * optional control object
+ */
+ public void openComplete(int rc, LedgerHandle lh, Object ctx) {
+ SyncCounter counter = (SyncCounter) ctx;
+ counter.setLh(lh);
+
+ LOG.debug("Open complete: " + rc);
+
+ counter.setrc(rc);
+ counter.dec();
+ }
+
+ /**
+ * Synchronous open ledger call
+ *
+ * @param lId
+ * ledger identifier
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ * @return
+ * @throws InterruptedException
+ * @throws BKException
+ */
+
+ public LedgerHandle openLedger(long lId, DigestType digestType, byte passwd[])
+ throws BKException, InterruptedException {
+ SyncCounter counter = new SyncCounter();
+ counter.inc();
- /**
- * Creates a new ledger. Default of 3 servers, and quorum of 2 servers,
- * verifiable ledger.
- *
- * @param passwd password
+ /*
+ * Calls async open ledger
*/
- public LedgerHandle createLedger(byte passwd[])
- throws KeeperException, BKException,
- InterruptedException, IOException {
- return createLedger(3, 2, QMode.VERIFIABLE, passwd);
- }
+ asyncOpenLedger(lId, digestType, passwd, this, counter);
- /**
- * Asychronous call to create ledger
- *
- * @param ensSize
- * @param qSize
- * @param mode
- * @param passwd
- * @param cb
- * @param ctx
- * @throws KeeperException
- * @throws InterruptedException
- * @throws IOException
- * @throws BKException
- */
- public void asyncCreateLedger(int ensSize,
- int qSize,
- QMode mode,
- byte passwd[],
- CreateCallback cb,
- Object ctx
- )
- throws KeeperException, InterruptedException,
- IOException, BKException {
- CreateLedgerOp op = new CreateLedgerOp(ensSize,
- qSize,
- mode,
- passwd,
- cb,
- ctx);
- LedgerManagementProcessor lmp = getMngProcessor();
- lmp.addOp(op);
-
- }
-
- /**
- * Open existing ledger for reading. Default for quorum size is 2.
- *
- * @param long the long corresponding to the ledger id
- * @param byte[] byte array corresponding to the password to access a ledger
- * @param int the quorum size, it has to be at least ceil(n+1/2)
- */
- public LedgerHandle openLedger(long lId, byte passwd[])
- throws KeeperException, InterruptedException, IOException, BKException {
-
- Stat stat = null;
-
- /*
- * Check if ledger exists
- */
- if(zk.exists(BKDefs.prefix + getZKStringId(lId), false) == null){
- LOG.error("Ledger " + getZKStringId(lId) + " doesn't exist.");
- throw BKException.create(Code.NoSuchLedgerExistsException);
- }
-
- /*
- * Get quorum size.
- */
- ByteBuffer bb = ByteBuffer.wrap(zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumSize, false, stat));
- int qSize = bb.getInt();
-
- /*
- * Get last entry written from ZK
- */
-
- long last = 0;
- LOG.debug("Close path: " + BKDefs.prefix + getZKStringId(lId) + BKDefs.close);
- if(zk.exists(BKDefs.prefix + getZKStringId(lId) + BKDefs.close, false) == null){
- recoverLedger(lId, passwd);
- }
-
- stat = null;
- byte[] data = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.close, false, stat);
- ByteBuffer buf = ByteBuffer.wrap(data);
- last = buf.getLong();
- //zk.delete(BKDefs.prefix + getZKStringId(lId) + BKDefs.close, -1);
-
- /*
- * Quorum mode
- */
- data = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, false, stat);
- buf = ByteBuffer.wrap(data);
-
- QMode qMode;
- switch(buf.getInt()){
- case 1:
- qMode = QMode.GENERIC;
- LOG.info("Generic ledger");
- break;
- case 2:
- qMode = QMode.FREEFORM;
- break;
- default:
- qMode = QMode.VERIFIABLE;
- LOG.info("Verifiable ledger");
- }
-
- /*
- * Create QuorumEngine
- */
- LedgerHandle lh = new LedgerHandle(this, lId, last, qSize, qMode, passwd);
-
- /*
- * Get children of "/ledgers/id/ensemble"
- */
-
- List<String> list =
- zk.getChildren(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, false);
-
- LOG.debug("Length of list of bookies: " + list.size());
- for(int i = 0 ; i < list.size() ; i++){
- for(String s : list){
- LOG.debug("Extracting bookie: " + s);
- byte[] bindex = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble + "/" + s, false, stat);
- ByteBuffer bindexBuf = ByteBuffer.wrap(bindex);
- if(bindexBuf.getInt() == i){
- try{
- lh.addBookieForReading(parseAddr(s));
- } catch (IOException e){
- LOG.error(e);
- }
- }
- }
- }
-
- /*
- * Read changes to quorum over time. To determine if there has been changes during
- * writes to the ledger, check if there is a znode called quorumEvolution.
- */
- if(zk.exists(BKDefs.prefix +
- getZKStringId(lh.getId()) +
- BKDefs.quorumEvolution, false) != null){
- String path = BKDefs.prefix +
- getZKStringId(lh.getId()) +
- BKDefs.quorumEvolution;
-
- List<String> faultList = zk.getChildren(path, false);
- try{
- for(String s : faultList){
- LOG.debug("Faulty list child: " + s);
- long entry = Long.parseLong(s);
- String addresses = new String(zk.getData(path + "/" + s, false, stat));
- String parts[] = addresses.split(" ");
-
- ArrayList<BookieHandle> newBookieSet = new ArrayList<BookieHandle>();
- for(int i = 0 ; i < parts.length ; i++){
- LOG.debug("Address: " + parts[i]);
- InetSocketAddress faultyBookie =
- parseAddr(parts[i].substring(1));
-
- newBookieSet.add(lh.getBookieHandleDup(faultyBookie));
- }
- lh.setNewBookieConfig(entry, newBookieSet);
- LOG.debug("NewBookieSet size: " + newBookieSet.size());
- }
-
- lh.prepareEntryChange();
- } catch (NumberFormatException e) {
- LOG.error("Error when parsing the ledger identifier", e);
- }
- }
-
- /*
- * Return ledger handler
- */
- return lh;
- }
-
- public void asyncOpenLedger(long lId, byte passwd[], OpenCallback cb, Object ctx)
- throws InterruptedException{
- OpenLedgerOp op = new OpenLedgerOp(lId,
- passwd,
- cb,
- ctx);
- LedgerManagementProcessor lmp = getMngProcessor();
- lmp.addOp(op);
- }
-
- /**
- * Parses address into IP and port.
- *
- * @param addr String
- */
-
- InetSocketAddress parseAddr(String s){
- String parts[] = s.split(":");
- if (parts.length != 2) {
- System.out.println(s
- + " does not have the form host:port");
- }
- InetSocketAddress addr = new InetSocketAddress(parts[0],
- Integer.parseInt(parts[1]));
- return addr;
- }
-
-
- /**
- * Check if close node exists.
- *
- * @param ledgerId id of the ledger to check
- */
- public boolean hasClosed(long ledgerId)
- throws KeeperException, InterruptedException{
- String closePath = BKDefs.prefix + getZKStringId(ledgerId) + BKDefs.close;
- if(zk.exists(closePath, false) == null) return false;
- else return true;
- }
-
- /**
- * Recover a ledger that was not closed properly.
- *
- * @param lId ledger identifier
- * @param passwd password
- */
-
- boolean recoverLedger(long lId, byte passwd[])
- throws KeeperException, InterruptedException, IOException, BKException {
-
- Stat stat = null;
-
- LOG.info("Recovering ledger");
-
- /*
- * Get quorum size.
- */
- ByteBuffer bb = ByteBuffer.wrap(zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumSize, false, stat));
- int qSize = bb.getInt();
-
-
- /*
- * Get children of "/ledgers/id/ensemble"
- */
-
- List<String> list =
- zk.getChildren(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, false);
-
- ArrayList<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
- for(String s : list){
- addresses.add(parseAddr(s));
- }
-
- /*
- * Quorum mode
- */
- byte[] data = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, false, stat);
- ByteBuffer buf = ByteBuffer.wrap(data);
- //int ordinal = buf.getInt();
-
- QMode qMode = QMode.VERIFIABLE;
- switch(buf.getInt()){
- case 0:
- qMode = QMode.VERIFIABLE;
- break;
- case 1:
- qMode = QMode.GENERIC;
- break;
- case 2:
- qMode = QMode.FREEFORM;
- break;
- }
-
- /*
- * Create ledger recovery monitor object
- */
-
- LedgerRecoveryMonitor lrm = new LedgerRecoveryMonitor(this, lId, qSize, addresses, qMode);
-
- return lrm.recover(passwd);
- }
-
- /**
- * Get new bookies
- *
- * @param addrList list of bookies to replace
- */
- InetSocketAddress getNewBookie(ArrayList<InetSocketAddress> addrList)
- throws InterruptedException {
- try{
- // Get children from "/ledgers/available" on zk
- List<String> list =
- zk.getChildren("/ledgers/available", false);
- ArrayList<InetSocketAddress> lBookies = new ArrayList<InetSocketAddress>();
-
- for(String addr : list){
- InetSocketAddress nAddr = parseAddr(addr);
- if(!addrList.contains(nAddr) &&
- !bookieBlackList.contains(nAddr))
- return nAddr;
- }
- } catch (KeeperException e){
- LOG.error("Problem accessing ZooKeeper: " + e);
- }
-
- return null;
- }
-
- HashMap<InetSocketAddress, BookieHandle> bhMap =
- new HashMap<InetSocketAddress, BookieHandle>();
-
- /**
- * Keeps a list of available BookieHandle objects and returns
- * the corresponding object given an address.
- *
- * @param a InetSocketAddress
- */
-
- synchronized BookieHandle getBookieHandle(LedgerHandle lh, InetSocketAddress a)
- throws ConnectException, IOException {
- if(!bhMap.containsKey(a)){
- BookieHandle bh = new BookieHandle(a, true);
- bhMap.put(a, bh);
- bh.start();
- }
- bhMap.get(a).incRefCount(lh);
-
- return bhMap.get(a);
- }
-
- /**
- * When there are no more references to a BookieHandle,
- * remove it from the list.
- */
-
- synchronized void haltBookieHandles(LedgerHandle lh, ArrayList<BookieHandle> bookies){
- while(bookies.size() > 0){
- BookieHandle bh = bookies.remove(0);
- if(bh.halt(lh) <= 0)
- bhMap.remove(bh.addr);
- }
- }
-
- /**
- * Blacklists bookies.
- *
- * @param addr address of bookie
- */
- void blackListBookie(InetSocketAddress addr){
- bookieBlackList.add(addr);
- }
-
- /**
- * Halts all bookie handles
- *
+ /*
+ * Wait
*/
- public void halt() throws InterruptedException{
-
- for(BookieHandle bh: bhMap.values()){
- bh.shutdown();
- }
- zk.close();
- }
+ counter.block(0);
+ if (counter.getrc() != BKException.Code.OK)
+ throw BKException.create(counter.getrc());
+
+ return counter.getLh();
+ }
+
+ /**
+ * Shuts down client.
+ *
+ */
+ public void halt() throws InterruptedException {
+ bookieClient.close();
+ bookieWatcher.halt();
+ if (ownChannelFactory) {
+ channelFactory.releaseExternalResources();
+ }
+ if (ownZKHandle) {
+ zk.close();
+ }
+ callbackWorker.shutdown();
+ mainWorkerPool.shutdown();
+ }
}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieWatcher.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieWatcher.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieWatcher.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieWatcher.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,204 @@
+package org.apache.bookkeeper.client;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
+import org.apache.zookeeper.KeeperException.Code;
+
+/**
+ * This class is responsible for maintaining a consistent view of what bookies
+ * are available by reading ZooKeeper (and setting watches on the bookie nodes).
+ * When a bookie fails, the other parts of the code turn to this class to find a
+ * replacement.
+ *
+ */
+class BookieWatcher implements Watcher, ChildrenCallback {
+ static final Logger logger = Logger.getLogger(BookieWatcher.class);
+
+ public static final String BOOKIE_REGISTRATION_PATH = "/ledgers/available";
+ static final Set<InetSocketAddress> EMPTY_SET = new HashSet<InetSocketAddress>();
+ public static int ZK_CONNECT_BACKOFF_SEC = 1;
+
+ BookKeeper bk;
+ ScheduledExecutorService scheduler;
+
+ Set<InetSocketAddress> knownBookies = new HashSet<InetSocketAddress>();
+
+ SafeRunnable reReadTask = new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ readBookies();
+ }
+ };
+
+ public BookieWatcher(BookKeeper bk) {
+ this.bk = bk;
+ this.scheduler = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ public void halt(){
+ scheduler.shutdown();
+ }
+
+ public void readBookies() {
+ readBookies(this);
+ }
+
+ public void readBookies(ChildrenCallback callback) {
+ bk.getZkHandle().getChildren( BOOKIE_REGISTRATION_PATH, this, callback, null);
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ readBookies();
+ }
+
+ @Override
+ public void processResult(int rc, String path, Object ctx, List<String> children) {
+
+ if (rc != KeeperException.Code.OK.intValue()) {
+ //logger.error("Error while reading bookies", KeeperException.create(Code.get(rc), path));
+ // retry the read after ZK_CONNECT_BACKOFF_SEC seconds
+ scheduler.schedule(reReadTask, ZK_CONNECT_BACKOFF_SEC, TimeUnit.SECONDS);
+ return;
+ }
+
+ // Read the bookie addresses into a set for efficient lookup
+ Set<InetSocketAddress> newBookieAddrs = new HashSet<InetSocketAddress>();
+ for (String bookieAddrString : children) {
+ InetSocketAddress bookieAddr;
+ try {
+ bookieAddr = StringUtils.parseAddr(bookieAddrString);
+ } catch (IOException e) {
+ logger.error("Could not parse bookie address: " + bookieAddrString + ", ignoring this bookie");
+ continue;
+ }
+ newBookieAddrs.add(bookieAddr);
+ }
+
+ synchronized (this) {
+ knownBookies = newBookieAddrs;
+ }
+ }
+
+ /**
+ * Blocks until the bookies have been read from ZooKeeper; used in the {@link BookKeeper} constructor.
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ public void readBookiesBlocking() throws InterruptedException, KeeperException {
+ final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
+ readBookies(new ChildrenCallback() {
+ public void processResult(int rc, String path, Object ctx, List<String> children) {
+ try {
+ BookieWatcher.this.processResult(rc, path, ctx, children);
+ queue.put(rc);
+ } catch (InterruptedException e) {
+ logger.error("Interruped when trying to read bookies in a blocking fashion");
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ int rc = queue.take();
+
+ if (rc != KeeperException.Code.OK.intValue()) {
+ throw KeeperException.create(Code.get(rc));
+ }
+ }
+
+ /**
+ * Wrapper over the {@link #getAdditionalBookies(Set, int)} method when there is no exclusion list (or existing bookies)
+ * @param numBookiesNeeded
+ * @return
+ * @throws BKNotEnoughBookiesException
+ */
+ public ArrayList<InetSocketAddress> getNewBookies(int numBookiesNeeded) throws BKNotEnoughBookiesException {
+ return getAdditionalBookies(EMPTY_SET, numBookiesNeeded);
+ }
+
+ /**
+ * Wrapper over the {@link #getAdditionalBookies(Set, int)} method when you just need 1 extra bookie
+ * @param existingBookies
+ * @return
+ * @throws BKNotEnoughBookiesException
+ */
+ public InetSocketAddress getAdditionalBookie(List<InetSocketAddress> existingBookies)
+ throws BKNotEnoughBookiesException {
+ return getAdditionalBookies(new HashSet<InetSocketAddress>(existingBookies), 1).get(0);
+ }
+
+ /**
+ * Returns additional bookies given an exclusion list and how many are needed
+ * @param existingBookies
+ * @param numAdditionalBookiesNeeded
+ * @return
+ * @throws BKNotEnoughBookiesException
+ */
+ public ArrayList<InetSocketAddress> getAdditionalBookies(Set<InetSocketAddress> existingBookies,
+ int numAdditionalBookiesNeeded) throws BKNotEnoughBookiesException {
+
+ ArrayList<InetSocketAddress> newBookies = new ArrayList<InetSocketAddress>();
+
+ if (numAdditionalBookiesNeeded <= 0) {
+ return newBookies;
+ }
+
+ List<InetSocketAddress> allBookies;
+
+ synchronized (this) {
+ allBookies = new ArrayList<InetSocketAddress>(knownBookies);
+ }
+
+ Collections.shuffle(allBookies);
+
+ for (InetSocketAddress bookie : allBookies) {
+ if (existingBookies.contains(bookie)) {
+ continue;
+ }
+
+ newBookies.add(bookie);
+ numAdditionalBookiesNeeded--;
+
+ if (numAdditionalBookiesNeeded == 0) {
+ return newBookies;
+ }
+ }
+
+ throw new BKNotEnoughBookiesException();
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/CRC32DigestManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/CRC32DigestManager.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/CRC32DigestManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/CRC32DigestManager.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,50 @@
+package org.apache.bookkeeper.client;
+
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+
+import java.nio.ByteBuffer;
+import java.util.zip.CRC32;
+
+class CRC32DigestManager extends DigestManager {
+ CRC32 crc = new CRC32();
+
+ public CRC32DigestManager(long ledgerId) {
+ super(ledgerId);
+ }
+
+ @Override
+ int getMacCodeLength() {
+ return 8;
+ }
+
+ @Override
+ byte[] getValueAndReset() {
+ byte[] value = new byte[8];
+ ByteBuffer buf = ByteBuffer.wrap(value);
+ buf.putLong(crc.getValue());
+ crc.reset();
+ return value;
+ }
+
+ @Override
+ void update(byte[] data, int offset, int length) {
+ crc.update(data, offset, length);
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,162 @@
+package org.apache.bookkeeper.client;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+
+import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+/**
+ * This class takes an entry, attaches a digest to it and packages it with relevant
+ * data so that it can be shipped to the bookie. On the return side, it also
+ * gets a packet, checks that the digest matches, and extracts the original entry
+ * from the packet. Currently two types of digests are supported: MAC (based on SHA-1) and CRC32.
+ */
+
+abstract class DigestManager {
+ static final Logger logger = Logger.getLogger(DigestManager.class);
+
+ long ledgerId;
+
+ abstract int getMacCodeLength();
+
+ void update(byte[] data){
+ update(data, 0, data.length);
+ }
+
+ abstract void update(byte[] data, int offset, int length);
+ abstract byte[] getValueAndReset();
+
+ final int macCodeLength;
+
+ public DigestManager(long ledgerId) {
+ this.ledgerId = ledgerId;
+ macCodeLength = getMacCodeLength();
+ }
+
+ static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType) throws GeneralSecurityException{
+ switch(digestType){
+ case MAC:
+ return new MacDigestManager(ledgerId, passwd);
+ case CRC32:
+ return new CRC32DigestManager(ledgerId);
+ default:
+ throw new GeneralSecurityException("Unknown checksum type: " + digestType);
+ }
+ }
+
+ ChannelBuffer computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, byte[] data) {
+
+ byte[] bufferArray = new byte[24+macCodeLength];
+ ByteBuffer buffer = ByteBuffer.wrap(bufferArray);
+ buffer.putLong(ledgerId);
+ buffer.putLong(entryId);
+ buffer.putLong(lastAddConfirmed);
+ buffer.flip();
+
+ update(buffer.array(), 0, 24);
+ update(data);
+ byte[] digest = getValueAndReset();
+
+ buffer.limit(buffer.capacity());
+ buffer.position(24);
+ buffer.put(digest);
+ buffer.flip();
+
+ return ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer), ChannelBuffers.wrappedBuffer(data));
+ }
+
+ private void verifyDigest(ChannelBuffer dataReceived) throws BKDigestMatchException {
+ verifyDigest(-1, dataReceived, true);
+ }
+
+ private void verifyDigest(long entryId, ChannelBuffer dataReceived) throws BKDigestMatchException {
+ verifyDigest(entryId, dataReceived, false);
+ }
+
+ private void verifyDigest(long entryId, ChannelBuffer dataReceived, boolean skipEntryIdCheck)
+ throws BKDigestMatchException {
+
+ ByteBuffer dataReceivedBuffer = dataReceived.toByteBuffer();
+ byte[] digest;
+
+ update(dataReceivedBuffer.array(), dataReceivedBuffer.position(), 24);
+
+ int offset = 24 + macCodeLength;
+ update(dataReceivedBuffer.array(), dataReceivedBuffer.position() + offset, dataReceived.readableBytes() - offset);
+ digest = getValueAndReset();
+
+ for (int i = 0; i < digest.length; i++) {
+ if (digest[i] != dataReceived.getByte(24 + i)) {
+ logger.error("Mac mismatch for ledger-id: " + ledgerId + ", entry-id: " + entryId);
+ throw new BKDigestMatchException();
+ }
+ }
+
+ long actualLedgerId = dataReceived.readLong();
+ long actualEntryId = dataReceived.readLong();
+
+ if (actualLedgerId != ledgerId) {
+ logger.error("Ledger-id mismatch in authenticated message, expected: " + ledgerId + " , actual: "
+ + actualLedgerId);
+ throw new BKDigestMatchException();
+ }
+
+ if (!skipEntryIdCheck && actualEntryId != entryId) {
+ logger.error("Entry-id mismatch in authenticated message, expected: " + entryId + " , actual: "
+ + actualEntryId);
+ throw new BKDigestMatchException();
+ }
+
+ }
+
+ ChannelBufferInputStream verifyDigestAndReturnData(long entryId, ChannelBuffer dataReceived)
+ throws BKDigestMatchException {
+ verifyDigest(entryId, dataReceived);
+ dataReceived.readerIndex(24 + macCodeLength);
+ return new ChannelBufferInputStream(dataReceived);
+ }
+
+ static class RecoveryData {
+ long lastAddConfirmed;
+ long entryId;
+
+ public RecoveryData(long lastAddConfirmed, long entryId) {
+ this.lastAddConfirmed = lastAddConfirmed;
+ this.entryId = entryId;
+ }
+
+ }
+
+ RecoveryData verifyDigestAndReturnLastConfirmed(ChannelBuffer dataReceived) throws BKDigestMatchException {
+ verifyDigest(dataReceived);
+ dataReceived.readerIndex(8);
+
+ long entryId = dataReceived.readLong();
+ long lastAddConfirmed = dataReceived.readLong();
+ return new RecoveryData(lastAddConfirmed, entryId);
+
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,61 @@
+package org.apache.bookkeeper.client;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This interface determines how entries are distributed among bookies.
+ *
+ * Every entry gets replicated to some number of replicas. The first replica for
+ * an entry is given a replicaIndex of 0, and so on. To distribute write load,
+ * not all entries go to all bookies. Given an entry-id and replica index, an
+ * {@link DistributionSchedule} determines which bookie that replica should go
+ * to.
+ */
+
+interface DistributionSchedule {
+
+ /**
+ *
+ * @param entryId
+ * @param replicaIndex
+ * @return index of bookie that should get this replica
+ */
+ public int getBookieIndex(long entryId, int replicaIndex);
+
+ /**
+ *
+ * @param entryId
+ * @param bookieIndex
+ * @return -1 if the given bookie index is not a replica for the given
+ * entryId
+ */
+ public int getReplicaIndex(long entryId, int bookieIndex);
+
+ /**
+ * Specifies whether it is OK to proceed with recovery given that we have
+ * heard back from the given bookie index. These calls will be made in
+ * sequence, and an implementation of this interface should accumulate
+ * history about which bookie indexes we have heard from. Once this method
+ * has returned true, it won't be called again on the same instance.
+ *
+ * @param bookieIndexHeardFrom
+ * @return true if it is OK to proceed with recovery
+ */
+ public boolean canProceedWithRecovery(int bookieIndexHeardFrom);
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Encapsulates asynchronous ledger create operation
+ *
+ */
+class LedgerCreateOp implements StringCallback, StatCallback {
+
+ static final Logger LOG = Logger.getLogger(LedgerCreateOp.class);
+
+ CreateCallback cb;
+ LedgerMetadata metadata;
+ LedgerHandle lh;
+ Object ctx;
+ byte[] passwd;
+ BookKeeper bk;
+ DigestType digestType;
+
+ /**
+ * Constructor
+ *
+ * @param bk
+ * BookKeeper object
+ * @param ensembleSize
+ * ensemble size
+ * @param quorumSize
+ * quorum size
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ * @param cb
+ * callback implementation
+ * @param ctx
+ * optional control object
+ */
+
+ LedgerCreateOp(BookKeeper bk, int ensembleSize, int quorumSize, DigestType digestType, byte[] passwd, CreateCallback cb, Object ctx) {
+ this.bk = bk;
+ this.metadata = new LedgerMetadata(ensembleSize, quorumSize);
+ this.digestType = digestType;
+ this.passwd = passwd;
+ this.cb = cb;
+ this.ctx = ctx;
+ }
+
+ /**
+ * Initiates the operation
+ */
+ public void initiate() {
+ /*
+ * Create ledger node on ZK. We get the id from the sequence number on
+ * the node.
+ */
+
+ bk.getZkHandle().create(StringUtils.prefix, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL, this, null);
+
+ // the result is delivered to the string callback (processResult) below
+ }
+
+
+ /**
+ * Implements ZooKeeper string callback.
+ *
+ * @see org.apache.zookeeper.AsyncCallback.StringCallback#processResult(int, java.lang.String, java.lang.Object, java.lang.String)
+ */
+ public void processResult(int rc, String path, Object ctx, String name) {
+
+ if (rc != KeeperException.Code.OK.intValue()) {
+ LOG.error("Could not create node for ledger", KeeperException.create(KeeperException.Code.get(rc), path));
+ cb.createComplete(BKException.Code.ZKException, null, this.ctx);
+ return;
+ }
+
+ /*
+ * Extract ledger id.
+ */
+ long ledgerId;
+ try {
+ ledgerId = StringUtils.getLedgerId(name);
+ } catch (IOException e) {
+ LOG.error("Could not extract ledger-id from path:" + path, e);
+ cb.createComplete(BKException.Code.ZKException, null, this.ctx);
+ return;
+ }
+
+ /*
+ * Select the bookies that form the ledger's initial ensemble
+ */
+
+ ArrayList<InetSocketAddress> ensemble;
+ try {
+ ensemble = bk.bookieWatcher.getNewBookies(metadata.ensembleSize);
+ } catch (BKNotEnoughBookiesException e) {
+ LOG.error("Not enough bookies to create ledger" + ledgerId);
+ cb.createComplete(e.getCode(), null, this.ctx);
+ return;
+ }
+
+ /*
+ * Add ensemble to the configuration
+ */
+ metadata.addEnsemble(new Long(0), ensemble);
+ try {
+ lh = new LedgerHandle(bk, ledgerId, metadata, digestType, passwd);
+ } catch (GeneralSecurityException e) {
+ LOG.error("Security exception while creating ledger: " + ledgerId, e);
+ cb.createComplete(BKException.Code.DigestNotInitializedException, null, this.ctx);
+ return;
+ }
+
+ lh.writeLedgerConfig(this, null);
+
+ }
+
+ /**
+ * Implements ZooKeeper stat callback.
+ *
+ * @see org.apache.zookeeper.AsyncCallback.StatCallback#processResult(int, String, Object, Stat)
+ */
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
+ cb.createComplete(rc, lh, this.ctx);
+ }
+
+}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
package org.apache.bookkeeper.client;
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,39 +21,58 @@
*
*/
-
+import java.io.IOException;
+import java.io.InputStream;
import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
/**
- * Ledger entry. Currently only holds the necessary
- * fields to identify a ledger entry, and the entry
- * content.
+ * Ledger entry. It's a simple tuple containing the ledger id, the entry-id, and
+ * the entry content.
*
*/
public class LedgerEntry {
- Logger LOG = Logger.getLogger(LedgerEntry.class);
-
- private long lId;
- private long eId;
- private byte[] entry;
-
- LedgerEntry(long lId, long eId, byte[] entry){
- this.lId = lId;
- this.eId = eId;
- this.entry = entry;
- }
-
- public long getLedgerId(){
- return lId;
- }
-
- public long getEntryId(){
- return eId;
- }
-
- public byte[] getEntry(){
- return entry;
+ Logger LOG = Logger.getLogger(LedgerEntry.class);
+
+ long ledgerId;
+ long entryId;
+ ChannelBufferInputStream entryDataStream;
+
+ int nextReplicaIndexToReadFrom = 0;
+
+ LedgerEntry(long lId, long eId) {
+ this.ledgerId = lId;
+ this.entryId = eId;
+ }
+
+ public long getLedgerId() {
+ return ledgerId;
+ }
+
+ public long getEntryId() {
+ return entryId;
+ }
+
+ public byte[] getEntry() {
+ try {
+ // In general, you can't rely on the available() method of an input
+ // stream, but ChannelBufferInputStream is backed by a byte[] so it
+ // accurately knows the # bytes available
+ byte[] ret = new byte[entryDataStream.available()];
+ entryDataStream.readFully(ret);
+ return ret;
+ } catch (IOException e) {
+ // ChannelBufferInputStream doesn't really throw IOExceptions; they
+ // just have to be in the signature because InputStream says so.
+ // Hence this code should never be reached.
+ LOG.fatal("Unexpected IOException while reading from channel buffer", e);
+ return new byte[0];
}
+ }
+
+ public InputStream getEntryInputStream() {
+ return entryDataStream;
+ }
}