You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/04/05 08:46:03 UTC

[3/3] bookkeeper git commit: BOOKKEEPER-901: Authentication framework

BOOKKEEPER-901: Authentication framework

Author: Ivan Kelly <iv...@yahoo-inc.com>

Reviewers: Sijie Guo<si...@apache.org>

Closes #23 from merlimat/authentication-framework and squashes the following commits:

aa01548 [Ivan Kelly] BOOKKEEPER-901: Add an authentication framework
f930fbd [Ivan Kelly] BOOKKEEPER-794 BookkeeperProtocol.Response.status is completely ignored


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/b1c12c0f
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/b1c12c0f
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/b1c12c0f

Branch: refs/heads/master
Commit: b1c12c0f41b7c27b2452fef311f12077d771f431
Parents: e32c388
Author: Ivan Kelly <iv...@yahoo-inc.com>
Authored: Mon Apr 4 23:45:51 2016 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Mon Apr 4 23:45:51 2016 -0700

----------------------------------------------------------------------
 bookkeeper-server/pom.xml                       |   9 +
 .../auth/AuthProviderFactoryFactory.java        | 111 +++
 .../bookkeeper/auth/BookieAuthProvider.java     |  83 ++
 .../bookkeeper/auth/ClientAuthProvider.java     |  89 +++
 .../apache/bookkeeper/client/PendingAddOp.java  |   4 +-
 .../bookkeeper/conf/ClientConfiguration.java    |  53 +-
 .../bookkeeper/conf/ServerConfiguration.java    |  24 +
 .../apache/bookkeeper/proto/AuthHandler.java    | 356 +++++++++
 .../apache/bookkeeper/proto/BookieClient.java   |  32 +-
 .../bookkeeper/proto/BookieNettyServer.java     |  21 +-
 .../bookkeeper/proto/BookieProtoEncoding.java   | 125 ++-
 .../apache/bookkeeper/proto/BookieProtocol.java |  36 +
 .../proto/BookieRequestProcessor.java           |   1 +
 .../bookkeeper/proto/BookkeeperProtocol.java    | 787 ++++++++++++++++++-
 .../proto/PerChannelBookieClient.java           |  70 +-
 .../src/main/proto/BookkeeperProtocol.proto     |  14 +-
 .../org/apache/bookkeeper/auth/TestAuth.java    | 654 +++++++++++++++
 .../proto/TestBackwardCompatCMS42.java          | 239 ++++++
 .../bookkeeper/proto/TestDataFormats.java       | 126 +++
 .../proto/TestPerChannelBookieClient.java       |  25 +-
 .../test/BookKeeperClusterTestCase.java         |  10 +
 .../src/test/proto/TestDataFormats.proto        |  34 +
 22 files changed, 2803 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/pom.xml
----------------------------------------------------------------------
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index a1a74e0..eb67fd8 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -262,6 +262,7 @@
             <!-- exclude generated file //-->
             <exclude>**/DataFormats.java</exclude>
             <exclude>**/BookkeeperProtocol.java</exclude>
+            <exclude>**/TestDataFormats.java</exclude>
           </excludes>
         </configuration>
       </plugin>
@@ -324,6 +325,14 @@
                       <arg value="--java_out=src/main/java" />
                       <arg value="src/main/proto/DataFormats.proto" />
                     </exec>
+                    <exec executable="protoc" failonerror="true">
+                      <arg value="--java_out=src/main/java" />
+                      <arg value="src/main/proto/BookkeeperProtocol.proto" />
+                    </exec>
+                    <exec executable="protoc" failonerror="true">
+                      <arg value="--java_out=src/test/java" />
+                      <arg value="src/test/proto/TestDataFormats.proto" />
+                    </exec>
                   </target>
                 </configuration>
                 <goals>

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java
new file mode 100644
index 0000000..d05c475
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java
@@ -0,0 +1,111 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.auth;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ExtensionRegistry;
+
+
+
+public class AuthProviderFactoryFactory {
+    static Logger LOG = LoggerFactory.getLogger(AuthProviderFactoryFactory.class);
+
+    public static BookieAuthProvider.Factory newBookieAuthProviderFactory(ServerConfiguration conf,
+                                                                          ExtensionRegistry registry) throws IOException {
+        String factoryClassName = conf.getBookieAuthProviderFactoryClass();
+
+        if (factoryClassName == null || factoryClassName.length() == 0) {
+            return new NullBookieAuthProviderFactory();
+        }
+
+        BookieAuthProvider.Factory factory = ReflectionUtils.newInstance(factoryClassName,
+                                                                         BookieAuthProvider.Factory.class);
+        factory.init(conf, registry);
+        return factory;
+    }
+
+    public static ClientAuthProvider.Factory newClientAuthProviderFactory(ClientConfiguration conf,
+                                                                          ExtensionRegistry registry) throws IOException {
+        String factoryClassName = conf.getClientAuthProviderFactoryClass();
+
+        if (factoryClassName == null || factoryClassName.length() == 0) {
+            return new NullClientAuthProviderFactory();
+        }
+
+        ClientAuthProvider.Factory factory = ReflectionUtils.newInstance(factoryClassName,
+                                                                         ClientAuthProvider.Factory.class);
+        factory.init(conf, registry);
+        return factory;
+    }
+
+    private final static String nullPluginName = "NULLPlugin";
+
+    private static class NullBookieAuthProviderFactory implements BookieAuthProvider.Factory {
+        @Override
+        public String getPluginName() {
+            return nullPluginName;
+        }
+
+        @Override
+        public void init(ServerConfiguration conf, ExtensionRegistry registry) {}
+
+        @Override
+        public BookieAuthProvider newProvider(InetSocketAddress addr,
+                                              GenericCallback<Void> completeCb) {
+            completeCb.operationComplete(BKException.Code.OK, null);
+            return new BookieAuthProvider() {
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {}
+            };
+        }
+    }
+
+    private static class NullClientAuthProviderFactory implements ClientAuthProvider.Factory {
+        @Override
+        public String getPluginName() {
+            return nullPluginName;
+        }
+
+        @Override
+        public void init(ClientConfiguration conf, ExtensionRegistry registry) {}
+
+        @Override
+        public ClientAuthProvider newProvider(InetSocketAddress addr,
+                                              GenericCallback<Void> completeCb) {
+            completeCb.operationComplete(BKException.Code.OK, null);
+            return new ClientAuthProvider() {
+                public void init(GenericCallback<AuthMessage> cb) {}
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {}
+            };
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java
new file mode 100644
index 0000000..4fb7d07
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.auth;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+
+import com.google.protobuf.ExtensionRegistry;
+
+/**
+ * Bookie authentication provider interface.
+ * This must be implemented by any party wishing to implement
+ * an authentication mechanism for bookkeeper connections.
+ */
+public interface BookieAuthProvider {
+    interface Factory {
+        /**
+         * Initialize the factory with the server configuration
+         * and protobuf message registry. Implementors must
+         * add any extension messages which contain the auth
+         * payload, so that the server can decode auth messages
+         * it receives from the client.
+         */
+        void init(ServerConfiguration conf,
+                  ExtensionRegistry registry) throws IOException;
+
+        /**
+         * Create a new instance of a bookie auth provider.
+         * Each connection should get its own instance, as they
+         * can hold connection specific state.
+         * The completeCb is used to notify the server that
+         * the authentication handshake is complete.
+         * CompleteCb should be called only once.
+         * If the authentication was successful, BKException.Code.OK
+         * should be passed as the return code. Otherwise, another
+         * error code should be passed.
+         * If authentication fails, the server will close the
+         * connection.
+         * @param addr the address of the client being authenticated
+         * @param completeCb callback to be notified when authentication
+         *                   is complete.
+         */
+        BookieAuthProvider newProvider(InetSocketAddress addr,
+                                       GenericCallback<Void> completeCb);
+
+        /**
+         * Get Auth provider plugin name.
+         * Used as a sanity check to ensure that the bookie and the client
+         * are using the same auth provider.
+         */
+        String getPluginName();
+    }
+
+    /**
+     * Process a request from the client. cb will receive the next
+     * message to be sent to the client. If there are no more messages
+     * to send to the client, cb should not be called, and completeCb
+     * must be called instead.
+     */
+    void process(AuthMessage m, GenericCallback<AuthMessage> cb);
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java
new file mode 100644
index 0000000..fba2264
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java
@@ -0,0 +1,89 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.auth;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+
+import com.google.protobuf.ExtensionRegistry;
+
+/**
+ * Client authentication provider interface.
+ * This must be implemented by any party wishing to implement
+ * an authentication mechanism for bookkeeper connections.
+ */
+public interface ClientAuthProvider {
+    interface Factory {
+        /**
+         * Initialize the factory with the client configuration
+         * and protobuf message registry. Implementors must
+         * add any extension messages which contain the auth
+         * payload, so that the client can decode auth messages
+         * it receives from the server.
+         */
+        void init(ClientConfiguration conf,
+                  ExtensionRegistry registry) throws IOException;
+
+        /**
+         * Create a new instance of a client auth provider.
+         * Each connection should get its own instance, as they
+         * can hold connection specific state.
+         * The completeCb is used to notify the client that
+         * the authentication handshake is complete.
+         * CompleteCb should be called only once.
+         * If the authentication was successful, BKException.Code.OK
+         * should be passed as the return code. Otherwise, another
+         * error code should be passed.
+         * @param addr the address of the socket being authenticated
+         * @param completeCb callback to be notified when authentication
+         *                   is complete.
+         */
+        ClientAuthProvider newProvider(InetSocketAddress addr,
+                                       GenericCallback<Void> completeCb);
+
+        /**
+         * Get Auth provider plugin name.
+         * Used as a sanity check to ensure that the bookie and the client
+         * are using the same auth provider.
+         */
+        String getPluginName();
+    }
+
+    /**
+     * Initiate the authentication. cb will receive the initial
+     * authentication message which should be sent to the server.
+     * cb may not be called if authentication is not required. In
+     * this case, completeCb should be called.
+     */
+    void init(GenericCallback<AuthMessage> cb);
+
+    /**
+     * Process a response from the server. cb will receive the next
+     * message to be sent to the server. If there are no more messages
+     * to send to the server, cb should not be called, and completeCb
+     * must be called instead.
+     */
+    void process(AuthMessage m, GenericCallback<AuthMessage> cb);
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index bc487f6..1946069 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -208,8 +208,8 @@ class PendingAddOp implements WriteCallback, TimerTask {
             lh.handleUnrecoverableErrorDuringAdd(rc);
             return;
         default:
-            LOG.warn("Write did not succeed: L{} E{} on {}",
-                     new Object[] { ledgerId, entryId, addr });
+            LOG.warn("Write did not succeed: L{} E{} on {}, rc = {}",
+                     new Object[] { ledgerId, entryId, addr, rc });
             lh.handleBookieFailure(addr, bookieIndex);
             return;
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index d0750d3..b8d738b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -81,6 +81,9 @@ public class ClientConfiguration extends AbstractConfiguration {
     protected final static String ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats";
     protected final static String TASK_EXECUTION_WARN_TIME_MICROS = "taskExecutionWarnTimeMicros";
 
+    // Client auth provider factory class name
+    protected final static String CLIENT_AUTH_PROVIDER_FACTORY_CLASS = "clientAuthProviderFactoryClass";
+
     /**
      * Construct a default client-side configuration
      */
@@ -700,7 +703,7 @@ public class ClientConfiguration extends AbstractConfiguration {
 
     /**
      * Check if bookie health check is enabled.
-     * 
+     *
      * @return
      */
     public boolean isBookieHealthCheckEnabled() {
@@ -709,15 +712,15 @@ public class ClientConfiguration extends AbstractConfiguration {
 
     /**
      * Enables the bookie health check.
-     * 
+     *
      * <p>
      * If the number of read/write errors for a bookie exceeds {@link #getBookieErrorThresholdPerInterval()} per
      * interval, that bookie is quarantined for {@link #getBookieQuarantineTimeSeconds()} seconds. During this
      * quarantined period, the client will try not to use this bookie when creating new ensembles.
      * </p>
-     * 
+     *
      * By default, the bookie health check is <b>disabled</b>.
-     * 
+     *
      * @return client configuration
      */
     public ClientConfiguration enableBookieHealthCheck() {
@@ -727,7 +730,7 @@ public class ClientConfiguration extends AbstractConfiguration {
 
     /**
      * Get the bookie health check interval in seconds.
-     * 
+     *
      * @return
      */
     public int getBookieHealthCheckIntervalSeconds() {
@@ -736,11 +739,11 @@ public class ClientConfiguration extends AbstractConfiguration {
 
     /**
      * Set the bookie health check interval. Default is 60 seconds.
-     * 
+     *
      * <p>
      * Note: Please {@link #enableBookieHealthCheck()} to use this configuration.
      * </p>
-     * 
+     *
      * @param interval
      * @param unit
      * @return client configuration
@@ -752,7 +755,7 @@ public class ClientConfiguration extends AbstractConfiguration {
 
     /**
      * Get the error threshold for a bookie to be quarantined.
-     * 
+     *
      * @return
      */
     public long getBookieErrorThresholdPerInterval() {
@@ -762,11 +765,11 @@ public class ClientConfiguration extends AbstractConfiguration {
     /**
      * Set the error threshold per interval ({@link #getBookieHealthCheckIntervalSeconds()}) for a bookie before it is
      * quarantined. Default is 100 errors per minute.
-     * 
+     *
      * <p>
      * Note: Please {@link #enableBookieHealthCheck()} to use this configuration.
      * </p>
-     * 
+     *
      * @param threshold
      * @param unit
      * @return client configuration
@@ -778,7 +781,7 @@ public class ClientConfiguration extends AbstractConfiguration {
 
     /**
      * Get the time for which a bookie will be quarantined.
-     * 
+     *
      * @return
      */
     public int getBookieQuarantineTimeSeconds() {
@@ -787,11 +790,11 @@ public class ClientConfiguration extends AbstractConfiguration {
 
     /**
      * Set the time for which a bookie will be quarantined. Default is 30 minutes.
-     * 
+     *
      * <p>
      * Note: Please {@link #enableBookieHealthCheck()} to use this configuration.
      * </p>
-     * 
+     *
      * @param quarantineTime
      * @param unit
      * @return client configuration
@@ -800,4 +803,28 @@ public class ClientConfiguration extends AbstractConfiguration {
         setProperty(BOOKIE_QUARANTINE_TIME_SECONDS, unit.toSeconds(quarantineTime));
         return this;
     }
+
+    /**
+     * Set the client authentication provider factory class name.
+     * If this is not set, no authentication will be used
+     *
+     * @param factoryClass
+     *          the client authentication provider factory class name
+     * @return client configuration
+     */
+    public ClientConfiguration setClientAuthProviderFactoryClass(
+            String factoryClass) {
+        setProperty(CLIENT_AUTH_PROVIDER_FACTORY_CLASS, factoryClass);
+        return this;
+    }
+
+    /**
+     * Get the client authentication provider factory class name. If this returns null, no authentication will take
+     * place.
+     *
+     * @return the client authentication provider factory class name or null.
+     */
+    public String getClientAuthProviderFactoryClass() {
+        return getString(CLIENT_AUTH_PROVIDER_FACTORY_CLASS, null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 76e5037..d770650 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -122,6 +122,9 @@ public class ServerConfiguration extends AbstractConfiguration {
 
     protected final static String LEDGER_STORAGE_CLASS = "ledgerStorageClass";
 
+    // Bookie auth provider factory class name
+    protected final static String BOOKIE_AUTH_PROVIDER_FACTORY_CLASS = "bookieAuthProviderFactoryClass";
+
     /**
      * Construct a default configuration object
      */
@@ -1566,4 +1569,25 @@ public class ServerConfiguration extends AbstractConfiguration {
         }
     }
 
+    /**
+     * Set the bookie authentication provider factory class name.
+     * If this is not set, no authentication will be used.
+     *
+     * @param factoryClass
+     *          the bookie authentication provider factory class name;
+     *          leave unset to run without authentication
+     */
+    public void setBookieAuthProviderFactoryClass(String factoryClass) {
+        setProperty(BOOKIE_AUTH_PROVIDER_FACTORY_CLASS, factoryClass);
+    }
+
+    /**
+     * Get the bookie authentication provider factory class name.
+     * If this returns null, no authentication will take place.
+     *
+     * @return the bookie authentication provider factory class name or null.
+     */
+    public String getBookieAuthProviderFactoryClass() {
+        return getString(BOOKIE_AUTH_PROVIDER_FACTORY_CLASS, null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
new file mode 100644
index 0000000..522bc0b
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -0,0 +1,356 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.DefaultExceptionEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+
+import org.apache.bookkeeper.auth.BookieAuthProvider;
+import org.apache.bookkeeper.auth.ClientAuthProvider;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AuthHandler {
+    static final Logger LOG = LoggerFactory.getLogger(AuthHandler.class);
+
+    static class ServerSideHandler extends SimpleChannelHandler {
+        volatile boolean authenticated = false;
+        final BookieAuthProvider.Factory authProviderFactory;
+        BookieAuthProvider authProvider;
+
+        ServerSideHandler(BookieAuthProvider.Factory authProviderFactory) {
+            this.authProviderFactory = authProviderFactory;
+            authProvider = null;
+        }
+
+        /** Creates the per-connection auth provider; it stays null for non-inet peers. */
+        @Override
+        public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+            LOG.info("Channel open {}", ctx.getChannel());
+            SocketAddress remote  = ctx.getChannel().getRemoteAddress();
+            if (remote instanceof InetSocketAddress) {
+                authProvider = authProviderFactory.newProvider((InetSocketAddress)remote, new AuthHandshakeCompleteCallback());
+            } else {
+                // instanceof is false for null as well, so don't dereference remote blindly
+                LOG.error("Unknown socket type {} for {}", remote == null ? null : remote.getClass(), remote);
+            }
+            super.channelOpen(ctx, e);
+        }
+
+        @Override
+        public void messageReceived(ChannelHandlerContext ctx,
+                                    MessageEvent e)
+                throws Exception {
+            if (authProvider == null) {
+                // close the channel; authProvider is only null when the
+                // other end of the line is not an InetSocketAddress.
+                // Anything else is strange, and we don't want to deal
+                // with it
+                ctx.getChannel().close();
+                return;
+            }
+
+            Object event = e.getMessage();
+            if (authenticated) {
+                super.messageReceived(ctx, e);
+            } else if (event instanceof BookieProtocol.AuthRequest) { // pre-PB-client
+                BookieProtocol.AuthRequest req = (BookieProtocol.AuthRequest)event;
+                assert (req.getOpCode() == BookieProtocol.AUTH);
+                if (checkAuthPlugin(req.getAuthMessage(), ctx.getChannel())) {
+                    authProvider.process(req.getAuthMessage(),
+                                new AuthResponseCallbackLegacy(req, ctx.getChannel()));
+                } else {
+                    ctx.getChannel().close();
+                }
+            } else if (event instanceof BookieProtocol.Request) { // unauthenticated pre-PB request
+                BookieProtocol.Request req = (BookieProtocol.Request)event;
+                if (req.getOpCode() == BookieProtocol.ADDENTRY) {
+                    ctx.getChannel().write(
+                            new BookieProtocol.AddResponse(
+                                    req.getProtocolVersion(), BookieProtocol.EUA,
+                                    req.getLedgerId(), req.getEntryId()));
+                } else if (req.getOpCode() == BookieProtocol.READENTRY) {
+                    ctx.getChannel().write(
+                            new BookieProtocol.ReadResponse(
+                                    req.getProtocolVersion(), BookieProtocol.EUA,
+                                    req.getLedgerId(), req.getEntryId()));
+                } else {
+                    ctx.getChannel().close();
+                }
+            } else if (event instanceof BookkeeperProtocol.Request) { // post-PB-client
+                BookkeeperProtocol.Request req = (BookkeeperProtocol.Request)event;
+                if (req.getHeader().getOperation() == BookkeeperProtocol.OperationType.AUTH
+                        && req.hasAuthRequest()
+                        && checkAuthPlugin(req.getAuthRequest(), ctx.getChannel())) {
+                    authProvider.process(req.getAuthRequest(),
+                                         new AuthResponseCallback(req, ctx.getChannel()));
+                } else { // not an acceptable AUTH request: answer with EUA
+                    BookkeeperProtocol.Response.Builder builder
+                        = BookkeeperProtocol.Response.newBuilder()
+                        .setHeader(req.getHeader())
+                        .setStatus(BookkeeperProtocol.StatusCode.EUA);
+
+                    ctx.getChannel().write(builder.build());
+                }
+            } else {
+                // close the channel, junk coming over it
+                ctx.getChannel().close();
+            }
+        }
+
+        /** Returns true only if the message names the same auth plugin as the local factory. */
+        private boolean checkAuthPlugin(AuthMessage am, final Channel src) {
+            if (am.hasAuthPluginName() && am.getAuthPluginName().equals(authProviderFactory.getPluginName())) {
+                return true;
+            }
+            // array form binds all three placeholders; Channel used to be left unfilled
+            LOG.error("Received message from incompatible auth plugin. Local = {}, Remote = {}, Channel = {}",
+                      new Object[] { authProviderFactory.getPluginName(), am.getAuthPluginName(), src });
+            return false;
+        }
+
+        static class AuthResponseCallbackLegacy implements GenericCallback<AuthMessage> {
+            final BookieProtocol.AuthRequest req;
+            final Channel channel;
+
+            AuthResponseCallbackLegacy(BookieProtocol.AuthRequest req, Channel channel) {
+                this.req = req;
+                this.channel = channel;
+            }
+
+            // Relays the provider's reply as a legacy AuthResponse, or closes the channel on error.
+            public void operationComplete(int rc, AuthMessage newam) {
+                if (rc == BKException.Code.OK) {
+                    channel.write(new BookieProtocol.AuthResponse(req.getProtocolVersion(), newam));
+                } else {
+                    LOG.error("Error processing auth message, closing connection");
+                    channel.close();
+                }
+            }
+        }
+
+        static class AuthResponseCallback implements GenericCallback<AuthMessage> {
+            final BookkeeperProtocol.Request req;
+            final Channel channel;
+
+            AuthResponseCallback(BookkeeperProtocol.Request req, Channel channel) {
+                this.req = req;
+                this.channel = channel;
+            }
+
+            // Sends the provider's reply for the AUTH request, or EUA followed by a close on failure.
+            public void operationComplete(int rc, AuthMessage newam) {
+                BookkeeperProtocol.Response.Builder builder
+                    = BookkeeperProtocol.Response.newBuilder()
+                    .setHeader(req.getHeader());
+                if (rc != BKException.Code.OK) {
+                    LOG.error("Error processing auth message, closing connection");
+                    builder.setStatus(BookkeeperProtocol.StatusCode.EUA);
+                    // write() is asynchronous: close from the write future so the EUA
+                    // response is not discarded by closing while it is still queued
+                    channel.write(builder.build())
+                        .addListener(org.jboss.netty.channel.ChannelFutureListener.CLOSE);
+                    return;
+                }
+                builder.setStatus(BookkeeperProtocol.StatusCode.EOK)
+                    .setAuthResponse(newam);
+                channel.write(builder.build());
+            }
+        }
+
+        class AuthHandshakeCompleteCallback implements GenericCallback<Void> {
+            @Override
+            public void operationComplete(int rc, Void v) {
+                if (rc != BKException.Code.OK) {
+                    LOG.debug("Authentication failed on server side");
+                    return;
+                }
+                authenticated = true; // from now on messageReceived passes messages upstream
+            }
+        }
+    }
+
+    static class ClientSideHandler extends SimpleChannelHandler {
+        volatile boolean authenticated = false;
+        final ClientAuthProvider.Factory authProviderFactory;
+        ClientAuthProvider authProvider;
+        AtomicLong transactionIdGenerator;
+        Queue<MessageEvent> waitingForAuth = new ConcurrentLinkedQueue<MessageEvent>();
+
+        ClientSideHandler(ClientAuthProvider.Factory authProviderFactory,
+                          AtomicLong transactionIdGenerator) {
+            this.authProviderFactory = authProviderFactory;
+            this.transactionIdGenerator = transactionIdGenerator;
+            authProvider = null;
+        }
+
+        /** Creates the auth provider for this connection and calls init() on it. */
+        @Override
+        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+            SocketAddress remote  = ctx.getChannel().getRemoteAddress();
+            if (remote instanceof InetSocketAddress) {
+                authProvider = authProviderFactory.newProvider((InetSocketAddress)remote,
+                        new AuthHandshakeCompleteCallback(ctx));
+                authProvider.init(new AuthRequestCallback(ctx));
+            } else {
+                // instanceof is false for null as well, so don't dereference remote blindly
+                LOG.error("Unknown socket type {} for {}", remote == null ? null : remote.getClass(), remote);
+            }
+            super.channelConnected(ctx, e);
+        }
+
+        /**
+         * Passes messages upstream once authenticated; until then only AUTH responses are acted on.
+         */
+        @Override
+        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+            assert (authProvider != null);
+            Object msg = e.getMessage();
+            if (authenticated) {
+                super.messageReceived(ctx, e);
+                return;
+            }
+            if (!(msg instanceof BookkeeperProtocol.Response)) {
+                return;
+            }
+            BookkeeperProtocol.Response response = (BookkeeperProtocol.Response)msg;
+            if (response.getHeader().getOperation() != BookkeeperProtocol.OperationType.AUTH) {
+                // not authenticated yet, so nothing else should be coming through: drop it
+                return;
+            }
+            if (response.getStatus() == BookkeeperProtocol.StatusCode.EOK) {
+                assert (response.hasAuthResponse());
+                authProvider.process(response.getAuthResponse(), new AuthRequestCallback(ctx));
+            } else {
+                authenticationError(ctx, response.getStatus().getNumber());
+            }
+        }
+
+        /** Until authenticated, lets AUTH requests through, queues other requests, drops the rest. */
+        @Override
+        public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+            synchronized (waitingForAuth) {
+                if (authenticated) {
+                    super.writeRequested(ctx, e);
+                    return;
+                }
+                if (!(e.getMessage() instanceof BookkeeperProtocol.Request)) {
+                    return; // not a BookkeeperProtocol request: just drop
+                }
+                BookkeeperProtocol.Request request = (BookkeeperProtocol.Request)e.getMessage();
+                if (request.getHeader().getOperation() == BookkeeperProtocol.OperationType.AUTH) {
+                    super.writeRequested(ctx, e);
+                } else {
+                    waitingForAuth.add(e);
+                }
+            }
+        }
+
+        long newTxnId() {
+            return transactionIdGenerator.incrementAndGet();
+        }
+
+        // Logs the failure and sends an AuthenticationException upstream as an exception event.
+        void authenticationError(ChannelHandlerContext ctx, int errorCode) {
+            LOG.error("Error processing auth message, erroring connection {}", errorCode);
+            AuthenticationException cause = new AuthenticationException("Auth failed with error " + errorCode);
+            ctx.sendUpstream(new DefaultExceptionEvent(ctx.getChannel(), cause));
+        }
+
+        class AuthRequestCallback implements GenericCallback<AuthMessage> {
+            Channel channel;
+            ChannelHandlerContext ctx;
+
+            AuthRequestCallback(ChannelHandlerContext ctx) {
+                this.channel = ctx.getChannel();
+                this.ctx = ctx;
+            }
+
+            // Wraps the provider's auth message in a VERSION_THREE AUTH request and writes it.
+            public void operationComplete(int rc, AuthMessage newam) {
+                if (rc != BKException.Code.OK) {
+                    authenticationError(ctx, rc);
+                    return;
+                }
+                BookkeeperProtocol.BKPacketHeader.Builder headerBuilder
+                    = BookkeeperProtocol.BKPacketHeader.newBuilder();
+                headerBuilder.setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE);
+                headerBuilder.setOperation(BookkeeperProtocol.OperationType.AUTH);
+                headerBuilder.setTxnId(newTxnId());
+
+                BookkeeperProtocol.Request authRequest = BookkeeperProtocol.Request.newBuilder()
+                    .setHeader(headerBuilder.build())
+                    .setAuthRequest(newam)
+                    .build();
+                channel.write(authRequest);
+            }
+        }
+
+        class AuthHandshakeCompleteCallback implements GenericCallback<Void> {
+            ChannelHandlerContext ctx;
+            AuthHandshakeCompleteCallback(ChannelHandlerContext ctx) {
+                this.ctx = ctx;
+            }
+
+            // On success marks the channel authenticated and sends the queued writes downstream.
+            @Override
+            public void operationComplete(int rc, Void v) {
+                if (rc != BKException.Code.OK) {
+                    authenticationError(ctx, rc);
+                    LOG.debug("Authentication failed on server side");
+                    return;
+                }
+                synchronized (waitingForAuth) {
+                    authenticated = true;
+                    for (MessageEvent queued = waitingForAuth.poll(); queued != null;
+                            queued = waitingForAuth.poll()) {
+                        ctx.sendDownstream(queued);
+                    }
+                }
+            }
+        }
+    }
+
+    static class AuthenticationException extends IOException {
+        AuthenticationException(String reason) {
+            super(reason);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 8a79547..d0052d6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -28,8 +28,11 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
+import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -52,6 +55,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ExtensionRegistry;
 
 /**
  * Implements the client-side part of the BookKeeper protocol.
@@ -60,11 +64,18 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 public class BookieClient implements PerChannelBookieClientFactory {
     static final Logger LOG = LoggerFactory.getLogger(BookieClient.class);
 
-    final OrderedSafeExecutor executor;
-    final ClientSocketChannelFactory channelFactory;
+    // This is global state that should be across all BookieClients
+    AtomicLong totalBytesOutstanding = new AtomicLong();
+
+    OrderedSafeExecutor executor;
+    ClientSocketChannelFactory channelFactory;
     final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool> channels =
             new ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool>();
     final HashedWheelTimer requestTimer;
+
+    final private ClientAuthProvider.Factory authProviderFactory;
+    final private ExtensionRegistry registry;
+
     private final ClientConfiguration conf;
     private volatile boolean closed;
     private final ReentrantReadWriteLock closeLock;
@@ -73,17 +84,22 @@ public class BookieClient implements PerChannelBookieClientFactory {
 
     private final long bookieErrorThresholdPerInterval;
 
-    public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor) {
+    public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory,
+            OrderedSafeExecutor executor) throws IOException {
         this(conf, channelFactory, executor, NullStatsLogger.INSTANCE);
     }
 
-    public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor,
-                        StatsLogger statsLogger) {
+    public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory,
+                        OrderedSafeExecutor executor, StatsLogger statsLogger) throws IOException {
         this.conf = conf;
         this.channelFactory = channelFactory;
         this.executor = executor;
         this.closed = false;
         this.closeLock = new ReentrantReadWriteLock();
+
+        this.registry = ExtensionRegistry.newInstance();
+        this.authProviderFactory = AuthProviderFactoryFactory.newClientAuthProviderFactory(conf, registry);
+
         this.statsLogger = statsLogger;
         this.numConnectionsPerBookie = conf.getNumChannelsPerBookie();
         this.requestTimer = new HashedWheelTimer(
@@ -120,8 +136,8 @@ public class BookieClient implements PerChannelBookieClientFactory {
 
     @Override
     public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool) {
-        return new PerChannelBookieClient(conf, executor, channelFactory, address,
-                                          requestTimer, statsLogger, pcbcPool);
+        return new PerChannelBookieClient(conf, executor, channelFactory, address, requestTimer, statsLogger,
+                authProviderFactory, registry, pcbcPool);
     }
 
     private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr, Object key) {
@@ -133,7 +149,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
                     return null;
                 }
                 PerChannelBookieClientPool newClientPool =
-                        new DefaultPerChannelBookieClientPool(this, addr, numConnectionsPerBookie);
+                    new DefaultPerChannelBookieClientPool(this, addr, numConnectionsPerBookie);
                 PerChannelBookieClientPool oldClientPool = channels.putIfAbsent(addr, newClientPool);
                 if (null == oldClientPool) {
                     clientPool = newClientPool;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index b623998..bb1b207 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -28,6 +28,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.auth.BookieAuthProvider;
+import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.processor.RequestProcessor;
 import org.apache.zookeeper.KeeperException;
 import org.jboss.netty.bootstrap.ServerBootstrap;
@@ -48,6 +50,7 @@ import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.protobuf.ExtensionRegistry;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -66,11 +69,21 @@ class BookieNettyServer {
     Object suspensionLock = new Object();
     boolean suspended = false;
 
+    final BookieAuthProvider.Factory authProviderFactory;
+    final BookieProtoEncoding.ResponseEncoder responseEncoder;
+    final BookieProtoEncoding.RequestDecoder requestDecoder;
+
     BookieNettyServer(ServerConfiguration conf, RequestProcessor processor)
             throws IOException, KeeperException, InterruptedException, BookieException  {
         this.conf = conf;
         this.requestProcessor = processor;
 
+        ExtensionRegistry registry = ExtensionRegistry.newInstance();
+        authProviderFactory = AuthProviderFactoryFactory.newBookieAuthProviderFactory(conf, registry);
+
+        responseEncoder = new BookieProtoEncoding.ResponseEncoder(registry);
+        requestDecoder = new BookieProtoEncoding.RequestDecoder(registry);
+
         ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
         String base = "bookie-" + conf.getBookiePort() + "-netty";
         serverChannelFactory = new NioServerSocketChannelFactory(
@@ -140,11 +153,15 @@ class BookieNettyServer {
                              new LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4));
             pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
 
-            pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.RequestDecoder());
-            pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.ResponseEncoder());
+            pipeline.addLast("bookieProtoDecoder", requestDecoder);
+            pipeline.addLast("bookieProtoEncoder", responseEncoder);
+            pipeline.addLast("bookieAuthHandler",
+                             new AuthHandler.ServerSideHandler(authProviderFactory));
+
             SimpleChannelHandler requestHandler = isRunning.get() ?
                     new BookieRequestHandler(conf, requestProcessor, allChannels)
                     : new RejectRequestHandler();
+
             pipeline.addLast("bookieRequestHandler", requestHandler);
             return pipeline;
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 6ece56e..683a6fb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -20,29 +20,26 @@
  */
 package org.apache.bookkeeper.proto;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ExtensionRegistry;
 import com.google.protobuf.InvalidProtocolBufferException;
+
 import org.jboss.netty.buffer.ChannelBufferFactory;
 import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
-
 import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
 import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
 import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class BookieProtoEncoding {
     private final static Logger LOG = LoggerFactory.getLogger(BookieProtoEncoding.class);
 
-    static final EnDecoder REQ_PREV3 = new RequestEnDeCoderPreV3();
-    static final EnDecoder REP_PREV3 = new ResponseEnDeCoderPreV3();
-    static final EnDecoder REQ_V3 = new RequestEnDecoderV3();
-    static final EnDecoder REP_V3 = new ResponseEnDecoderV3();
-
     static interface EnDecoder {
 
         /**
@@ -68,6 +65,12 @@ public class BookieProtoEncoding {
     }
 
     static class RequestEnDeCoderPreV3 implements EnDecoder {
+        final ExtensionRegistry extensionRegistry;
+
+        RequestEnDeCoderPreV3(ExtensionRegistry extensionRegistry) {
+            this.extensionRegistry = extensionRegistry;
+        }
+
         @Override
         public Object encode(Object msg, ChannelBufferFactory bufferFactory)
                 throws Exception {
@@ -83,8 +86,7 @@ public class BookieProtoEncoding {
                 buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt());
                 buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
                 return ChannelBuffers.wrappedBuffer(buf, ar.getData());
-            } else {
-                assert(r instanceof BookieProtocol.ReadRequest);
+            } else if (r instanceof BookieProtocol.ReadRequest) {
                 int totalHeaderSize = 4 // for request type
                     + 8 // for ledgerId
                     + 8; // for entryId
@@ -101,6 +103,19 @@ public class BookieProtoEncoding {
                 }
 
                 return buf;
+            } else if (r instanceof BookieProtocol.AuthRequest) {
+                BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthRequest)r).getAuthMessage();
+                int totalHeaderSize = 4; // for request type
+                int totalSize = totalHeaderSize + am.getSerializedSize();
+                ChannelBuffer buf = bufferFactory.getBuffer(totalSize);
+                buf.writeInt(new PacketHeader(r.getProtocolVersion(),
+                                              r.getOpCode(),
+                                              r.getFlags()).toInt());
+                ChannelBufferOutputStream bufStream = new ChannelBufferOutputStream(buf);
+                am.writeTo(bufStream);
+                return buf;
+            } else {
+                return msg;
             }
         }
 
@@ -141,12 +156,23 @@ public class BookieProtoEncoding {
                 } else {
                     return new BookieProtocol.ReadRequest(h.getVersion(), ledgerId, entryId, flags);
                 }
+            case BookieProtocol.AUTH:
+                BookkeeperProtocol.AuthMessage.Builder builder
+                    = BookkeeperProtocol.AuthMessage.newBuilder();
+                builder.mergeFrom(new ChannelBufferInputStream(packet), extensionRegistry);
+                return new BookieProtocol.AuthRequest(h.getVersion(), builder.build());
             }
             return packet;
         }
     }
 
     static class ResponseEnDeCoderPreV3 implements EnDecoder {
+        final ExtensionRegistry extensionRegistry;
+
+        ResponseEnDeCoderPreV3(ExtensionRegistry extensionRegistry) {
+            this.extensionRegistry = extensionRegistry;
+        }
+
         @Override
         public Object encode(Object msg, ChannelBufferFactory bufferFactory)
                 throws Exception {
@@ -157,12 +183,13 @@ public class BookieProtoEncoding {
             ChannelBuffer buf = bufferFactory.getBuffer(24);
             buf.writeInt(new PacketHeader(r.getProtocolVersion(),
                                           r.getOpCode(), (short)0).toInt());
-            buf.writeInt(r.getErrorCode());
-            buf.writeLong(r.getLedgerId());
-            buf.writeLong(r.getEntryId());
 
             ServerStats.getInstance().incrementPacketsSent();
             if (msg instanceof BookieProtocol.ReadResponse) {
+                buf.writeInt(r.getErrorCode());
+                buf.writeLong(r.getLedgerId());
+                buf.writeLong(r.getEntryId());
+
                 BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse)r;
                 if (rr.hasData()) {
                     return ChannelBuffers.wrappedBuffer(buf,
@@ -171,7 +198,15 @@ public class BookieProtoEncoding {
                     return buf;
                 }
             } else if (msg instanceof BookieProtocol.AddResponse) {
+                buf.writeInt(r.getErrorCode());
+                buf.writeLong(r.getLedgerId());
+                buf.writeLong(r.getEntryId());
+
                 return buf;
+            } else if (msg instanceof BookieProtocol.AuthResponse) {
+                BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse)r).getAuthMessage();
+                return ChannelBuffers.wrappedBuffer(buf,
+                        ChannelBuffers.wrappedBuffer(am.toByteArray()));
             } else {
                 LOG.error("Cannot encode unknown response type {}", msg.getClass().getName());
                 return msg;
@@ -180,19 +215,23 @@ public class BookieProtoEncoding {
         @Override
         public Object decode(ChannelBuffer buffer)
                 throws Exception {
-            final int rc;
-            final long ledgerId, entryId;
+            int rc;
+            long ledgerId, entryId;
             final PacketHeader header;
 
             header = PacketHeader.fromInt(buffer.readInt());
-            rc = buffer.readInt();
-            ledgerId = buffer.readLong();
-            entryId = buffer.readLong();
 
             switch (header.getOpCode()) {
             case BookieProtocol.ADDENTRY:
+                rc = buffer.readInt();
+                ledgerId = buffer.readLong();
+                entryId = buffer.readLong();
                 return new BookieProtocol.AddResponse(header.getVersion(), rc, ledgerId, entryId);
             case BookieProtocol.READENTRY:
+                rc = buffer.readInt();
+                ledgerId = buffer.readLong();
+                entryId = buffer.readLong();
+
                 if (rc == BookieProtocol.EOK) {
                     return new BookieProtocol.ReadResponse(header.getVersion(), rc,
                                                            ledgerId, entryId, buffer.slice());
@@ -200,6 +239,13 @@ public class BookieProtoEncoding {
                     return new BookieProtocol.ReadResponse(header.getVersion(), rc,
                                                            ledgerId, entryId);
                 }
+            case BookieProtocol.AUTH:
+                ChannelBufferInputStream bufStream = new ChannelBufferInputStream(buffer);
+                BookkeeperProtocol.AuthMessage.Builder builder
+                    = BookkeeperProtocol.AuthMessage.newBuilder();
+                builder.mergeFrom(bufStream, extensionRegistry);
+                BookkeeperProtocol.AuthMessage am = builder.build();
+                return new BookieProtocol.AuthResponse(header.getVersion(), am);
             default:
                 return buffer;
             }
@@ -207,10 +253,16 @@ public class BookieProtoEncoding {
     }
 
     static class RequestEnDecoderV3 implements EnDecoder {
+        final ExtensionRegistry extensionRegistry;
+
+        RequestEnDecoderV3(ExtensionRegistry extensionRegistry) {
+            this.extensionRegistry = extensionRegistry;
+        }
 
         @Override
         public Object decode(ChannelBuffer packet) throws Exception {
-            return BookkeeperProtocol.Request.parseFrom(new ChannelBufferInputStream(packet));
+            return BookkeeperProtocol.Request.parseFrom(new ChannelBufferInputStream(packet),
+                                                        extensionRegistry);
         }
 
         @Override
@@ -222,10 +274,16 @@ public class BookieProtoEncoding {
     }
 
     static class ResponseEnDecoderV3 implements EnDecoder {
+        final ExtensionRegistry extensionRegistry;
+
+        ResponseEnDecoderV3(ExtensionRegistry extensionRegistry) {
+            this.extensionRegistry = extensionRegistry;
+        }
 
         @Override
         public Object decode(ChannelBuffer packet) throws Exception {
-            return BookkeeperProtocol.Response.parseFrom(new ChannelBufferInputStream(packet));
+            return BookkeeperProtocol.Response.parseFrom(new ChannelBufferInputStream(packet),
+                                                         extensionRegistry);
         }
 
         @Override
@@ -238,6 +296,14 @@ public class BookieProtoEncoding {
 
     public static class RequestEncoder extends OneToOneEncoder {
 
+        final EnDecoder REQ_PREV3;
+        final EnDecoder REQ_V3;
+
+        RequestEncoder(ExtensionRegistry extensionRegistry) {
+            REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry);
+            REQ_V3 = new RequestEnDecoderV3(extensionRegistry);
+        }
+
         @Override
         protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
                 throws Exception {
@@ -256,6 +322,13 @@ public class BookieProtoEncoding {
     }
 
     public static class RequestDecoder extends OneToOneDecoder {
+        final EnDecoder REQ_PREV3;
+        final EnDecoder REQ_V3;
+
+        RequestDecoder(ExtensionRegistry extensionRegistry) {
+            REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry);
+            REQ_V3 = new RequestEnDecoderV3(extensionRegistry);
+        }
 
         @Override
         protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
@@ -283,6 +356,13 @@ public class BookieProtoEncoding {
     }
 
     public static class ResponseEncoder extends OneToOneEncoder {
+        final EnDecoder REP_PREV3;
+        final EnDecoder REP_V3;
+
+        ResponseEncoder(ExtensionRegistry extensionRegistry) {
+            REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
+            REP_V3 = new ResponseEnDecoderV3(extensionRegistry);
+        }
 
         @Override
         protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
@@ -302,6 +382,13 @@ public class BookieProtoEncoding {
     }
 
     public static class ResponseDecoder extends OneToOneDecoder {
+        final EnDecoder REP_PREV3;
+        final EnDecoder REP_V3;
+
+        ResponseDecoder(ExtensionRegistry extensionRegistry) {
+            REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
+            REP_V3 = new ResponseEnDecoderV3(extensionRegistry);
+        }
 
         @Override
         protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 4dd26d6..2ce5ed8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -24,6 +24,8 @@ package org.apache.bookkeeper.proto;
 import org.jboss.netty.buffer.ChannelBuffer;
 import java.nio.ByteBuffer;
 
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+
 /**
  * The packets of the Bookie protocol all have a 4-byte integer indicating the
  * type of request or response at the very beginning of the packet followed by a
@@ -133,6 +135,13 @@ public interface BookieProtocol {
     public static final byte READENTRY = 2;
 
     /**
+     * Auth message. This code is for passing auth messages between the auth
+     * providers on the client and bookie. The message payload is determined
+     * by the auth providers themselves.
+     */
+    public static final byte AUTH = 3;
+
+    /**
      * The error code that indicates success
      */
     public static final int EOK = 0;
@@ -273,6 +282,19 @@ public interface BookieProtocol {
         }
     }
 
+    static class AuthRequest extends Request {
+        final AuthMessage authMessage;
+
+        AuthRequest(byte protocolVersion, AuthMessage authMessage) {
+            super(protocolVersion, AUTH, -1, -1, FLAG_NONE, null);
+            this.authMessage = authMessage;
+        }
+
+        AuthMessage getAuthMessage() {
+            return authMessage;
+        }
+    }
+
     static class Response {
         final byte protocolVersion;
         final byte opCode;
@@ -343,4 +365,18 @@ public interface BookieProtocol {
             super(protocolVersion, ADDENTRY, errorCode, ledgerId, entryId);
         }
     }
+
+    static class AuthResponse extends Response {
+        final AuthMessage authMessage;
+
+        AuthResponse(byte protocolVersion, AuthMessage authMessage) {
+            super(protocolVersion, AUTH, EOK, -1, -1);
+            this.authMessage = authMessage;
+        }
+
+        AuthMessage getAuthMessage() {
+            return authMessage;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 9fec15f..1608328 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -124,6 +124,7 @@ public class BookieRequestProcessor implements RequestProcessor {
                     processReadRequestV3(r, c);
                     break;
                 default:
+                    LOG.info("Unknown operation type {}", header.getOperation());
                     BookkeeperProtocol.Response.Builder response =
                             BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
                             .setStatus(BookkeeperProtocol.StatusCode.EBADREQ);