You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2010/01/27 00:16:49 UTC

svn commit: r903483 [4/6] - in /hadoop/zookeeper/trunk: ./ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ src/cont...

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
 package org.apache.bookkeeper.proto;
+
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,391 +21,126 @@
  * 
  */
 
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.ConnectException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.Enumeration;
-import java.security.NoSuchAlgorithmException;
-import java.security.InvalidKeyException;
-import java.security.MessageDigest;
-import javax.crypto.Mac; 
-import javax.crypto.spec.SecretKeySpec;
-
-//import org.apache.bookkeeper.client.AsyncCallback.FailCallback;
-import org.apache.bookkeeper.client.BookieHandle;
-import org.apache.bookkeeper.proto.ReadEntryCallback;
-import org.apache.bookkeeper.proto.WriteCallback;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.log4j.Logger;
-
-
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 
 /**
- * Implements the client-side part of the BookKeeper protocol. 
+ * Implements the client-side part of the BookKeeper protocol.
  * 
- */    
-public class BookieClient extends Thread {
-	Logger LOG = Logger.getLogger(BookieClient.class);
-    SocketChannel sock;
-    int myCounter = 0;
-
-    public BookieClient(InetSocketAddress addr, int recvTimeout)
-    throws IOException, ConnectException { 
-        startConnection(addr, recvTimeout);
-    }
-    
-    public BookieClient(String host, int port, int recvTimeout)
-    throws IOException, ConnectException {
-        this(new InetSocketAddress(host, port), recvTimeout);
-    }
-    
-    public void startConnection(InetSocketAddress addr, int recvTimeout)
-    throws IOException, ConnectException {
-        sock = SocketChannel.open(addr);
-        setDaemon(true);
-        //sock.configureBlocking(false);
-        sock.socket().setSoTimeout(recvTimeout);
-        sock.socket().setTcpNoDelay(true);
-        start();        
-    }
-    
-    private static class Completion<T> {
-        Completion(T cb, Object ctx) {
-            this.cb = cb;
-            this.ctx = ctx;
-        }
+ */
+public class BookieClient {
+    static final Logger LOG = Logger.getLogger(BookieClient.class);
+
+    // This is global state that should be across all BookieClients
+    AtomicLong totalBytesOutstanding = new AtomicLong();
 
-        T cb;
-        Object ctx;
+    OrderedSafeExecutor executor;
+    ClientSocketChannelFactory channelFactory;
+    ConcurrentHashMap<InetSocketAddress, PerChannelBookieClient> channels = new ConcurrentHashMap<InetSocketAddress, PerChannelBookieClient>();
+
+    public BookieClient(ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor) {
+        this.channelFactory = channelFactory;
+        this.executor = executor;
     }
 
-    private static class CompletionKey {
-        long ledgerId;
-        long entryId;
-
-        CompletionKey(long ledgerId, long entryId) {
-            this.ledgerId = ledgerId;
-            this.entryId = entryId;
-        }
+    public PerChannelBookieClient lookupClient(InetSocketAddress addr) {
+        PerChannelBookieClient channel = channels.get(addr);
 
-        @Override
-        public boolean equals(Object obj) {
-            if (!(obj instanceof CompletionKey) || obj == null) {
-                return false;
+        if (channel == null) {
+            channel = new PerChannelBookieClient(executor, channelFactory, addr, totalBytesOutstanding);
+            PerChannelBookieClient prevChannel = channels.putIfAbsent(addr, channel);
+            if (prevChannel != null) {
+                channel = prevChannel;
             }
-            CompletionKey that = (CompletionKey) obj;
-            return this.ledgerId == that.ledgerId && this.entryId == that.entryId;
-        }
-
-        @Override
-        public int hashCode() {
-            return ((int) ledgerId << 16) ^ ((int) entryId);
         }
 
+        return channel;
     }
 
-    ConcurrentHashMap<CompletionKey, Completion<WriteCallback>> addCompletions = 
-        new ConcurrentHashMap<CompletionKey, Completion<WriteCallback>>();
-    
-    ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>> readCompletions =
-        new ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>>();
-    
-    /*
-     * Use this semaphore to control the number of completion key in both addCompletions
-     * and readCompletions. This is more of a problem for readCompletions because one
-     * readEntries opertion is expanded into individual operations to read entries.
-     */
-    Semaphore completionSemaphore = new Semaphore(3000);
-    
-   
-    /**
-     * Message disgest instance
-     * 
-     */
-    MessageDigest digest = null;
-    
-    /** 
-     * Get digest instance if there is none.
-     * 
-     */
-    public MessageDigest getDigestInstance(String alg)
-    throws NoSuchAlgorithmException {
-        if(digest == null){
-            digest = MessageDigest.getInstance(alg);
-        }
-        
-        return digest;
-    }
-    
-    /**
-     * Mac instance
-     * 
-     */
-    Mac mac = null;
-    
-    public Mac getMac(String alg, byte[] key)
-    throws NoSuchAlgorithmException, InvalidKeyException {
-        if(mac == null){
-            mac = Mac.getInstance(alg);
-            mac.init(new SecretKeySpec(key, "HmacSHA1"));
-        }
-        
-        return mac;
-    }
-    
-    /**
-     * Send addEntry operation to bookie. It throws an IOException
-     * if either the write to the socket fails or it takes too long
-     * to obtain a permit to send another request, which possibly 
-     * implies that the corresponding bookie is down.
-     * 
-     * @param ledgerId	ledger identifier
-     * @param entryId 	entry identifier
-     * @param cb		object implementing callback method
-     * @param ctx		control object
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    synchronized public void addEntry(long ledgerId, byte[] masterKey, long entryId,
-            ByteBuffer entry, WriteCallback cb, Object ctx) 
-    throws IOException, InterruptedException {
-        
-        if(cb == null)
-            LOG.error("WriteCallback object is null: " + entryId);
-        addCompletions.put(new CompletionKey(ledgerId, entryId),
-                new Completion<WriteCallback>(cb, ctx));
-
-        ByteBuffer tmpEntry = ByteBuffer.allocate(entry.remaining() + 44);
-
-        tmpEntry.position(4);
-        tmpEntry.putInt(BookieProtocol.ADDENTRY);
-        tmpEntry.put(masterKey);
-        tmpEntry.putLong(ledgerId);
-        tmpEntry.putLong(entryId);
-        tmpEntry.put(entry);
-        tmpEntry.position(0);
-        
-        // 4 bytes for the message type
-        tmpEntry.putInt(tmpEntry.remaining() - 4);
-        tmpEntry.position(0);
-
-        
-        if(!sock.isConnected() || 
-                !completionSemaphore.tryAcquire(1000, TimeUnit.MILLISECONDS)){ 
-            throw new IOException();
-        } else sock.write(tmpEntry);
-    }
-    
-    /**
-     * Send readEntry operation to bookie. It throws an IOException
-     * if either the write to the socket fails or it takes too long
-     * to obtain a permit to send another request, which possibly 
-     * implies that the corresponding bookie is down.
-     * 
-     * @param ledgerId	ledger identifier
-     * @param entryId	entry identifier
-     * @param cb		object implementing callback method
-     * @param ctx		control object
-     * @throws IOException
-     */
-    synchronized public void readEntry(long ledgerId, long entryId,
-            ReadEntryCallback cb, Object ctx) 
-    throws IOException, InterruptedException {
-        //LOG.info("Entry id: " + entryId);
-    	//completionSemaphore.acquire();
-        readCompletions.put(new CompletionKey(ledgerId, entryId),
-                new Completion<ReadEntryCallback>(cb, ctx));
-        
-        ByteBuffer tmpEntry = ByteBuffer.allocate(8 + 8 + 8);
-        tmpEntry.putInt(20);
-        tmpEntry.putInt(BookieProtocol.READENTRY);
-        tmpEntry.putLong(ledgerId);
-        tmpEntry.putLong(entryId);
-        tmpEntry.position(0);
-
-        if(!sock.isConnected() || 
-                !completionSemaphore.tryAcquire(1000, TimeUnit.MILLISECONDS)){ 
-            throw new IOException();
-        } else sock.write(tmpEntry);
-    }
-    
-    private void readFully(ByteBuffer bb) throws IOException {
-        while(bb.remaining() > 0) {
-            sock.read(bb);
-        }
-    }
-    
-    Semaphore running = new Semaphore(0);
-    public void run() {
-        int len = -1;
-        ByteBuffer lenBuffer = ByteBuffer.allocate(4);
-        int type = -1, rc = -1;
-        try {
-            while(sock.isConnected()) {
-                lenBuffer.clear();
-                readFully(lenBuffer);
-                lenBuffer.flip();
-                len = lenBuffer.getInt();
-                ByteBuffer bb = ByteBuffer.allocate(len);
-                readFully(bb);
-                bb.flip();
-                type = bb.getInt();
-                rc = bb.getInt();
- 
-                switch(type) {
-                case BookieProtocol.ADDENTRY:
-                {                    
-                    long ledgerId = bb.getLong();
-                    long entryId = bb.getLong();
-
-                    Completion<WriteCallback> ac;
-                    ac = addCompletions.remove(new CompletionKey(ledgerId, entryId));
-                    completionSemaphore.release();
-                    if (ac != null) {
-                        ac.cb.writeComplete(rc, ledgerId, entryId, ac.ctx);
-                    } else {
-                        LOG.error("Callback object null: " + ledgerId + " : " + entryId);
-                    }
+    public void addEntry(final InetSocketAddress addr, final long ledgerId, final byte[] masterKey, final long entryId,
+            final ChannelBuffer toSend, final WriteCallback cb, final Object ctx) {
 
-                    break;
-                }
-                case BookieProtocol.READENTRY:
-                {
-                    long ledgerId = bb.getLong();
-                    long entryId = bb.getLong();
-                    
-                    bb.position(24);
-                    byte[] data = new byte[bb.capacity() - 24];
-                    bb.get(data);
-                    ByteBuffer entryData = ByteBuffer.wrap(data);         
-                    
-                    CompletionKey key = new CompletionKey(ledgerId, entryId);
-                    Completion<ReadEntryCallback> c;
-                    
-                    if(readCompletions.containsKey(key)){
-                            c = readCompletions.remove(key);
-                    }
-                    else{    
-                            /*
-                             * This is a special case. When recovering a ledger, a client submits
-                             * a read request with id -1, and receives a response with a different
-                             * entry id.
-                             */
-                            c = readCompletions.remove(new CompletionKey(ledgerId, -1));
-                    }
-                    completionSemaphore.release();
-                    
-                    if (c != null) {
-                        c.cb.readEntryComplete(rc, 
-                                ledgerId, 
-                                entryId, 
-                                entryData, 
-                                c.ctx);
-                    }
-                    break;
-                }
-                default:
-                    System.err.println("Got error " + rc + " for type " + type);
+        final PerChannelBookieClient client = lookupClient(addr);
+
+        client.connectIfNeededAndDoOp(new GenericCallback<Void>() {
+            @Override
+            public void operationComplete(int rc, Void result) {
+                if (rc != BKException.Code.OK) {
+                    cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+                    return;
                 }
+                client.addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx);
             }
-            
-        } catch(Exception e) {
-            LOG.error("Len = " + len + ", Type = " + type + ", rc = " + rc);
-        }
-        running.release();
-        
+        });
     }
-    
-    /**
-     * Errors out pending entries. We call this method from one thread to avoid
-     * concurrent executions to QuorumOpMonitor (implements callbacks). It seems
-     * simpler to call it from BookieHandle instead of calling directly from here.
-     */
-    
-    public void errorOut(){
-        LOG.info("Erroring out pending entries");
-    
-        for (Enumeration<CompletionKey> e = addCompletions.keys() ; e.hasMoreElements() ;) {
-            CompletionKey key = e.nextElement();
-            Completion<WriteCallback> ac = addCompletions.remove(key);
-            if(ac != null){
-                completionSemaphore.release();
-                ac.cb.writeComplete(-1, key.ledgerId, key.entryId, ac.ctx);
-            }
-        }
-        
-        LOG.info("Finished erroring out pending add entries");
-         
-        for (Enumeration<CompletionKey> e = readCompletions.keys() ; e.hasMoreElements() ;) {
-            CompletionKey key = e.nextElement();
-            Completion<ReadEntryCallback> ac = readCompletions.remove(key);
-                
-            if(ac != null){
-                completionSemaphore.release();
-                ac.cb.readEntryComplete(-1, key.ledgerId, key.entryId, null, ac.ctx);
+
+    public void readEntry(final InetSocketAddress addr, final long ledgerId, final long entryId,
+            final ReadEntryCallback cb, final Object ctx) {
+
+        final PerChannelBookieClient client = lookupClient(addr);
+
+        client.connectIfNeededAndDoOp(new GenericCallback<Void>() {
+            @Override
+            public void operationComplete(int rc, Void result) {
+
+                if (rc != BKException.Code.OK) {
+                    cb.readEntryComplete(rc, ledgerId, entryId, null, ctx);
+                    return;
+                }
+                client.readEntry(ledgerId, entryId, cb, ctx);
             }
-        }
-        
-        LOG.info("Finished erroring out pending read entries");
+        });
     }
 
-    /**
-     * Halts client.
-     */
-    
-    public void halt() {
-        try{
-            sock.close();
-        } catch(IOException e) {
-            LOG.warn("Exception while closing socket");
-        }
-        
-        try{
-            running.acquire();
-        } catch(InterruptedException e){
-            LOG.error("Interrupted while waiting for running semaphore to acquire lock");
+    public void close(){
+        for (PerChannelBookieClient channel: channels.values()){
+            channel.close();
         }
     }
-    
-    /**
-     * Returns the status of the socket of this bookie client.
-     * 
-     * @return boolean
-     */
-    public boolean isConnected(){
-        return sock.isConnected();
-    }
 
     private static class Counter {
         int i;
         int total;
+
         synchronized void inc() {
             i++;
             total++;
         }
+
         synchronized void dec() {
             i--;
             notifyAll();
         }
+
         synchronized void wait(int limit) throws InterruptedException {
-            while(i > limit) {
+            while (i > limit) {
                 wait();
             }
         }
+
         synchronized int total() {
             return total;
         }
     }
+
     /**
      * @param args
-     * @throws IOException 
-     * @throws NumberFormatException 
-     * @throws InterruptedException 
+     * @throws IOException
+     * @throws NumberFormatException
+     * @throws InterruptedException
      */
     public static void main(String[] args) throws NumberFormatException, IOException, InterruptedException {
         if (args.length != 3) {
@@ -413,8 +149,8 @@
         }
         WriteCallback cb = new WriteCallback() {
 
-            public void writeComplete(int rc, long ledger, long entry, Object ctx) {
-                Counter counter = (Counter)ctx;
+            public void writeComplete(int rc, long ledger, long entry, InetSocketAddress addr, Object ctx) {
+                Counter counter = (Counter) ctx;
                 counter.dec();
                 if (rc != 0) {
                     System.out.println("rc = " + rc + " for " + entry + "@" + ledger);
@@ -424,18 +160,19 @@
         Counter counter = new Counter();
         byte hello[] = "hello".getBytes();
         long ledger = Long.parseLong(args[2]);
-        BookieClient bc = new BookieClient(args[0], Integer.parseInt(args[1]), 5000);
-        for(int i = 0; i < 100000; i++) {
-            ByteBuffer entry = ByteBuffer.allocate(100);
-            entry.putLong(ledger);
-            entry.putLong(i);
-            entry.putInt(0);
-            entry.put(hello);
-            entry.flip();
+        ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors
+                .newCachedThreadPool());
+        OrderedSafeExecutor executor = new OrderedSafeExecutor(1);
+        BookieClient bc = new BookieClient(channelFactory, executor);
+        InetSocketAddress addr = new InetSocketAddress(args[0], Integer.parseInt(args[1]));
+
+        for (int i = 0; i < 100000; i++) {
             counter.inc();
-            bc.addEntry(ledger, new byte[0], i, entry, cb, counter);
+            bc.addEntry(addr, ledger, new byte[0], i, ChannelBuffers.wrappedBuffer(hello), cb, counter);
         }
         counter.wait(0);
         System.out.println("Total = " + counter.total());
+        channelFactory.releaseExternalResources();
+        executor.shutdown();
     }
 }

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
 package org.apache.bookkeeper.proto;
+
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,33 +21,31 @@
  * 
  */
 
-
 /**
- * The packets of the Bookie protocol all have a 4-byte integer
- * indicating the type of request or response at the very beginning
- * of the packet followed by a payload.
- *
+ * The packets of the Bookie protocol all have a 4-byte integer indicating the
+ * type of request or response at the very beginning of the packet followed by a
+ * payload.
+ * 
  */
 public interface BookieProtocol {
     /**
-     * The Add entry request payload will be a ledger entry exactly
-     * as it should be logged. The response payload will be a 4-byte
-     * integer that has the error code followed by the 8-byte
-     * ledger number and 8-byte entry number of the entry written.
+     * The Add entry request payload will be a ledger entry exactly as it should
+     * be logged. The response payload will be a 4-byte integer that has the
+     * error code followed by the 8-byte ledger number and 8-byte entry number
+     * of the entry written.
      */
     public static final int ADDENTRY = 1;
     /**
-     * The Read entry request payload will be the ledger number and
-     * entry number to read. (The ledger number is an 8-byte integer
-     * and the entry number is a 8-byte integer.) The
-     * response payload will be a 4-byte integer representing an 
-     * error code and a ledger entry if the error code is EOK, otherwise
-     * it will be the 8-byte ledger number and the 4-byte entry number
-     * requested. (Note that the first sixteen bytes of the entry happen
-     * to be the ledger number and entry number as well.)
+     * The Read entry request payload will be the ledger number and entry number
+     * to read. (The ledger number is an 8-byte integer and the entry number is
+     * a 8-byte integer.) The response payload will be a 4-byte integer
+     * representing an error code and a ledger entry if the error code is EOK,
+     * otherwise it will be the 8-byte ledger number and the 4-byte entry number
+     * requested. (Note that the first sixteen bytes of the entry happen to be
+     * the ledger number and entry number as well.)
      */
     public static final int READENTRY = 2;
-    
+
     /**
      * The error code that indicates success
      */
@@ -67,10 +66,10 @@
      * General error occurred at the server
      */
     public static final int EIO = 101;
-    
+
     /**
      * Unauthorized access to ledger
      */
     public static final int EUA = 102;
-    
+
 }

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
 package org.apache.bookkeeper.proto;
+
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,9 +21,9 @@
  * 
  */
 
-
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 
 import org.apache.bookkeeper.bookie.Bookie;
@@ -30,107 +31,114 @@
 import org.apache.bookkeeper.proto.NIOServerFactory.Cnxn;
 import org.apache.log4j.Logger;
 
-
-
 /**
  * Implements the server-side part of the BookKeeper protocol.
- *
+ * 
  */
-public class BookieServer implements NIOServerFactory.PacketProcessor, WriteCallback {
+public class BookieServer implements NIOServerFactory.PacketProcessor, BookkeeperInternalCallbacks.WriteCallback {
     int port;
     NIOServerFactory nioServerFactory;
     volatile boolean down = false;
     Bookie bookie;
     static Logger LOG = Logger.getLogger(BookieServer.class);
-    
-    public BookieServer(int port, File journalDirectory, File ledgerDirectories[]) {
+
+    public BookieServer(int port, File journalDirectory, File ledgerDirectories[]) throws IOException {
         this.port = port;
         this.bookie = new Bookie(journalDirectory, ledgerDirectories);
     }
+
     public void start() throws IOException {
         nioServerFactory = new NIOServerFactory(port, this);
     }
+
     public void shutdown() throws InterruptedException {
         down = true;
         nioServerFactory.shutdown();
         bookie.shutdown();
     }
-    public boolean isDown(){
+
+    public boolean isDown() {
         return down;
     }
+
     public void join() throws InterruptedException {
         nioServerFactory.join();
     }
+
     /**
      * @param args
-     * @throws IOException 
-     * @throws InterruptedException 
+     * @throws IOException
+     * @throws InterruptedException
      */
     public static void main(String[] args) throws IOException, InterruptedException {
-    	if (args.length < 3) {
+        if (args.length < 3) {
             System.err.println("USAGE: BookieServer port journalDirectory ledgerDirectory [ledgerDirectory]*");
             return;
         }
         int port = Integer.parseInt(args[0]);
         File journalDirectory = new File(args[1]);
-        File ledgerDirectory[] = new File[args.length-2];
+        File ledgerDirectory[] = new File[args.length - 2];
         StringBuilder sb = new StringBuilder();
-        for(int i = 0; i < ledgerDirectory.length; i++) {
-            ledgerDirectory[i] = new File(args[i+2]);
+        for (int i = 0; i < ledgerDirectory.length; i++) {
+            ledgerDirectory[i] = new File(args[i + 2]);
             if (i != 0) {
                 sb.append(',');
             }
             sb.append(ledgerDirectory[i]);
         }
-        String hello = String.format("Hello, I'm your bookie, listening on port %1$s. Journals are in %2$s. Ledgers are stored in %3$s.", port, journalDirectory, sb);
+        String hello = String.format(
+                "Hello, I'm your bookie, listening on port %1$s. Journals are in %2$s. Ledgers are stored in %3$s.",
+                port, journalDirectory, sb);
         LOG.info(hello);
         BookieServer bs = new BookieServer(port, journalDirectory, ledgerDirectory);
         bs.start();
         bs.join();
     }
 
-   
     public void processPacket(ByteBuffer packet, Cnxn src) {
         int type = packet.getInt();
-        switch(type) {
+        switch (type) {
         case BookieProtocol.ADDENTRY:
             try {
                 byte[] masterKey = new byte[20];
                 packet.get(masterKey, 0, 20);
-                //LOG.debug("Master key: " + new String(masterKey));
+                // LOG.debug("Master key: " + new String(masterKey));
                 bookie.addEntry(packet.slice(), this, src, masterKey);
-            } catch(IOException e) {
-                if (LOG.isTraceEnabled()) {
-                    ByteBuffer bb = packet.duplicate();
-    
-                    long ledgerId = bb.getLong();
-                    long entryId = bb.getLong();
-                    LOG.trace("Error reading " + entryId + "@" + ledgerId, e);
-                }
-                ByteBuffer eio = ByteBuffer.allocate(8);
+            } catch (IOException e) {
+                ByteBuffer bb = packet.duplicate();
+
+                long ledgerId = bb.getLong();
+                long entryId = bb.getLong();
+                LOG.error("Error writing " + entryId + "@" + ledgerId, e);
+                ByteBuffer eio = ByteBuffer.allocate(8 + 16);
                 eio.putInt(type);
                 eio.putInt(BookieProtocol.EIO);
+                eio.putLong(ledgerId);
+                eio.putLong(entryId);
                 eio.flip();
-                src.sendResponse(new ByteBuffer[] {eio});
-            } catch(BookieException e){
+                src.sendResponse(new ByteBuffer[] { eio });
+            } catch (BookieException e) {
                 ByteBuffer bb = packet.duplicate();
                 long ledgerId = bb.getLong();
-                
+                long entryId = bb.getLong();
+
                 LOG.error("Unauthorized access to ledger " + ledgerId);
-                
-                ByteBuffer eio = ByteBuffer.allocate(8);
+
+                ByteBuffer eio = ByteBuffer.allocate(8 + 16);
                 eio.putInt(type);
                 eio.putInt(BookieProtocol.EUA);
+                eio.putLong(ledgerId);
+                eio.putLong(entryId);
                 eio.flip();
-                src.sendResponse(new ByteBuffer[] {eio});
+                src.sendResponse(new ByteBuffer[] { eio });
             }
             break;
         case BookieProtocol.READENTRY:
             ByteBuffer[] rsp = new ByteBuffer[2];
-            ByteBuffer rc = ByteBuffer.allocate(8+8+8);
+            ByteBuffer rc = ByteBuffer.allocate(8 + 8 + 8);
             rsp[0] = rc;
             rc.putInt(type);
-            
+
             long ledgerId = packet.getLong();
             long entryId = packet.getLong();
             LOG.debug("Received new read request: " + ledgerId + ", " + entryId);
@@ -138,17 +146,17 @@
                 rsp[1] = bookie.readEntry(ledgerId, entryId);
                 LOG.debug("##### Read entry ##### " + rsp[1].remaining());
                 rc.putInt(BookieProtocol.EOK);
-            } catch(Bookie.NoLedgerException e) {
+            } catch (Bookie.NoLedgerException e) {
                 if (LOG.isTraceEnabled()) {
                     LOG.error("Error reading " + entryId + "@" + ledgerId, e);
                 }
                 rc.putInt(BookieProtocol.ENOLEDGER);
-            } catch(Bookie.NoEntryException e) {
+            } catch (Bookie.NoEntryException e) {
                 if (LOG.isTraceEnabled()) {
                     LOG.error("Error reading " + entryId + "@" + ledgerId, e);
                 }
                 rc.putInt(BookieProtocol.ENOENTRY);
-            } catch(IOException e) {
+            } catch (IOException e) {
                 if (LOG.isTraceEnabled()) {
                     LOG.error("Error reading " + entryId + "@" + ledgerId, e);
                 }
@@ -178,12 +186,12 @@
             badType.putInt(type);
             badType.putInt(BookieProtocol.EBADREQ);
             badType.flip();
-            src.sendResponse(new ByteBuffer[] {packet});
+            src.sendResponse(new ByteBuffer[] { packet });
         }
     }
-    
-    public void writeComplete(int rc, long ledgerId, long entryId, Object ctx) {
-        Cnxn src = (Cnxn)ctx;
+
+    public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
+        Cnxn src = (Cnxn) ctx;
         ByteBuffer bb = ByteBuffer.allocate(24);
         bb.putInt(BookieProtocol.ADDENTRY);
         bb.putInt(rc);
@@ -193,7 +201,7 @@
         if (LOG.isTraceEnabled()) {
             LOG.trace("Add entry rc = " + rc + " for " + entryId + "@" + ledgerId);
         }
-        src.sendResponse(new ByteBuffer[] {bb});
+        src.sendResponse(new ByteBuffer[] { bb });
     }
-
+    
 }

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,57 @@
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+package org.apache.bookkeeper.proto;
+
+import java.net.InetSocketAddress;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * Declaration of a callback interfaces used in bookkeeper client library but
+ * not exposed to the client application.
+ */
+
+public class BookkeeperInternalCallbacks {
+    /**
+     * Callback for calls from BookieClient objects. Such calls are for replies
+     * of write operations (operations to add an entry to a ledger).
+     * 
+     */
+
+    public interface WriteCallback {
+        void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx);
+    }
+
+    public interface GenericCallback<T> {
+        void operationComplete(int rc, T result);
+    }
+    
+    /**
+     * Declaration of a callback implementation for calls from BookieClient objects.
+     * Such calls are for replies of read operations (operations to read an entry
+     * from a ledger).
+     * 
+     */
+
+    public interface ReadEntryCallback {
+        void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx);
+    }
+}

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java Tue Jan 26 23:16:45 2010
@@ -45,8 +45,9 @@
     public interface PacketProcessor {
         public void processPacket(ByteBuffer packet, Cnxn src);
     }
-    ServerStats stats = new ServerStats();
     
+    ServerStats stats = new ServerStats();
+
     Logger LOG = Logger.getLogger(NIOServerFactory.class);
 
     ServerSocketChannel ss;
@@ -89,6 +90,7 @@
         }
     }
 
+    @Override
     public void run() {
         while (!ss.socket().isClosed()) {
             try {
@@ -97,16 +99,13 @@
                 synchronized (this) {
                     selected = selector.selectedKeys();
                 }
-                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
-                        selected);
+                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
                 Collections.shuffle(selectedList);
                 for (SelectionKey k : selectedList) {
                     if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
-                        SocketChannel sc = ((ServerSocketChannel) k.channel())
-                                .accept();
+                        SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                         sc.configureBlocking(false);
-                        SelectionKey sk = sc.register(selector,
-                                SelectionKey.OP_READ);
+                        SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
                         Cnxn cnxn = new Cnxn(sc, sk);
                         sk.attach(cnxn);
                         addCnxn(cnxn);
@@ -167,7 +166,7 @@
     public class Cnxn {
 
         private SocketChannel sock;
-        
+
         private SelectionKey sk;
 
         boolean initialized;
@@ -183,7 +182,7 @@
         int packetsSent;
 
         int packetsReceived;
-        
+
         void doIO(SelectionKey k) throws InterruptedException {
             try {
                 if (sock == null) {
@@ -233,8 +232,7 @@
                                  * be copied, so we've got to slice the buffer
                                  * if it's too big.
                                  */
-                                b = (ByteBuffer) b.slice().limit(
-                                        directBuffer.remaining());
+                                b = (ByteBuffer) b.slice().limit(directBuffer.remaining());
                             }
                             /*
                              * put() is going to modify the positions of both
@@ -286,15 +284,12 @@
                     }
                     synchronized (this) {
                         if (outgoingBuffers.size() == 0) {
-                            if (!initialized
-                                    && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
+                            if (!initialized && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
                                 throw new IOException("Responded to info probe");
                             }
-                            sk.interestOps(sk.interestOps()
-                                    & (~SelectionKey.OP_WRITE));
+                            sk.interestOps(sk.interestOps() & (~SelectionKey.OP_WRITE));
                         } else {
-                            sk.interestOps(sk.interestOps()
-                                    | SelectionKey.OP_WRITE);
+                            sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
                         }
                     }
                 }
@@ -349,9 +344,8 @@
         }
 
         String peerName;
-        
-        public Cnxn(SocketChannel sock, SelectionKey sk)
-                throws IOException {
+
+        public Cnxn(SocketChannel sock, SelectionKey sk) throws IOException {
             this.sock = sock;
             this.sk = sk;
             sock.socket().setTcpNoDelay(true);
@@ -360,14 +354,14 @@
             if (LOG.isTraceEnabled()) {
                 peerName = sock.socket().toString();
             }
-            
+
             lenBuffer.clear();
             incomingBuffer = lenBuffer;
         }
 
+        @Override
         public String toString() {
-            return "NIOServerCnxn object with sock = " + sock + " and sk = "
-                    + sk;
+            return "NIOServerCnxn object with sock = " + sock + " and sk = " + sk;
         }
 
         boolean closed;
@@ -437,11 +431,11 @@
                 throw e;
             }
         }
-        
+
         private void sendBuffers(ByteBuffer bb[]) {
             ByteBuffer len = ByteBuffer.allocate(4);
             int total = 0;
-            for(int i = 0; i < bb.length; i++) {
+            for (int i = 0; i < bb.length; i++) {
                 if (bb[i] != null) {
                     total += bb[i].remaining();
                 }
@@ -452,14 +446,14 @@
             len.putInt(total);
             len.flip();
             outgoingBuffers.add(len);
-            for(int i = 0; i < bb.length; i++) {
+            for (int i = 0; i < bb.length; i++) {
                 if (bb[i] != null) {
                     outgoingBuffers.add(bb[i]);
                 }
             }
             makeWritable(sk);
         }
-        
+
         synchronized public void sendResponse(ByteBuffer bb[]) {
             if (closed) {
                 return;
@@ -485,8 +479,8 @@
             long packetsSent;
 
             /**
-             * The number of requests that have been submitted but not yet responded
-             * to.
+             * The number of requests that have been submitted but not yet
+             * responded to.
              */
             public long getOutstandingRequests() {
                 return outstandingRequests;
@@ -500,19 +494,15 @@
                 return packetsSent;
             }
 
+            @Override
             public String toString() {
                 StringBuilder sb = new StringBuilder();
                 Channel channel = sk.channel();
                 if (channel instanceof SocketChannel) {
-                    sb.append(" ").append(
-                            ((SocketChannel) channel).socket()
-                                    .getRemoteSocketAddress()).append("[")
-                            .append(Integer.toHexString(sk.interestOps()))
-                            .append("](queued=").append(
-                                    getOutstandingRequests())
-                            .append(",recved=").append(getPacketsReceived())
-                            .append(",sent=").append(getPacketsSent()).append(
-                                    ")\n");
+                    sb.append(" ").append(((SocketChannel) channel).socket().getRemoteSocketAddress()).append("[")
+                            .append(Integer.toHexString(sk.interestOps())).append("](queued=").append(
+                                    getOutstandingRequests()).append(",recved=").append(getPacketsReceived()).append(
+                                    ",sent=").append(getPacketsSent()).append(")\n");
                 }
                 return sb.toString();
             }

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,570 @@
+package org.apache.bookkeeper.proto;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.log4j.Logger;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+
+/**
+ * This class manages all details of connection to a particular bookie. It also
+ * has reconnect logic if a connection to a bookie fails.
+ * 
+ */
+
+@ChannelPipelineCoverage("one")
+public class PerChannelBookieClient extends SimpleChannelHandler implements ChannelPipelineFactory {
+
+    static final Logger LOG = Logger.getLogger(PerChannelBookieClient.class);
+
+    static final long maxMemory = Runtime.getRuntime().maxMemory() / 5;
+    public static int MAX_FRAME_LENGTH = 2 * 1024 * 1024; // 2M
+
+    InetSocketAddress addr;
+    boolean connected = false;
+    AtomicLong totalBytesOutstanding;
+    ClientSocketChannelFactory channelFactory;
+    OrderedSafeExecutor executor;
+
+    ConcurrentHashMap<CompletionKey, AddCompletion> addCompletions = new ConcurrentHashMap<CompletionKey, AddCompletion>();
+    ConcurrentHashMap<CompletionKey, ReadCompletion> readCompletions = new ConcurrentHashMap<CompletionKey, ReadCompletion>();
+
+    /**
+     * The following member variables do not need to be concurrent, or volatile
+     * because they are always updated under a lock
+     */
+    Queue<GenericCallback<Void>> pendingOps = new ArrayDeque<GenericCallback<Void>>();
+    boolean connectionAttemptInProgress;
+    Channel channel = null;
+
+    public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
+            InetSocketAddress addr, AtomicLong totalBytesOutstanding) {
+        this.addr = addr;
+        this.executor = executor;
+        this.totalBytesOutstanding = totalBytesOutstanding;
+        this.channelFactory = channelFactory;
+        connect(channelFactory);
+    }
+
+    void connect(ChannelFactory channelFactory) {
+
+        if (LOG.isDebugEnabled())
+            LOG.debug("Connecting to bookie: " + addr);
+
+        // Set up the ClientBootStrap so we can create a new Channel connection
+        // to the bookie.
+        ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
+        bootstrap.setPipelineFactory(this);
+        bootstrap.setOption("tcpNoDelay", true);
+        bootstrap.setOption("keepAlive", true);
+
+        // Start the connection attempt to the input server host.
+        connectionAttemptInProgress = true;
+
+        ChannelFuture future = bootstrap.connect(addr);
+
+        future.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                int rc;
+                Queue<GenericCallback<Void>> oldPendingOps;
+
+                synchronized (PerChannelBookieClient.this) {
+
+                    if (future.isSuccess()) {
+                        LOG.info("Successfully connected to bookie: " + addr);
+                        rc = BKException.Code.OK;
+                        channel = future.getChannel();
+                        connected = true;
+                    } else {
+                        LOG.error("Could not connect to bookie: " + addr);
+                        rc = BKException.Code.BookieHandleNotAvailableException;
+                        channel = null;
+                        connected = false;
+                    }
+
+                    connectionAttemptInProgress = false;
+                    PerChannelBookieClient.this.channel = channel;
+
+                    // trick to not do operations under the lock, take the list
+                    // of pending ops and assign it to a new variable, while
+                    // emptying the pending ops by just assigning it to a new
+                    // list
+                    oldPendingOps = pendingOps;
+                    pendingOps = new ArrayDeque<GenericCallback<Void>>();
+                }
+
+                for (GenericCallback<Void> pendingOp : oldPendingOps) {
+                    pendingOp.operationComplete(rc, null);
+                }
+
+            }
+        });
+    }
+
+    void connectIfNeededAndDoOp(GenericCallback<Void> op) {
+        boolean doOpNow;
+
+        // common case without lock first
+        if (channel != null && connected) {
+            doOpNow = true;
+        } else {
+
+            synchronized (this) {
+                // check again under lock
+                if (channel != null && connected) {
+                    doOpNow = true;
+                } else {
+
+                    // if reached here, channel is either null (first connection
+                    // attempt),
+                    // or the channel is disconnected
+                    doOpNow = false;
+
+                    // connection attempt is still in progress, queue up this
+                    // op. Op will be executed when connection attempt either
+                    // fails
+                    // or
+                    // succeeds
+                    pendingOps.add(op);
+
+                    if (!connectionAttemptInProgress) {
+                        connect(channelFactory);
+                    }
+
+                }
+            }
+        }
+
+        if (doOpNow) {
+            op.operationComplete(BKException.Code.OK, null);
+        }
+
+    }
+
+    /**
+     * This method should be called only after connection has been checked for
+     * {@link #connectIfNeededAndDoOp(GenericCallback)}
+     * 
+     * @param ledgerId
+     * @param masterKey
+     * @param entryId
+     * @param lastConfirmed
+     * @param macCode
+     * @param data
+     * @param cb
+     * @param ctx
+     */
+    void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ChannelBuffer toSend, WriteCallback cb,
+            Object ctx) {
+
+        final int entrySize = toSend.readableBytes();
+        // if (totalBytesOutstanding.get() > maxMemory) {
+        // // TODO: how to throttle, throw an exception, or call the callback?
+        // // Maybe this should be done at the layer above?
+        // }
+
+        final CompletionKey completionKey = new CompletionKey(ledgerId, entryId);
+
+        addCompletions.put(completionKey, new AddCompletion(cb, entrySize, ctx));
+
+        int totalHeaderSize = 4 // for the length of the packet
+        + 4 // for the type of request
+        + masterKey.length; // for the master key
+
+        ChannelBuffer header = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
+        header.writeInt(totalHeaderSize - 4 + entrySize);
+        header.writeInt(BookieProtocol.ADDENTRY);
+        header.writeBytes(masterKey);
+
+        ChannelBuffer wrappedBuffer = ChannelBuffers.wrappedBuffer(header, toSend);
+
+        ChannelFuture future = channel.write(wrappedBuffer);
+        future.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                if (future.isSuccess()) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Successfully wrote request for adding entry: " + entryId + " ledger-id: " + ledgerId
+                                + " bookie: " + channel.getRemoteAddress() + " entry length: " + entrySize);
+                    }
+                    // totalBytesOutstanding.addAndGet(entrySize);
+                } else {
+                    errorOutAddKey(completionKey);
+                }
+            }
+        });
+
+    }
+
+    public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx) {
+
+        final CompletionKey key = new CompletionKey(ledgerId, entryId);
+        readCompletions.put(key, new ReadCompletion(cb, ctx));
+
+        int totalHeaderSize = 4 // for the length of the packet
+        + 4 // for request type
+        + 8 // for ledgerId
+        + 8; // for entryId
+
+        ChannelBuffer tmpEntry = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
+        tmpEntry.writeInt(totalHeaderSize - 4);
+        tmpEntry.writeInt(BookieProtocol.READENTRY);
+        tmpEntry.writeLong(ledgerId);
+        tmpEntry.writeLong(entryId);
+
+        ChannelFuture future = channel.write(tmpEntry);
+        future.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                if (future.isSuccess()) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Successfully wrote request for reading entry: " + entryId + " ledger-id: "
+                                + ledgerId + " bookie: " + channel.getRemoteAddress());
+                    }
+                } else {
+                    errorOutReadKey(key);
+                }
+            }
+        });
+
+    }
+
+    public void close() {
+        if (channel != null) {
+            channel.close();
+        }
+    }
+
+    void errorOutReadKey(final CompletionKey key) {
+        executor.submitOrdered(key.ledgerId, new SafeRunnable() {
+            @Override
+            public void safeRun() {
+
+                ReadCompletion readCompletion = readCompletions.remove(key);
+
+                if (readCompletion != null) {
+                    LOG.error("Could not write  request for reading entry: " + key.entryId + " ledger-id: "
+                            + key.ledgerId + " bookie: " + channel.getRemoteAddress());
+
+                    readCompletion.cb.readEntryComplete(BKException.Code.BookieHandleNotAvailableException,
+                            key.ledgerId, key.entryId, null, readCompletion.ctx);
+                }
+            }
+
+        });
+    }
+
+    void errorOutAddKey(final CompletionKey key) {
+        executor.submitOrdered(key.ledgerId, new SafeRunnable() {
+            @Override
+            public void safeRun() {
+
+                AddCompletion addCompletion = addCompletions.remove(key);
+
+                if (addCompletion != null) {
+                    String bAddress = "null";
+                    if(channel != null)
+                        bAddress = channel.getRemoteAddress().toString();
+                    LOG.error("Could not write request for adding entry: " + key.entryId + " ledger-id: "
+                            + key.ledgerId + " bookie: " + bAddress);
+
+                    addCompletion.cb.writeComplete(BKException.Code.BookieHandleNotAvailableException, key.ledgerId,
+                            key.entryId, addr, addCompletion.ctx);
+                    LOG.error("Invoked callback method: " + key.entryId);
+                }
+            }
+
+        });
+
+    }
+
+    /**
+     * Errors out pending entries. We call this method from one thread to avoid
+     * concurrent executions to QuorumOpMonitor (implements callbacks). It seems
+     * simpler to call it from BookieHandle instead of calling directly from
+     * here.
+     */
+
+    void errorOutOutstandingEntries() {
+
+        // DO NOT rewrite these using Map.Entry iterations. We want to iterate
+        // on keys and see if we are successfully able to remove the key from
+        // the map. Because the add and the read methods also do the same thing
+        // in case they get a write failure on the socket. The one who
+        // successfully removes the key from the map is the one responsible for
+        // calling the application callback.
+
+        for (CompletionKey key : addCompletions.keySet()) {
+            errorOutAddKey(key);
+        }
+
+        for (CompletionKey key : readCompletions.keySet()) {
+            errorOutReadKey(key);
+        }
+    }
+
+    /**
+     * In the netty pipeline, we need to split packets based on length, so we
+     * use the {@link LengthFieldBasedFrameDecoder}. Other than that all actions
+     * are carried out in this class, e.g., making sense of received messages,
+     * prepending the length to outgoing packets etc.
+     */
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+        ChannelPipeline pipeline = Channels.pipeline();
+        pipeline.addLast("lengthbasedframedecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
+        pipeline.addLast("mainhandler", this);
+        return pipeline;
+    }
+
+    /**
+     * If our channel has disconnected, we just error out the pending entries
+     */
+    @Override
+    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        LOG.info("Disconnected from bookie: " + addr);
+    	errorOutOutstandingEntries();
+        channel.close();
+
+        connected = false;
+
+        // we don't want to reconnect right away. If someone sends a request to
+        // this address, we will reconnect.
+    }
+
+    /**
+     * Called by netty when an exception happens in one of the netty threads
+     * (mostly due to what we do in the netty threads)
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+        Throwable t = e.getCause();
+        if (t instanceof CorruptedFrameException || t instanceof TooLongFrameException) {
+            LOG.error("Corrupted fram recieved from bookie: " + e.getChannel().getRemoteAddress());
+            return;
+        }
+        if (t instanceof IOException) {
+            // these are thrown when a bookie fails, logging them just pollutes
+            // the logs (the failure is logged from the listeners on the write
+            // operation), so I'll just ignore it here.
+            return;
+        }
+
+        LOG.fatal("Unexpected exception caught by bookie client channel handler", t);
+        // Since we are a library, cant terminate App here, can we?
+    }
+
+    /**
+     * Called by netty when a message is received on a channel
+     */
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+        if (!(e.getMessage() instanceof ChannelBuffer)) {
+            ctx.sendUpstream(e);
+            return;
+        }
+
+        final ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
+        final int type, rc;
+        final long ledgerId, entryId;
+
+        try {
+            type = buffer.readInt();
+            rc = buffer.readInt();
+            ledgerId = buffer.readLong();
+            entryId = buffer.readLong();
+        } catch (IndexOutOfBoundsException ex) {
+            LOG.error("Unparseable response from bookie: " + addr, ex);
+            return;
+        }
+
+        executor.submitOrdered(ledgerId, new SafeRunnable() {
+            @Override
+            public void safeRun() {
+                switch (type) {
+                case BookieProtocol.ADDENTRY:
+                    handleAddResponse(ledgerId, entryId, rc);
+                    break;
+                case BookieProtocol.READENTRY:
+                    handleReadResponse(ledgerId, entryId, rc, buffer);
+                    break;
+                default:
+                    LOG.error("Unexpected response, type: " + type + " recieved from bookie: " + addr + " , ignoring");
+                }
+            }
+
+        });
+    }
+
+    void handleAddResponse(long ledgerId, long entryId, int rc) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
+                    + entryId + " rc: " + rc);
+        }
+
+        // convert to BKException code because thats what the uppper
+        // layers expect. This is UGLY, there should just be one set of
+        // error codes.
+        if (rc != BookieProtocol.EOK) {
+            LOG.error("Add for ledger: " + ledgerId + ", entry: " + entryId + " failed on bookie: " + addr
+                    + " with code: " + rc);
+            rc = BKException.Code.WriteException;
+        } else {
+            rc = BKException.Code.OK;
+        }
+
+        AddCompletion ac;
+        ac = addCompletions.remove(new CompletionKey(ledgerId, entryId));
+        if (ac == null) {
+            LOG.error("Unexpected add response received from bookie: " + addr + " for ledger: " + ledgerId
+                    + ", entry: " + entryId + " , ignoring");
+            return;
+        }
+
+        // totalBytesOutstanding.addAndGet(-ac.size);
+
+        ac.cb.writeComplete(rc, ledgerId, entryId, addr, ac.ctx);
+
+    }
+
+    void handleReadResponse(long ledgerId, long entryId, int rc, ChannelBuffer buffer) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Got response for read request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
+                    + entryId + " rc: " + rc + "entry length: " + buffer.readableBytes());
+        }
+
+        // convert to BKException code because thats what the uppper
+        // layers expect. This is UGLY, there should just be one set of
+        // error codes.
+        if (rc == BookieProtocol.EOK) {
+            rc = BKException.Code.OK;
+        } else if (rc == BookieProtocol.ENOENTRY || rc == BookieProtocol.ENOLEDGER) {
+            rc = BKException.Code.NoSuchEntryException;
+        } else {
+            LOG.error("Read for ledger: " + ledgerId + ", entry: " + entryId + " failed on bookie: " + addr
+                    + " with code: " + rc);
+            rc = BKException.Code.ReadException;
+        }
+
+        CompletionKey key = new CompletionKey(ledgerId, entryId);
+        ReadCompletion readCompletion = readCompletions.remove(key);
+
+        if (readCompletion == null) {
+            /*
+             * This is a special case. When recovering a ledger, a client
+             * submits a read request with id -1, and receives a response with a
+             * different entry id.
+             */
+            readCompletion = readCompletions.remove(new CompletionKey(ledgerId, -1));
+        }
+
+        if (readCompletion == null) {
+            LOG.error("Unexpected read response recieved from bookie: " + addr + " for ledger: " + ledgerId
+                    + ", entry: " + entryId + " , ignoring");
+            return;
+        }
+
+        readCompletion.cb.readEntryComplete(rc, ledgerId, entryId, buffer.slice(), readCompletion.ctx);
+    }
+
+    /**
+     * Boiler-plate wrapper classes follow
+     * 
+     */
+
+    private static class ReadCompletion {
+        final ReadEntryCallback cb;
+        final Object ctx;
+
+        public ReadCompletion(ReadEntryCallback cb, Object ctx) {
+            this.cb = cb;
+            this.ctx = ctx;
+        }
+    }
+
+    private static class AddCompletion {
+        final WriteCallback cb;
+        //final long size;
+        final Object ctx;
+
+        public AddCompletion(WriteCallback cb, long size, Object ctx) {
+            this.cb = cb;
+            //this.size = size;
+            this.ctx = ctx;
+        }
+    }
+
+    private static class CompletionKey {
+        long ledgerId;
+        long entryId;
+
+        CompletionKey(long ledgerId, long entryId) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof CompletionKey) || obj == null) {
+                return false;
+            }
+            CompletionKey that = (CompletionKey) obj;
+            return this.ledgerId == that.ledgerId && this.entryId == that.entryId;
+        }
+
+        @Override
+        public int hashCode() {
+            return ((int) ledgerId << 16) ^ ((int) entryId);
+        }
+
+    }
+
+}

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ServerStats.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ServerStats.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ServerStats.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ServerStats.java Tue Jan 26 23:16:45 2010
@@ -14,45 +14,51 @@
 
 package org.apache.bookkeeper.proto;
 
-
 public class ServerStats {
-    private static ServerStats instance= new ServerStats();
+    private static ServerStats instance = new ServerStats();
     private long packetsSent;
     private long packetsReceived;
     private long maxLatency;
     private long minLatency = Long.MAX_VALUE;
     private long totalLatency = 0;
     private long count = 0;
-    
-    public interface Provider{
+
+    public interface Provider {
         public long getOutstandingRequests();
+
         public long getLastProcessedZxid();
     }
-    private Provider provider=null;
-    private Object mutex=new Object();
-    
-    static public ServerStats getInstance(){
+
+    private Provider provider = null;
+    private Object mutex = new Object();
+
+    static public ServerStats getInstance() {
         return instance;
     }
+
     static public void registerAsConcrete() {
         setInstance(new ServerStats());
     }
+
     static synchronized public void unregister() {
-        instance=null;
+        instance = null;
     }
-    static synchronized protected void setInstance(ServerStats newInstance){
-        assert instance==null;
+
+    static synchronized protected void setInstance(ServerStats newInstance) {
+        assert instance == null;
         instance = newInstance;
     }
-    protected ServerStats(){}
-    
+
+    protected ServerStats() {
+    }
+
     // getters
     synchronized public long getMinLatency() {
         return (minLatency == Long.MAX_VALUE) ? 0 : minLatency;
     }
 
     synchronized public long getAvgLatency() {
-        if(count!=0)
+        if (count != 0)
             return totalLatency / count;
         return 0;
     }
@@ -62,15 +68,17 @@
     }
 
     public long getOutstandingRequests() {
-        synchronized(mutex){
-            return (provider!=null)?provider.getOutstandingRequests():-1;
+        synchronized (mutex) {
+            return (provider != null) ? provider.getOutstandingRequests() : -1;
         }
     }
-    public long getLastProcessedZxid(){
-        synchronized(mutex){
-            return (provider!=null)?provider.getLastProcessedZxid():-1;
+
+    public long getLastProcessedZxid() {
+        synchronized (mutex) {
+            return (provider != null) ? provider.getLastProcessedZxid() : -1;
         }
     }
+
     synchronized public long getPacketsReceived() {
         return packetsReceived;
     }
@@ -79,29 +87,31 @@
         return packetsSent;
     }
 
-    public String getServerState(){
+    public String getServerState() {
         return "standalone";
     }
-    
-    public String toString(){
+
+    @Override
+    public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append("Latency min/avg/max: " + getMinLatency() + "/"
-                + getAvgLatency() + "/" + getMaxLatency() + "\n");
+        sb.append("Latency min/avg/max: " + getMinLatency() + "/" + getAvgLatency() + "/" + getMaxLatency() + "\n");
         sb.append("Received: " + getPacketsReceived() + "\n");
         sb.append("Sent: " + getPacketsSent() + "\n");
         if (provider != null) {
             sb.append("Outstanding: " + getOutstandingRequests() + "\n");
-            sb.append("Zxid: 0x"+ Long.toHexString(getLastProcessedZxid())+ "\n");
+            sb.append("Zxid: 0x" + Long.toHexString(getLastProcessedZxid()) + "\n");
         }
-        sb.append("Mode: "+getServerState()+"\n");
+        sb.append("Mode: " + getServerState() + "\n");
         return sb.toString();
     }
+
     // mutators
-    public void setStatsProvider(Provider zk){
-        synchronized(mutex){
-            provider=zk;
+    public void setStatsProvider(Provider zk) {
+        synchronized (mutex) {
+            provider = zk;
         }
     }
+
     synchronized void updateLatency(long requestCreateTime) {
         long latency = System.currentTimeMillis() - requestCreateTime;
         totalLatency += latency;
@@ -113,21 +123,26 @@
             maxLatency = latency;
         }
     }
-    synchronized public void resetLatency(){
-        totalLatency=count=maxLatency=0;
-        minLatency=Long.MAX_VALUE;
+
+    synchronized public void resetLatency() {
+        totalLatency = count = maxLatency = 0;
+        minLatency = Long.MAX_VALUE;
     }
-    synchronized public void resetMaxLatency(){
-        maxLatency=getMinLatency();
+
+    synchronized public void resetMaxLatency() {
+        maxLatency = getMinLatency();
     }
+
     synchronized public void incrementPacketsReceived() {
         packetsReceived++;
     }
+
     synchronized public void incrementPacketsSent() {
         packetsSent++;
     }
-    synchronized public void resetRequestCounters(){
-        packetsReceived=packetsSent=0;
+
+    synchronized public void resetRequestCounters() {
+        packetsReceived = packetsSent = 0;
     }
 
 }

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerInputStream.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerInputStream.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerInputStream.java Tue Jan 26 23:16:45 2010
@@ -23,11 +23,11 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.Enumeration;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.LedgerSequence;
 import org.apache.log4j.Logger;
 
 public class LedgerInputStream extends InputStream {
@@ -35,14 +35,16 @@
     private LedgerHandle lh;
     private ByteBuffer bytebuff;
     byte[] bbytes;
-    long lastEntry =0;
+    long lastEntry = 0;
     int increment = 50;
     int defaultSize = 1024 * 1024; // 1MB default size
-    LedgerSequence ledgerSeq = null;
-    
+    Enumeration<LedgerEntry> ledgerSeq = null;
+
     /**
      * construct a outputstream from a ledger handle
-     * @param lh ledger handle
+     * 
+     * @param lh
+     *            ledger handle
      * @throws {@link BKException}, {@link InterruptedException}
      */
     public LedgerInputStream(LedgerHandle lh) throws BKException, InterruptedException {
@@ -50,14 +52,17 @@
         bbytes = new byte[defaultSize];
         this.bytebuff = ByteBuffer.wrap(bbytes);
         this.bytebuff.position(this.bytebuff.limit());
-        lastEntry = Math.max(lh.getLast(), increment);
+        lastEntry = Math.min(lh.getLastAddConfirmed(), increment);
         ledgerSeq = lh.readEntries(0, lastEntry);
     }
 
     /**
      * construct a outputstream from a ledger handle
-     * @param lh the ledger handle
-     * @param size the size of the buffer
+     * 
+     * @param lh
+     *            the ledger handle
+     * @param size
+     *            the size of the buffer
      * @throws {@link BKException}, {@link InterruptedException}
      */
     public LedgerInputStream(LedgerHandle lh, int size) throws BKException, InterruptedException {
@@ -65,38 +70,37 @@
         bbytes = new byte[size];
         this.bytebuff = ByteBuffer.wrap(bbytes);
         this.bytebuff.position(this.bytebuff.limit());
-        lastEntry = Math.max(lh.getLast(), increment);
+        lastEntry = Math.min(lh.getLastAddConfirmed(), increment);
         ledgerSeq = lh.readEntries(0, lastEntry);
     }
-    
-    
+
     @Override
     public void close() {
         // do nothing
-        // let the applciation
+        // let the application
         // close the ledger
     }
-    
+
     /**
-     * refill the buffer, we 
-     * need to read more bytes
+     * refill the buffer, we need to read more bytes
+     * 
      * @return if we can refill or not
      */
     private synchronized boolean refill() throws IOException {
         bytebuff.clear();
-        if (!ledgerSeq.hasMoreElements() && lastEntry >= lh.getLast()) {
+        if (!ledgerSeq.hasMoreElements() && lastEntry >= lh.getLastAddConfirmed()) {
             return false;
         }
         if (!ledgerSeq.hasMoreElements()) {
-            //do refill 
-            long last = Math.max( lastEntry + increment, lh.getLast());
+            // do refill
+            long last = Math.min(lastEntry + increment, lh.getLastAddConfirmed());
             try {
                 ledgerSeq = lh.readEntries(lastEntry + 1, last);
-            } catch(BKException bk) {
+            } catch (BKException bk) {
                 IOException ie = new IOException(bk.getMessage());
                 ie.initCause(bk);
                 throw ie;
-            } catch(InterruptedException ie) {
+            } catch (InterruptedException ie) {
                 Thread.currentThread().interrupt();
             }
             lastEntry = last;
@@ -106,7 +110,7 @@
         bytebuff = ByteBuffer.wrap(bbytes);
         return true;
     }
-    
+
     @Override
     public synchronized int read() throws IOException {
         boolean toread = true;
@@ -120,10 +124,10 @@
         }
         return -1;
     }
-    
+
     @Override
     public synchronized int read(byte[] b) throws IOException {
-        // be smart ... just copy the bytes 
+        // be smart ... just copy the bytes
         // once and return the size
         // user will call it again
         boolean toread = true;
@@ -133,19 +137,19 @@
         if (toread) {
             int bcopied = bytebuff.remaining();
             int tocopy = Math.min(bcopied, b.length);
-            //cannot used gets because of
+            // cannot used gets because of
             // the underflow/overflow exceptions
-            System.arraycopy(bbytes, bytebuff.position(), b,0, tocopy);
+            System.arraycopy(bbytes, bytebuff.position(), b, 0, tocopy);
             bytebuff.position(bytebuff.position() + tocopy);
             return tocopy;
         }
         return -1;
     }
-    
+
     @Override
     public synchronized int read(byte[] b, int off, int len) throws IOException {
-        //again dont need ot fully
-        // fill b, just return 
+        // again dont need ot fully
+        // fill b, just return
         // what we have and let the application call read
         // again
         boolean toread = true;

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java Tue Jan 26 23:16:45 2010
@@ -28,15 +28,11 @@
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.log4j.Logger;
 
-
 /**
- * this class provides a streaming api 
- * to get an output stream from a ledger
- * handle and write to it as a stream of 
- * bytes. This is built on top of ledgerhandle
- * api and uses a buffer to cache the data
- * written to it and writes out the entry 
- * to the ledger.
+ * this class provides a streaming api to get an output stream from a ledger
+ * handle and write to it as a stream of bytes. This is built on top of
+ * ledgerhandle api and uses a buffer to cache the data written to it and writes
+ * out the entry to the ledger.
  */
 public class LedgerOutputStream extends OutputStream {
     Logger LOG = Logger.getLogger(LedgerOutputStream.class);
@@ -44,62 +40,66 @@
     private ByteBuffer bytebuff;
     byte[] bbytes;
     int defaultSize = 1024 * 1024; // 1MB default size
-    
+
     /**
      * construct a outputstream from a ledger handle
-     * @param lh ledger handle
+     * 
+     * @param lh
+     *            ledger handle
      */
     public LedgerOutputStream(LedgerHandle lh) {
         this.lh = lh;
         bbytes = new byte[defaultSize];
         this.bytebuff = ByteBuffer.wrap(bbytes);
     }
-    
+
     /**
      * construct a outputstream from a ledger handle
-     * @param lh the ledger handle
-     * @param size the size of the buffer
+     * 
+     * @param lh
+     *            the ledger handle
+     * @param size
+     *            the size of the buffer
      */
     public LedgerOutputStream(LedgerHandle lh, int size) {
         this.lh = lh;
         bbytes = new byte[size];
         this.bytebuff = ByteBuffer.wrap(bbytes);
     }
-    
+
     @Override
     public void close() {
-        //flush everything
+        // flush everything
         // we have
         flush();
     }
-    
+
     @Override
     public synchronized void flush() {
-        // lets flush all the data 
+        // lets flush all the data
         // into the ledger entry
         if (bytebuff.position() > 0) {
-            //copy the bytes into 
+            // copy the bytes into
             // a new byte buffer and send it out
             byte[] b = new byte[bytebuff.position()];
             LOG.info("Comment: flushing with params " + " " + bytebuff.position());
             System.arraycopy(bbytes, 0, b, 0, bytebuff.position());
             try {
                 lh.addEntry(b);
-            } catch(InterruptedException ie) {
+            } catch (InterruptedException ie) {
                 LOG.warn("Interrupted while flusing " + ie);
                 Thread.currentThread().interrupt();
-            } catch(BKException bke) {
+            } catch (BKException bke) {
                 LOG.warn("BookKeeper exception ", bke);
             }
         }
     }
-    
+
     /**
-     * make space for len bytes to be written
-     * to the buffer. 
+     * make space for len bytes to be written to the buffer.
+     * 
      * @param len
-     * @return if true then we can make space for len
-     * if false we cannot
+     * @return if true then we can make space for len if false we cannot
      */
     private boolean makeSpace(int len) {
         if (bytebuff.remaining() < len) {
@@ -111,34 +111,33 @@
         }
         return true;
     }
-    
+
     @Override
     public synchronized void write(byte[] b) {
         if (makeSpace(b.length)) {
             bytebuff.put(b);
-        }
-        else {
+        } else {
             try {
                 lh.addEntry(b);
-            } catch(InterruptedException ie) {
+            } catch (InterruptedException ie) {
                 LOG.warn("Interrupted while writing", ie);
                 Thread.currentThread().interrupt();
-            } catch(BKException bke) {
+            } catch (BKException bke) {
                 LOG.warn("BookKeeper exception", bke);
             }
         }
     }
-    
+
     @Override
     public synchronized void write(byte[] b, int off, int len) {
         if (!makeSpace(len)) {
-            //lets try making the buffer bigger
+            // lets try making the buffer bigger
             bbytes = new byte[len];
             bytebuff = ByteBuffer.wrap(bbytes);
         }
         bytebuff.put(b, off, len);
     }
-    
+
     @Override
     public synchronized void write(int b) throws IOException {
         makeSpace(1);

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/LocalBookKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/LocalBookKeeper.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/LocalBookKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/LocalBookKeeper.java Tue Jan 26 23:16:45 2010
@@ -25,9 +25,6 @@
 import java.io.OutputStream;
 import java.net.Socket;
 
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.LedgerSequence;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.log4j.ConsoleAppender;
 import org.apache.log4j.Level;
@@ -40,11 +37,8 @@
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.server.NIOServerCnxn;
-import org.apache.zookeeper.server.ServerStats;
 import org.apache.zookeeper.server.ZooKeeperServer;
 
-import org.apache.log4j.Logger;
-
 public class LocalBookKeeper {
     protected static final Logger LOG = Logger.getLogger(LocalBookKeeper.class);
     public static final int CONNECTION_TIMEOUT = 30000;
@@ -98,7 +92,7 @@
 			// TODO Auto-generated catch block
 			LOG.fatal("Exception while instantiating ZooKeeper", e);
 		} 
-		
+
         boolean b = waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT);
         LOG.debug("ZooKeeper server up: " + b);
 	}
@@ -210,5 +204,5 @@
         }
         return false;
     }
-
+	
 }

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/Main.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/Main.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/Main.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/Main.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
 package org.apache.bookkeeper.util;
+
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,30 +21,28 @@
  * 
  */
 
-
 import java.io.IOException;
 
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieServer;
 
-
 public class Main {
 
     static void usage() {
         System.err.println("USAGE: bookeeper client|bookie");
     }
+
     /**
      * @param args
-     * @throws InterruptedException 
-     * @throws IOException 
+     * @throws InterruptedException
+     * @throws IOException
      */
     public static void main(String[] args) throws IOException, InterruptedException {
-        if (args.length < 1 || !(args[0].equals("client") || 
-                args[0].equals("bookie"))) {
+        if (args.length < 1 || !(args[0].equals("client") || args[0].equals("bookie"))) {
             usage();
             return;
         }
-        String newArgs[] = new String[args.length-1];
+        String newArgs[] = new String[args.length - 1];
         System.arraycopy(args, 1, newArgs, 0, newArgs.length);
         if (args[0].equals("bookie")) {
             BookieServer.main(newArgs);

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/MathUtils.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/MathUtils.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/MathUtils.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/MathUtils.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,38 @@
+package org.apache.bookkeeper.util;
+
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Provides misc math functions that dont come standard
+ */
+public class MathUtils {
+
+    public static int signSafeMod(long dividend, int divisor){
+        int mod = (int) (dividend % divisor);
+        
+        if (mod < 0){
+            mod += divisor;
+        }
+        
+        return mod;
+        
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,98 @@
+package org.apache.bookkeeper.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * This class provides 2 things over the java {@link ScheduledExecutorService}.
+ * 
+ * 1. It takes {@link SafeRunnable objects} instead of plain Runnable objects.
+ * This means that exceptions in scheduled tasks wont go unnoticed and will be
+ * logged.
+ * 
+ * 2. It supports submitting tasks with an ordering key, so that tasks submitted
+ * with the same key will always be executed in order, but tasks across
+ * different keys can be unordered. This retains parallelism while retaining the
+ * basic amount of ordering we want (e.g. , per ledger handle). Ordering is
+ * achieved by hashing the key objects to threads by their {@link #hashCode()}
+ * method.
+ * 
+ */
+public class OrderedSafeExecutor {
+    ExecutorService threads[];
+    Random rand = new Random();
+
+    public OrderedSafeExecutor(int numThreads) {
+        if (numThreads <= 0) {
+            throw new IllegalArgumentException();
+        }
+
+        threads = new ExecutorService[numThreads];
+        for (int i = 0; i < numThreads; i++) {
+            threads[i] = Executors.newSingleThreadExecutor();
+        }
+    }
+
+    ExecutorService chooseThread() {
+        // skip random # generation in this special case
+        if (threads.length == 1) {
+            return threads[0];
+        }
+
+        return threads[rand.nextInt(threads.length)];
+
+    }
+
+    ExecutorService chooseThread(Object orderingKey) {
+        // skip hashcode generation in this special case
+        if (threads.length == 1) {
+            return threads[0];
+        }
+
+        return threads[MathUtils.signSafeMod(orderingKey.hashCode(), threads.length)];
+
+    }
+
+    /**
+     * schedules a one time action to execute 
+     */
+    public void submit(SafeRunnable r) {
+        chooseThread().submit(r);
+    }
+
+    /**
+     * schedules a one time action to execute with an ordering guarantee on the key
+     * @param orderingKey
+     * @param r
+     */
+    public void submitOrdered(Object orderingKey, SafeRunnable r) {
+        chooseThread(orderingKey).submit(r);
+    }
+
+    public void shutdown() {
+        for (int i = 0; i < threads.length; i++) {
+            threads[i].shutdown();
+        }
+    }
+
+}