You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/04/05 08:46:01 UTC

[1/3] bookkeeper git commit: BOOKKEEPER-901: Authentication framework

Repository: bookkeeper
Updated Branches:
  refs/heads/master e32c38890 -> b1c12c0f4


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
new file mode 100644
index 0000000..fe87ac9
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
@@ -0,0 +1,239 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ExtensionRegistry;
+
+import org.apache.bookkeeper.auth.ClientAuthProvider;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.PerChannelBookieClient;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.bookkeeper.auth.TestAuth;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+
+import org.apache.bookkeeper.proto.BookieProtocol.*;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+import static org.junit.Assert.*;
+
+public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase {
+    static final Logger LOG = LoggerFactory.getLogger(TestBackwardCompatCMS42.class);
+
+    ExtensionRegistry extRegistry = ExtensionRegistry.newInstance();
+    ClientAuthProvider.Factory authProvider;
+    ClientSocketChannelFactory channelFactory
+        = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+                                            Executors.newCachedThreadPool());
+    OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder().numThreads(1).name("TestBackwardCompatClient")
+            .build();
+
+    public TestBackwardCompatCMS42() throws Exception {
+        super(0);
+
+        TestDataFormats.registerAllExtensions(extRegistry);
+        authProvider = AuthProviderFactoryFactory.newClientAuthProviderFactory(
+                new ClientConfiguration(), extRegistry);
+    }
+
+    /**
+     * Starts a bookie configured with
+     * {@code TestAuth.AlwaysSucceedBookieAuthProviderFactory}, sends one
+     * {@link AuthRequest} through a {@link CompatClient42} and checks that
+     * the reply is an {@link AuthResponse} carrying {@code BookieProtocol.EOK}.
+     */
+    @Test(timeout=60000)
+    public void testAuthSingleMessage() throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(
+                TestAuth.AlwaysSucceedBookieAuthProviderFactory.class.getName());
+        BookieServer bookie1 = startAndStoreBookie(bookieConf);
+
+        // Auth payload tagged with the test-only messageType extension.
+        AuthMessage.Builder builder = AuthMessage.newBuilder()
+            .setAuthPluginName(TestAuth.TEST_AUTH_PROVIDER_PLUGIN_NAME);
+        builder.setExtension(TestDataFormats.messageType,
+                             TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE);
+        final AuthMessage authMessage = builder.build();
+
+        CompatClient42 client = newCompatClient(bookie1.getLocalAddress());
+
+        Request request = new AuthRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, authMessage);
+        client.sendRequest(request);
+
+        Response response = client.takeResponse();
+        assertTrue("Should be auth response", response instanceof AuthResponse);
+        // JUnit expects (message, expected, actual); keeping that order makes
+        // the failure text report the right "expected" value.
+        assertEquals("Should have succeeded", BookieProtocol.EOK, response.getErrorCode());
+    }
+
+    /**
+     * Starts a bookie configured with
+     * {@code TestAuth.SucceedAfter3BookieAuthProviderFactory} and sends the
+     * same {@link AuthRequest} three times. Every reply must be an
+     * {@link AuthResponse} with {@code BookieProtocol.EOK}; the first two must
+     * carry {@code PAYLOAD_MESSAGE} and the third {@code SUCCESS_RESPONSE}.
+     */
+    @Test(timeout=60000)
+    public void testAuthMultiMessage() throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(
+                TestAuth.SucceedAfter3BookieAuthProviderFactory.class.getName());
+        BookieServer bookie1 = startAndStoreBookie(bookieConf);
+
+        // Auth payload tagged with the test-only messageType extension.
+        AuthMessage.Builder builder = AuthMessage.newBuilder()
+            .setAuthPluginName(TestAuth.TEST_AUTH_PROVIDER_PLUGIN_NAME);
+        builder.setExtension(TestDataFormats.messageType,
+                             TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE);
+        final AuthMessage authMessage = builder.build();
+        CompatClient42 client = newCompatClient(bookie1.getLocalAddress());
+
+        Request request = new AuthRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, authMessage);
+        for (int i = 0; i < 3 ; i++) {
+            client.sendRequest(request);
+            Response response = client.takeResponse();
+            assertTrue("Should be auth response", response instanceof AuthResponse);
+            AuthResponse authResponse = (AuthResponse)response;
+            // JUnit expects (message, expected, actual) in all assertions below.
+            assertEquals("Should have succeeded",
+                         BookieProtocol.EOK, response.getErrorCode());
+            TestDataFormats.AuthMessageType type = authResponse.getAuthMessage()
+                .getExtension(TestDataFormats.messageType);
+            if (i == 2) {
+                // Third exchange is the one that completes authentication.
+                assertEquals("Should succeed after 3",
+                             TestDataFormats.AuthMessageType.SUCCESS_RESPONSE, type);
+            } else {
+                assertEquals("Should be payload",
+                             TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE, type);
+            }
+        }
+    }
+
+    /**
+     * Starts a bookie configured with
+     * {@code TestAuth.FailAfter3BookieAuthProviderFactory} and sends the same
+     * {@link AuthRequest} three times. Every reply must be an
+     * {@link AuthResponse} with {@code BookieProtocol.EOK}; the first two must
+     * carry {@code PAYLOAD_MESSAGE} and the third {@code FAILURE_RESPONSE}.
+     * A subsequent {@link ReadRequest} on the same connection must then be
+     * answered with {@code BookieProtocol.EUA}.
+     */
+    @Test(timeout=60000)
+    public void testAuthFail() throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(
+                TestAuth.FailAfter3BookieAuthProviderFactory.class.getName());
+        BookieServer bookie1 = startAndStoreBookie(bookieConf);
+
+        // Auth payload tagged with the test-only messageType extension.
+        AuthMessage.Builder builder = AuthMessage.newBuilder()
+            .setAuthPluginName(TestAuth.TEST_AUTH_PROVIDER_PLUGIN_NAME);
+        builder.setExtension(TestDataFormats.messageType,
+                             TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE);
+        final AuthMessage authMessage = builder.build();
+        CompatClient42 client = newCompatClient(bookie1.getLocalAddress());
+
+        Request request = new AuthRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, authMessage);
+        for (int i = 0; i < 3 ; i++) {
+            client.sendRequest(request);
+            Response response = client.takeResponse();
+            assertTrue("Should be auth response", response instanceof AuthResponse);
+            AuthResponse authResponse = (AuthResponse)response;
+            // JUnit expects (message, expected, actual) in all assertions below.
+            assertEquals("Should have succeeded",
+                         BookieProtocol.EOK, response.getErrorCode());
+            TestDataFormats.AuthMessageType type = authResponse.getAuthMessage()
+                .getExtension(TestDataFormats.messageType);
+            if (i == 2) {
+                // Third exchange is the one in which the provider reports failure.
+                assertEquals("Should fail after 3",
+                             TestDataFormats.AuthMessageType.FAILURE_RESPONSE, type);
+            } else {
+                assertEquals("Should be payload",
+                             TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE, type);
+            }
+
+        }
+
+        // After the failed handshake, a regular read must be rejected.
+        client.sendRequest(new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
+                                           1L, 1L, (short)0));
+        Response response = client.takeResponse();
+        assertEquals("Should have failed",
+                     BookieProtocol.EUA, response.getErrorCode());
+    }
+
+    // copy from TestAuth
+    BookieServer startAndStoreBookie(ServerConfiguration conf) throws Exception {
+        bsConfs.add(conf);
+        BookieServer s = startBookie(conf);
+        bs.add(s);
+        return s;
+    }
+
+    CompatClient42 newCompatClient(BookieSocketAddress addr) throws Exception {
+        return new CompatClient42(executor, channelFactory, addr, authProvider, extRegistry);
+    }
+
+    /**
+     * Minimal raw client used by the tests in this class.
+     *
+     * It extends PerChannelBookieClient to get the pipeline factory, opens
+     * its own channel to the bookie through a {@link ClientBootstrap}, and
+     * overrides the message handler so that every {@link Response} received
+     * is queued for the test to inspect instead of being processed further.
+     */
+    class CompatClient42 extends PerChannelBookieClient {
+        /** Responses received on the channel, in arrival order (capacity 10). */
+        final ArrayBlockingQueue<Response> responses = new ArrayBlockingQueue<Response>(10);
+        /** Channel opened by the constructor; requests are written to it directly. */
+        final Channel channel;
+        /** Released by {@link #channelConnected} once the channel is connected. */
+        final CountDownLatch connected = new CountDownLatch(1);
+
+        /**
+         * Creates the client and connects to {@code addr}, waiting for the
+         * connect attempt to finish.
+         *
+         * @throws java.io.IOException if the connect attempt does not succeed
+         */
+        CompatClient42(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
+                       BookieSocketAddress addr,
+                       ClientAuthProvider.Factory authProviderFactory,
+                       ExtensionRegistry extRegistry) throws Exception {
+            super(executor, channelFactory, addr, authProviderFactory, extRegistry);
+
+            ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
+            bootstrap.setPipelineFactory(this);
+            bootstrap.setOption("tcpNoDelay", false);
+            bootstrap.setOption("keepAlive", true);
+            ChannelFuture f = bootstrap.connect(addr.getSocketAddress()).await();
+            if (!f.isSuccess()) {
+                // Without this check a failed connect would leave 'connected'
+                // unreleased and sendRequest() would block until the JUnit
+                // timeout; fail fast with the real cause instead.
+                throw new java.io.IOException("Failed to connect to bookie " + addr, f.getCause());
+            }
+            channel = f.getChannel();
+        }
+
+        /**
+         * Queues every {@link Response}; anything else is logged and passed
+         * upstream unchanged.
+         */
+        @Override
+        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+            if (!(e.getMessage() instanceof Response)) {
+                LOG.error("Unknown message {}, passing upstream", e.getMessage());
+                ctx.sendUpstream(e);
+                return;
+            }
+            // add() throws IllegalStateException if more than 10 responses are pending.
+            responses.add((Response)e.getMessage());
+        }
+
+        /** Signals {@link #sendRequest} that the channel is ready for writes. */
+        @Override
+        public void channelConnected(ChannelHandlerContext ctx,
+                                     ChannelStateEvent e)
+                throws Exception {
+            connected.countDown();
+        }
+
+        /** Blocks until a response is available and returns it. */
+        Response takeResponse() throws Exception {
+            return responses.take();
+        }
+
+        /** Returns the next queued response, or {@code null} if none is queued. */
+        Response pollResponse() throws Exception {
+            return responses.poll();
+        }
+
+        /** Waits for the channel to be connected, then writes the request to it. */
+        void sendRequest(Request request) throws Exception {
+            connected.await();
+            channel.write(request);
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestDataFormats.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestDataFormats.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestDataFormats.java
new file mode 100644
index 0000000..c3f675f
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestDataFormats.java
@@ -0,0 +1,126 @@
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: src/test/proto/TestDataFormats.proto
+
+package org.apache.bookkeeper.proto;
+
+// Generated holder for the definitions in src/test/proto/TestDataFormats.proto:
+// the AuthMessageType enum and the messageType extension of
+// BookkeeperProtocol.AuthMessage. Not instantiable.
+public final class TestDataFormats {
+  private TestDataFormats() {}
+  // Adds the messageType extension to the given registry.
+  public static void registerAllExtensions(
+      com.google.protobuf.ExtensionRegistry registry) {
+    registry.add(org.apache.bookkeeper.proto.TestDataFormats.messageType);
+  }
+  // Each constant is declared as (index, value): index is the position used
+  // with the descriptor's value list and VALUES, value is the number returned
+  // by getNumber().
+  public enum AuthMessageType
+      implements com.google.protobuf.ProtocolMessageEnum {
+    SUCCESS_RESPONSE(0, 1),
+    FAILURE_RESPONSE(1, 2),
+    PAYLOAD_MESSAGE(2, 3),
+    ;
+    
+    public static final int SUCCESS_RESPONSE_VALUE = 1;
+    public static final int FAILURE_RESPONSE_VALUE = 2;
+    public static final int PAYLOAD_MESSAGE_VALUE = 3;
+    
+    
+    public final int getNumber() { return value; }
+    
+    // Maps a number to its constant; returns null for numbers other than 1, 2, 3.
+    public static AuthMessageType valueOf(int value) {
+      switch (value) {
+        case 1: return SUCCESS_RESPONSE;
+        case 2: return FAILURE_RESPONSE;
+        case 3: return PAYLOAD_MESSAGE;
+        default: return null;
+      }
+    }
+    
+    public static com.google.protobuf.Internal.EnumLiteMap<AuthMessageType>
+        internalGetValueMap() {
+      return internalValueMap;
+    }
+    // Lookup adapter that delegates to valueOf(int).
+    private static com.google.protobuf.Internal.EnumLiteMap<AuthMessageType>
+        internalValueMap =
+          new com.google.protobuf.Internal.EnumLiteMap<AuthMessageType>() {
+            public AuthMessageType findValueByNumber(int number) {
+              return AuthMessageType.valueOf(number);
+            }
+          };
+    
+    public final com.google.protobuf.Descriptors.EnumValueDescriptor
+        getValueDescriptor() {
+      return getDescriptor().getValues().get(index);
+    }
+    public final com.google.protobuf.Descriptors.EnumDescriptor
+        getDescriptorForType() {
+      return getDescriptor();
+    }
+    // The enum descriptor is the first enum type of the enclosing file descriptor.
+    public static final com.google.protobuf.Descriptors.EnumDescriptor
+        getDescriptor() {
+      return org.apache.bookkeeper.proto.TestDataFormats.getDescriptor().getEnumTypes().get(0);
+    }
+    
+    private static final AuthMessageType[] VALUES = {
+      SUCCESS_RESPONSE, FAILURE_RESPONSE, PAYLOAD_MESSAGE, 
+    };
+    
+    // Maps a value descriptor to its constant by descriptor index; rejects
+    // descriptors that belong to a different enum type.
+    public static AuthMessageType valueOf(
+        com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+      if (desc.getType() != getDescriptor()) {
+        throw new java.lang.IllegalArgumentException(
+          "EnumValueDescriptor is not for this type.");
+      }
+      return VALUES[desc.getIndex()];
+    }
+    
+    private final int index;
+    private final int value;
+    
+    private AuthMessageType(int index, int value) {
+      this.index = index;
+      this.value = value;
+    }
+    
+    // @@protoc_insertion_point(enum_scope:AuthMessageType)
+  }
+  
+  // Extension of AuthMessage (field number 1000) whose value is an
+  // AuthMessageType; its descriptor is attached in the static initializer below.
+  public static final int MESSAGETYPE_FIELD_NUMBER = 1000;
+  public static final
+    com.google.protobuf.GeneratedMessage.GeneratedExtension<
+      org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage,
+      org.apache.bookkeeper.proto.TestDataFormats.AuthMessageType> messageType = com.google.protobuf.GeneratedMessage
+          .newFileScopedGeneratedExtension(
+        org.apache.bookkeeper.proto.TestDataFormats.AuthMessageType.class,
+        null);
+  
+  public static com.google.protobuf.Descriptors.FileDescriptor
+      getDescriptor() {
+    return descriptor;
+  }
+  private static com.google.protobuf.Descriptors.FileDescriptor
+      descriptor;
+  // Builds the file descriptor from the serialized descriptor data, with
+  // BookkeeperProtocol's descriptor as its only dependency. The assigner
+  // stores the result in 'descriptor' and initializes messageType from the
+  // file's first extension.
+  static {
+    java.lang.String[] descriptorData = {
+      "\n$src/test/proto/TestDataFormats.proto\032\'" +
+      "src/main/proto/BookkeeperProtocol.proto*" +
+      "R\n\017AuthMessageType\022\024\n\020SUCCESS_RESPONSE\020\001" +
+      "\022\024\n\020FAILURE_RESPONSE\020\002\022\023\n\017PAYLOAD_MESSAG" +
+      "E\020\003:4\n\013messageType\022\014.AuthMessage\030\350\007 \002(\0162" +
+      "\020.AuthMessageTypeB\037\n\033org.apache.bookkeep" +
+      "er.protoH\001"
+    };
+    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+      new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+        public com.google.protobuf.ExtensionRegistry assignDescriptors(
+            com.google.protobuf.Descriptors.FileDescriptor root) {
+          descriptor = root;
+          messageType.internalInit(descriptor.getExtensions().get(0));
+          return null;
+        }
+      };
+    com.google.protobuf.Descriptors.FileDescriptor
+      .internalBuildGeneratedFileFrom(descriptorData,
+        new com.google.protobuf.Descriptors.FileDescriptor[] {
+          org.apache.bookkeeper.proto.BookkeeperProtocol.getDescriptor(),
+        }, assigner);
+  }
+  
+  // @@protoc_insertion_point(outer_class_scope)
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
index ac6bd8d..b43f5cd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
@@ -20,9 +20,12 @@
  */
 package org.apache.bookkeeper.proto;
 
+import org.apache.bookkeeper.auth.ClientAuthProvider;
+import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
@@ -38,6 +41,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.protobuf.ExtensionRegistry;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CountDownLatch;
@@ -54,10 +59,16 @@ import static org.junit.Assert.*;
 public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
     private final static Logger LOG = LoggerFactory.getLogger(TestPerChannelBookieClient.class);
 
-    public TestPerChannelBookieClient() {
+    ExtensionRegistry extRegistry = ExtensionRegistry.newInstance();
+    ClientAuthProvider.Factory authProvider;
+
+    public TestPerChannelBookieClient() throws Exception {
         super(1);
+        authProvider = AuthProviderFactoryFactory.newClientAuthProviderFactory(
+                new ClientConfiguration(), extRegistry);
     }
 
+
     /**
      * Test that a race does not exist between connection completion
      * and client closure. If a race does exist, this test will simply
@@ -74,7 +85,8 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
 
         BookieSocketAddress addr = getBookie(0);
         for (int i = 0; i < 1000; i++) {
-            PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr);
+            PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr,
+                    authProvider, extRegistry);
             client.connectIfNeededAndDoOp(new GenericCallback<PerChannelBookieClient>() {
                     @Override
                     public void operationComplete(int rc, PerChannelBookieClient client) {
@@ -118,7 +130,8 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
 
         BookieSocketAddress addr = getBookie(0);
         for (int i = 0; i < 100; i++) {
-            PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr);
+            PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr,
+                                                                       authProvider, extRegistry);
             for (int j = i; j < 10; j++) {
                 client.connectIfNeededAndDoOp(nullop);
             }
@@ -150,7 +163,8 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
         OrderedSafeExecutor executor = getOrderedSafeExecutor();
         BookieSocketAddress addr = getBookie(0);
 
-        final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr);
+        final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory,
+                addr, authProvider, extRegistry);
         final AtomicBoolean shouldFail = new AtomicBoolean(false);
         final AtomicBoolean inconsistent = new AtomicBoolean(false);
         final AtomicBoolean running = new AtomicBoolean(true);
@@ -247,7 +261,8 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
         final OrderedSafeExecutor executor = getOrderedSafeExecutor();
         BookieSocketAddress addr = getBookie(0);
 
-        final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr);
+        final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory,
+                addr, authProvider, extRegistry);
         final CountDownLatch completion = new CountDownLatch(1);
         final ReadEntryCallback cb = new ReadEntryCallback() {
                 @Override

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 278dc8c..28de0b2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -195,6 +195,10 @@ public abstract class BookKeeperClusterTestCase {
                                       f, new File[] { f });
     }
 
+    /**
+     * Returns a new {@link ClientConfiguration} constructed from this test
+     * case's {@code baseConf}.
+     */
+    protected ClientConfiguration newClientConfiguration() {
+        return new ClientConfiguration(baseConf);
+    }
+
     protected ServerConfiguration newServerConfiguration(int port, String zkServers, File journalDir, File[] ledgerDirs) {
         ServerConfiguration conf = new ServerConfiguration(baseConf);
         conf.setBookiePort(port);
@@ -289,9 +293,11 @@ public abstract class BookKeeperClusterTestCase {
                         public void run() {
                             try {
                                 bookie.suspendProcessing();
+                                LOG.info("bookie {} is asleep", bookie.getLocalAddress());
                                 l.countDown();
                                 Thread.sleep(seconds*1000);
                                 bookie.resumeProcessing();
+                                LOG.info("bookie {} is awake", bookie.getLocalAddress());
                             } catch (Exception e) {
                                 LOG.error("Error suspending bookie", e);
                             }
@@ -441,6 +447,10 @@ public abstract class BookKeeperClusterTestCase {
         BookieServer server = new BookieServer(conf);
         server.start();
 
+        if (bkc == null) {
+            bkc = new BookKeeperTestClient(baseClientConf);
+        }
+
         int port = conf.getBookiePort();
         String host = InetAddress.getLocalHost().getHostAddress();
         if (conf.getUseHostNameAsBookieID()) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/test/proto/TestDataFormats.proto
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/proto/TestDataFormats.proto b/bookkeeper-server/src/test/proto/TestDataFormats.proto
new file mode 100644
index 0000000..0c616d7
--- /dev/null
+++ b/bookkeeper-server/src/test/proto/TestDataFormats.proto
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.bookkeeper.proto";
+option optimize_for = SPEED;
+
+import "src/main/proto/BookkeeperProtocol.proto";
+
+// Tag carried by test AuthMessages. TestBackwardCompatCMS42 sends
+// PAYLOAD_MESSAGE and expects either PAYLOAD_MESSAGE or a final
+// SUCCESS_RESPONSE / FAILURE_RESPONSE in the bookie's reply.
+enum AuthMessageType {
+    SUCCESS_RESPONSE = 1;
+    FAILURE_RESPONSE = 2;
+    PAYLOAD_MESSAGE = 3;
+}
+
+/**
+ * Test-only extension of AuthMessage (declared in BookkeeperProtocol.proto)
+ * that attaches an AuthMessageType to the message as field 1000.
+ */
+extend AuthMessage {
+    required AuthMessageType messageType = 1000;
+}


[2/3] bookkeeper git commit: BOOKKEEPER-901: Authentication framework

Posted by si...@apache.org.
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
index 57d3503..7fbb2cd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
@@ -176,12 +176,14 @@ public final class BookkeeperProtocol {
     ADD_ENTRY(1, 2),
     RANGE_READ_ENTRY(2, 3),
     RANGE_ADD_ENTRY(3, 4),
+    AUTH(4, 5),
     ;
     
     public static final int READ_ENTRY_VALUE = 1;
     public static final int ADD_ENTRY_VALUE = 2;
     public static final int RANGE_READ_ENTRY_VALUE = 3;
     public static final int RANGE_ADD_ENTRY_VALUE = 4;
+    public static final int AUTH_VALUE = 5;
     
     
     public final int getNumber() { return value; }
@@ -192,6 +194,7 @@ public final class BookkeeperProtocol {
         case 2: return ADD_ENTRY;
         case 3: return RANGE_READ_ENTRY;
         case 4: return RANGE_ADD_ENTRY;
+        case 5: return AUTH;
         default: return null;
       }
     }
@@ -222,7 +225,7 @@ public final class BookkeeperProtocol {
     }
     
     private static final OperationType[] VALUES = {
-      READ_ENTRY, ADD_ENTRY, RANGE_READ_ENTRY, RANGE_ADD_ENTRY, 
+      READ_ENTRY, ADD_ENTRY, RANGE_READ_ENTRY, RANGE_ADD_ENTRY, AUTH, 
     };
     
     public static OperationType valueOf(
@@ -756,6 +759,11 @@ public final class BookkeeperProtocol {
     boolean hasAddRequest();
     org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest getAddRequest();
     org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequestOrBuilder getAddRequestOrBuilder();
+    
+    // optional .AuthMessage authRequest = 102;
+    boolean hasAuthRequest();
+    org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthRequest();
+    org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthRequestOrBuilder();
   }
   public static final class Request extends
       com.google.protobuf.GeneratedMessage
@@ -825,10 +833,24 @@ public final class BookkeeperProtocol {
       return addRequest_;
     }
     
+    // optional .AuthMessage authRequest = 102;
+    public static final int AUTHREQUEST_FIELD_NUMBER = 102;
+    private org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage authRequest_;
+    public boolean hasAuthRequest() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthRequest() {
+      return authRequest_;
+    }
+    public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthRequestOrBuilder() {
+      return authRequest_;
+    }
+    
     private void initFields() {
       header_ = org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.getDefaultInstance();
       readRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest.getDefaultInstance();
       addRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest.getDefaultInstance();
+      authRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -855,6 +877,12 @@ public final class BookkeeperProtocol {
           return false;
         }
       }
+      if (hasAuthRequest()) {
+        if (!getAuthRequest().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -871,6 +899,9 @@ public final class BookkeeperProtocol {
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeMessage(101, addRequest_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeMessage(102, authRequest_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -892,6 +923,10 @@ public final class BookkeeperProtocol {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(101, addRequest_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(102, authRequest_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1011,6 +1046,7 @@ public final class BookkeeperProtocol {
           getHeaderFieldBuilder();
           getReadRequestFieldBuilder();
           getAddRequestFieldBuilder();
+          getAuthRequestFieldBuilder();
         }
       }
       private static Builder create() {
@@ -1037,6 +1073,12 @@ public final class BookkeeperProtocol {
           addRequestBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000004);
+        if (authRequestBuilder_ == null) {
+          authRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+        } else {
+          authRequestBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
       
@@ -1099,6 +1141,14 @@ public final class BookkeeperProtocol {
         } else {
           result.addRequest_ = addRequestBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        if (authRequestBuilder_ == null) {
+          result.authRequest_ = authRequest_;
+        } else {
+          result.authRequest_ = authRequestBuilder_.build();
+        }
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1124,6 +1174,9 @@ public final class BookkeeperProtocol {
         if (other.hasAddRequest()) {
           mergeAddRequest(other.getAddRequest());
         }
+        if (other.hasAuthRequest()) {
+          mergeAuthRequest(other.getAuthRequest());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1149,6 +1202,12 @@ public final class BookkeeperProtocol {
             return false;
           }
         }
+        if (hasAuthRequest()) {
+          if (!getAuthRequest().isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
       
@@ -1202,6 +1261,15 @@ public final class BookkeeperProtocol {
               setAddRequest(subBuilder.buildPartial());
               break;
             }
+            case 818: {
+              org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder subBuilder = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.newBuilder();
+              if (hasAuthRequest()) {
+                subBuilder.mergeFrom(getAuthRequest());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setAuthRequest(subBuilder.buildPartial());
+              break;
+            }
           }
         }
       }
@@ -1478,6 +1546,96 @@ public final class BookkeeperProtocol {
         return addRequestBuilder_;
       }
       
+      // optional .AuthMessage authRequest = 102;
+      private org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage authRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder> authRequestBuilder_;
+      public boolean hasAuthRequest() { // true iff bit 0x00000008 of bitField0_ is set
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthRequest() { // returns the current authRequest value
+        if (authRequestBuilder_ == null) { // no nested field builder: the value is held in authRequest_
+          return authRequest_;
+        } else { // a nested field builder exists: ask it for the message
+          return authRequestBuilder_.getMessage();
+        }
+      }
+      public Builder setAuthRequest(org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage value) { // replaces authRequest and marks it present; returns this builder
+        if (authRequestBuilder_ == null) {
+          if (value == null) { // null is rejected explicitly on this path
+            throw new NullPointerException();
+          }
+          authRequest_ = value;
+          onChanged();
+        } else {
+          authRequestBuilder_.setMessage(value); // delegate to the nested field builder
+        }
+        bitField0_ |= 0x00000008; // presence bit for authRequest
+        return this;
+      }
+      public Builder setAuthRequest( // overload taking a builder; the stored value is builderForValue.build()
+          org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder builderForValue) {
+        if (authRequestBuilder_ == null) {
+          authRequest_ = builderForValue.build();
+          onChanged();
+        } else {
+          authRequestBuilder_.setMessage(builderForValue.build()); // delegate to the nested field builder
+        }
+        bitField0_ |= 0x00000008; // presence bit for authRequest
+        return this;
+      }
+      public Builder mergeAuthRequest(org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage value) { // merges value into authRequest and marks it present
+        if (authRequestBuilder_ == null) {
+          if (((bitField0_ & 0x00000008) == 0x00000008) && // already set, and ...
+              authRequest_ != org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance()) { // ... not the shared default (identity comparison)
+            authRequest_ = // merge: copy the current value into a new builder, fold in value, build without the initialization check
+              org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.newBuilder(authRequest_).mergeFrom(value).buildPartial();
+          } else { // nothing meaningful to merge into: take value as-is
+            authRequest_ = value;
+          }
+          onChanged();
+        } else {
+          authRequestBuilder_.mergeFrom(value); // delegate to the nested field builder
+        }
+        bitField0_ |= 0x00000008; // presence bit for authRequest
+        return this;
+      }
+      public Builder clearAuthRequest() { // resets authRequest and marks it absent; returns this builder
+        if (authRequestBuilder_ == null) {
+          authRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance(); // back to the shared default instance
+          onChanged();
+        } else {
+          authRequestBuilder_.clear(); // delegate to the nested field builder
+        }
+        bitField0_ = (bitField0_ & ~0x00000008); // drop the presence bit for authRequest
+        return this;
+      }
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder getAuthRequestBuilder() { // returns the nested builder for authRequest
+        bitField0_ |= 0x00000008; // requesting the nested builder marks the field present
+        onChanged();
+        return getAuthRequestFieldBuilder().getBuilder(); // creates the field builder on first use
+      }
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthRequestOrBuilder() { // read-only view of authRequest
+        if (authRequestBuilder_ != null) { // a nested field builder exists: return its message-or-builder view
+          return authRequestBuilder_.getMessageOrBuilder();
+        } else { // otherwise the plain field holds the value
+          return authRequest_;
+        }
+      }
+      private com.google.protobuf.SingleFieldBuilder< // lazily creates and returns the nested field builder for authRequest
+          org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder> 
+          getAuthRequestFieldBuilder() {
+        if (authRequestBuilder_ == null) { // first use: create the field builder
+          authRequestBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+              org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder>(
+                  authRequest_, // seeded with the value currently held in the plain field
+                  getParentForChildren(),
+                  isClean());
+          authRequest_ = null; // the plain field is dropped once the field builder exists
+        }
+        return authRequestBuilder_;
+      }
+      
       // @@protoc_insertion_point(builder_scope:Request)
     }
     
@@ -2792,6 +2950,11 @@ public final class BookkeeperProtocol {
     boolean hasAddResponse();
     org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse getAddResponse();
     org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponseOrBuilder getAddResponseOrBuilder();
+    
+    // optional .AuthMessage authResponse = 102;
+    boolean hasAuthResponse();
+    org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthResponse();
+    org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthResponseOrBuilder();
   }
   public static final class Response extends
       com.google.protobuf.GeneratedMessage
@@ -2871,11 +3034,25 @@ public final class BookkeeperProtocol {
       return addResponse_;
     }
     
+    // optional .AuthMessage authResponse = 102;
+    public static final int AUTHRESPONSE_FIELD_NUMBER = 102;
+    private org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage authResponse_;
+    public boolean hasAuthResponse() { // true iff bit 0x00000010 of bitField0_ is set
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthResponse() { // plain accessor for the authResponse_ field
+      return authResponse_;
+    }
+    public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthResponseOrBuilder() { // same field as getAuthResponse(), typed as the OrBuilder interface
+      return authResponse_;
+    }
+    
     private void initFields() { // assigns each field below its default value
       header_ = org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.getDefaultInstance();
       status_ = org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode.EOK; // status defaults to EOK
       readResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse.getDefaultInstance();
       addResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse.getDefaultInstance();
+      authResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance(); // added by this patch alongside the authResponse field
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -2906,6 +3083,12 @@ public final class BookkeeperProtocol {
           return false;
         }
       }
+      if (hasAuthResponse()) {
+        if (!getAuthResponse().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -2925,6 +3108,9 @@ public final class BookkeeperProtocol {
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
         output.writeMessage(101, addResponse_);
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeMessage(102, authResponse_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -2950,6 +3136,10 @@ public final class BookkeeperProtocol {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(101, addResponse_);
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(102, authResponse_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -3069,6 +3259,7 @@ public final class BookkeeperProtocol {
           getHeaderFieldBuilder();
           getReadResponseFieldBuilder();
           getAddResponseFieldBuilder();
+          getAuthResponseFieldBuilder();
         }
       }
       private static Builder create() {
@@ -3097,6 +3288,12 @@ public final class BookkeeperProtocol {
           addResponseBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000008);
+        if (authResponseBuilder_ == null) {
+          authResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+        } else {
+          authResponseBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000010);
         return this;
       }
       
@@ -3163,6 +3360,14 @@ public final class BookkeeperProtocol {
         } else {
           result.addResponse_ = addResponseBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        if (authResponseBuilder_ == null) {
+          result.authResponse_ = authResponse_;
+        } else {
+          result.authResponse_ = authResponseBuilder_.build();
+        }
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -3191,6 +3396,9 @@ public final class BookkeeperProtocol {
         if (other.hasAddResponse()) {
           mergeAddResponse(other.getAddResponse());
         }
+        if (other.hasAuthResponse()) {
+          mergeAuthResponse(other.getAuthResponse());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -3220,6 +3428,12 @@ public final class BookkeeperProtocol {
             return false;
           }
         }
+        if (hasAuthResponse()) {
+          if (!getAuthResponse().isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
       
@@ -3284,6 +3498,15 @@ public final class BookkeeperProtocol {
               setAddResponse(subBuilder.buildPartial());
               break;
             }
+            case 818: {
+              org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder subBuilder = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.newBuilder();
+              if (hasAuthResponse()) {
+                subBuilder.mergeFrom(getAuthResponse());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setAuthResponse(subBuilder.buildPartial());
+              break;
+            }
           }
         }
       }
@@ -3584,6 +3807,96 @@ public final class BookkeeperProtocol {
         return addResponseBuilder_;
       }
       
+      // optional .AuthMessage authResponse = 102;
+      private org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage authResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder> authResponseBuilder_;
+      public boolean hasAuthResponse() { // true iff bit 0x00000010 of bitField0_ is set
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthResponse() { // returns the current authResponse value
+        if (authResponseBuilder_ == null) { // no nested field builder: the value is held in authResponse_
+          return authResponse_;
+        } else { // a nested field builder exists: ask it for the message
+          return authResponseBuilder_.getMessage();
+        }
+      }
+      public Builder setAuthResponse(org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage value) { // replaces authResponse and marks it present; returns this builder
+        if (authResponseBuilder_ == null) {
+          if (value == null) { // null is rejected explicitly on this path
+            throw new NullPointerException();
+          }
+          authResponse_ = value;
+          onChanged();
+        } else {
+          authResponseBuilder_.setMessage(value); // delegate to the nested field builder
+        }
+        bitField0_ |= 0x00000010; // presence bit for authResponse
+        return this;
+      }
+      public Builder setAuthResponse( // overload taking a builder; the stored value is builderForValue.build()
+          org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder builderForValue) {
+        if (authResponseBuilder_ == null) {
+          authResponse_ = builderForValue.build();
+          onChanged();
+        } else {
+          authResponseBuilder_.setMessage(builderForValue.build()); // delegate to the nested field builder
+        }
+        bitField0_ |= 0x00000010; // presence bit for authResponse
+        return this;
+      }
+      public Builder mergeAuthResponse(org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage value) { // merges value into authResponse and marks it present
+        if (authResponseBuilder_ == null) {
+          if (((bitField0_ & 0x00000010) == 0x00000010) && // already set, and ...
+              authResponse_ != org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance()) { // ... not the shared default (identity comparison)
+            authResponse_ = // merge: copy the current value into a new builder, fold in value, build without the initialization check
+              org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.newBuilder(authResponse_).mergeFrom(value).buildPartial();
+          } else { // nothing meaningful to merge into: take value as-is
+            authResponse_ = value;
+          }
+          onChanged();
+        } else {
+          authResponseBuilder_.mergeFrom(value); // delegate to the nested field builder
+        }
+        bitField0_ |= 0x00000010; // presence bit for authResponse
+        return this;
+      }
+      public Builder clearAuthResponse() { // resets authResponse and marks it absent; returns this builder
+        if (authResponseBuilder_ == null) {
+          authResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance(); // back to the shared default instance
+          onChanged();
+        } else {
+          authResponseBuilder_.clear(); // delegate to the nested field builder
+        }
+        bitField0_ = (bitField0_ & ~0x00000010); // drop the presence bit for authResponse
+        return this;
+      }
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder getAuthResponseBuilder() { // returns the nested builder for authResponse
+        bitField0_ |= 0x00000010; // requesting the nested builder marks the field present
+        onChanged();
+        return getAuthResponseFieldBuilder().getBuilder(); // creates the field builder on first use
+      }
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthResponseOrBuilder() { // read-only view of authResponse
+        if (authResponseBuilder_ != null) { // a nested field builder exists: return its message-or-builder view
+          return authResponseBuilder_.getMessageOrBuilder();
+        } else { // otherwise the plain field holds the value
+          return authResponse_;
+        }
+      }
+      private com.google.protobuf.SingleFieldBuilder< // lazily creates and returns the nested field builder for authResponse
+          org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder> 
+          getAuthResponseFieldBuilder() {
+        if (authResponseBuilder_ == null) { // first use: create the field builder
+          authResponseBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+              org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder>(
+                  authResponse_, // seeded with the value currently held in the plain field
+                  getParentForChildren(),
+                  isClean());
+          authResponse_ = null; // the plain field is dropped once the field builder exists
+        }
+        return authResponseBuilder_;
+      }
+      
       // @@protoc_insertion_point(builder_scope:Response)
     }
     
@@ -4625,6 +4938,404 @@ public final class BookkeeperProtocol {
     // @@protoc_insertion_point(class_scope:AddResponse)
   }
   
+  public interface AuthMessageOrBuilder extends // read accessors implemented by both AuthMessage and AuthMessage.Builder
+      com.google.protobuf.GeneratedMessage.
+          ExtendableMessageOrBuilder<AuthMessage> {
+    
+    // required string authPluginName = 1;
+    boolean hasAuthPluginName(); // whether authPluginName has been set
+    String getAuthPluginName(); // the authPluginName value as a String
+  }
+  public static final class AuthMessage extends // generated message with one string field (authPluginName) plus extension support
+      com.google.protobuf.GeneratedMessage.ExtendableMessage<
+        AuthMessage> implements AuthMessageOrBuilder {
+    // Use AuthMessage.newBuilder() to construct.
+    private AuthMessage(Builder builder) {
+      super(builder);
+    }
+    private AuthMessage(boolean noInit) {} // used by the static initializer at the end of the class to create defaultInstance
+    // Shared default instance; assigned once in the static initializer at the end of the class.
+    private static final AuthMessage defaultInstance;
+    public static AuthMessage getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public AuthMessage getDefaultInstanceForType() { // instance-level accessor for the same shared default
+      return defaultInstance;
+    }
+    
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_AuthMessage_descriptor;
+    }
+    
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_AuthMessage_fieldAccessorTable;
+    }
+    
+    private int bitField0_; // presence bits; 0x00000001 tracks authPluginName
+    // required string authPluginName = 1;
+    public static final int AUTHPLUGINNAME_FIELD_NUMBER = 1;
+    private java.lang.Object authPluginName_; // holds either a String or a ByteString (see the two getters below)
+    public boolean hasAuthPluginName() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public String getAuthPluginName() { // returns the name as a String, decoding from bytes if needed
+      java.lang.Object ref = authPluginName_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) { // cache the decoded String only when the bytes are valid UTF-8
+          authPluginName_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getAuthPluginNameBytes() { // returns the name as a ByteString, encoding from String if needed
+      java.lang.Object ref = authPluginName_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        authPluginName_ = b; // the encoded form replaces the String in the field
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    private void initFields() { // default value for authPluginName is the empty string
+      authPluginName_ = "";
+    }
+    private byte memoizedIsInitialized = -1; // -1 = not computed yet, 0 = false, 1 = true
+    public final boolean isInitialized() { // true when authPluginName is set and extensions are initialized; result is memoized
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasAuthPluginName()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!extensionsAreInitialized()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output) // writes field 1 (if set), then extensions, then unknown fields
+                        throws java.io.IOException {
+      getSerializedSize(); // return value unused here; the call fills memoizedSerializedSize
+      com.google.protobuf.GeneratedMessage
+        .ExtendableMessage<org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage>.ExtensionWriter extensionWriter =
+          newExtensionWriter();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(1, getAuthPluginNameBytes());
+      }
+      extensionWriter.writeUntil(536870912, output); // 536870912 == 1 << 29
+      getUnknownFields().writeTo(output);
+    }
+    
+    private int memoizedSerializedSize = -1; // -1 = not computed yet
+    public int getSerializedSize() { // sums field, extension and unknown-field sizes; result is memoized
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(1, getAuthPluginNameBytes());
+      }
+      size += extensionsSerializedSize();
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace() // delegates unchanged to the superclass
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    // Static parse helpers: each merges its input into a fresh Builder and finishes with buildParsed().
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else { // mergeDelimitedFrom reported false: no message is returned
+        return null;
+      }
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else { // mergeDelimitedFrom reported false: no message is returned
+        return null;
+      }
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    // Builder factories: empty builder, builder for this type, and builder pre-populated from a prototype.
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    public static final class Builder extends // mutable builder for AuthMessage
+        com.google.protobuf.GeneratedMessage.ExtendableBuilder<
+          org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, Builder> implements org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_AuthMessage_descriptor;
+      }
+      
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_AuthMessage_fieldAccessorTable;
+      }
+      
+      // Construct using org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+      
+      private Builder(BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { // body is empty: nothing is forced for this message
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+      
+      public Builder clear() { // resets authPluginName to "" and marks it absent, after the superclass clear
+        super.clear();
+        authPluginName_ = "";
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+      
+      public Builder clone() { // new builder merged from this builder's buildPartial() snapshot
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDescriptor();
+      }
+      
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getDefaultInstanceForType() {
+        return org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+      }
+      
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage build() { // buildPartial() plus an initialization check
+        org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage buildParsed() // like build(), but the failure is converted to InvalidProtocolBufferException
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage buildPartial() { // copies builder state into a new message without checking initialization
+        org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage result = new org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) { // carry over the presence bit for authPluginName
+          to_bitField0_ |= 0x00000001;
+        }
+        result.authPluginName_ = authPluginName_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.Message other) { // dispatches to the typed overload when other is an AuthMessage
+        if (other instanceof org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage) {
+          return mergeFrom((org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+      
+      public Builder mergeFrom(org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage other) { // copies set fields, extensions and unknown fields from other
+        if (other == org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance()) return this; // merging the default instance is a no-op
+        if (other.hasAuthPluginName()) {
+          setAuthPluginName(other.getAuthPluginName());
+        }
+        this.mergeExtensionFields(other);
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+      
+      public final boolean isInitialized() { // same two checks as the message-level isInitialized(), but not memoized
+        if (!hasAuthPluginName()) {
+          
+          return false;
+        }
+        if (!extensionsAreInitialized()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom( // reads tags from input in a loop until tag 0 or until parseUnknownField returns false
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder(
+            this.getUnknownFields());
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0: // tag 0: store the collected unknown fields and stop
+              this.setUnknownFields(unknownFields.build());
+              onChanged();
+              return this;
+            default: { // any other tag is handed to parseUnknownField; a false result also stops parsing
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                this.setUnknownFields(unknownFields.build());
+                onChanged();
+                return this;
+              }
+              break;
+            }
+            case 10: { // authPluginName: raw bytes are stored and decoded later by getAuthPluginName()
+              bitField0_ |= 0x00000001;
+              authPluginName_ = input.readBytes();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_; // presence bits; 0x00000001 tracks authPluginName
+      
+      // required string authPluginName = 1;
+      private java.lang.Object authPluginName_ = ""; // holds either a String or a ByteString
+      public boolean hasAuthPluginName() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public String getAuthPluginName() {
+        java.lang.Object ref = authPluginName_;
+        if (!(ref instanceof String)) { // here the decoded String is always cached, with no UTF-8 validity check
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          authPluginName_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setAuthPluginName(String value) { // sets the name and marks it present; null is rejected
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        authPluginName_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearAuthPluginName() { // marks the name absent and restores the default instance's value
+        bitField0_ = (bitField0_ & ~0x00000001);
+        authPluginName_ = getDefaultInstance().getAuthPluginName();
+        onChanged();
+        return this;
+      }
+      void setAuthPluginName(com.google.protobuf.ByteString value) { // package-private bytes overload; no null check on this path
+        bitField0_ |= 0x00000001;
+        authPluginName_ = value;
+        onChanged();
+      }
+      
+      // @@protoc_insertion_point(builder_scope:AuthMessage)
+    }
+    
+    static { // creates the shared default instance and gives its fields their default values
+      defaultInstance = new AuthMessage(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:AuthMessage)
+  }
+  
   private static com.google.protobuf.Descriptors.Descriptor
     internal_static_BKPacketHeader_descriptor;
   private static
@@ -4660,6 +5371,11 @@ public final class BookkeeperProtocol {
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
       internal_static_AddResponse_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_AuthMessage_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_AuthMessage_fieldAccessorTable;
   
   public static com.google.protobuf.Descriptors.FileDescriptor
       getDescriptor() {
@@ -4672,33 +5388,36 @@ public final class BookkeeperProtocol {
       "\n\'src/main/proto/BookkeeperProtocol.prot" +
       "o\"e\n\016BKPacketHeader\022!\n\007version\030\001 \002(\0162\020.P" +
       "rotocolVersion\022!\n\toperation\030\002 \002(\0162\016.Oper" +
-      "ationType\022\r\n\005txnId\030\003 \002(\004\"n\n\007Request\022\037\n\006h" +
-      "eader\030\001 \002(\0132\017.BKPacketHeader\022!\n\013readRequ" +
-      "est\030d \001(\0132\014.ReadRequest\022\037\n\naddRequest\030e " +
-      "\001(\0132\013.AddRequest\"~\n\013ReadRequest\022\037\n\004flag\030" +
-      "d \001(\0162\021.ReadRequest.Flag\022\020\n\010ledgerId\030\001 \002" +
-      "(\003\022\017\n\007entryId\030\002 \002(\003\022\021\n\tmasterKey\030\003 \001(\014\"\030" +
-      "\n\004Flag\022\020\n\014FENCE_LEDGER\020\001\"\212\001\n\nAddRequest\022",
-      "\036\n\004flag\030d \001(\0162\020.AddRequest.Flag\022\020\n\010ledge" +
-      "rId\030\001 \002(\003\022\017\n\007entryId\030\002 \002(\003\022\021\n\tmasterKey\030" +
-      "\003 \002(\014\022\014\n\004body\030\004 \002(\014\"\030\n\004Flag\022\020\n\014RECOVERY_" +
-      "ADD\020\001\"\220\001\n\010Response\022\037\n\006header\030\001 \002(\0132\017.BKP" +
-      "acketHeader\022\033\n\006status\030\002 \002(\0162\013.StatusCode" +
-      "\022#\n\014readResponse\030d \001(\0132\r.ReadResponse\022!\n" +
-      "\013addResponse\030e \001(\0132\014.AddResponse\"\\\n\014Read" +
-      "Response\022\033\n\006status\030\001 \002(\0162\013.StatusCode\022\020\n" +
-      "\010ledgerId\030\002 \002(\003\022\017\n\007entryId\030\003 \002(\003\022\014\n\004body" +
-      "\030\004 \001(\014\"M\n\013AddResponse\022\033\n\006status\030\001 \002(\0162\013.",
-      "StatusCode\022\020\n\010ledgerId\030\002 \002(\003\022\017\n\007entryId\030" +
-      "\003 \002(\003*F\n\017ProtocolVersion\022\017\n\013VERSION_ONE\020" +
-      "\001\022\017\n\013VERSION_TWO\020\002\022\021\n\rVERSION_THREE\020\003*\206\001" +
-      "\n\nStatusCode\022\007\n\003EOK\020\000\022\016\n\tENOLEDGER\020\222\003\022\r\n" +
-      "\010ENOENTRY\020\223\003\022\014\n\007EBADREQ\020\224\003\022\010\n\003EIO\020\365\003\022\010\n\003" +
-      "EUA\020\366\003\022\020\n\013EBADVERSION\020\367\003\022\014\n\007EFENCED\020\370\003\022\016" +
-      "\n\tEREADONLY\020\371\003*Y\n\rOperationType\022\016\n\nREAD_" +
-      "ENTRY\020\001\022\r\n\tADD_ENTRY\020\002\022\024\n\020RANGE_READ_ENT" +
-      "RY\020\003\022\023\n\017RANGE_ADD_ENTRY\020\004B\037\n\033org.apache." +
-      "bookkeeper.protoH\001"
+      "ationType\022\r\n\005txnId\030\003 \002(\004\"\221\001\n\007Request\022\037\n\006" +
+      "header\030\001 \002(\0132\017.BKPacketHeader\022!\n\013readReq" +
+      "uest\030d \001(\0132\014.ReadRequest\022\037\n\naddRequest\030e" +
+      " \001(\0132\013.AddRequest\022!\n\013authRequest\030f \001(\0132\014" +
+      ".AuthMessage\"~\n\013ReadRequest\022\037\n\004flag\030d \001(" +
+      "\0162\021.ReadRequest.Flag\022\020\n\010ledgerId\030\001 \002(\003\022\017" +
+      "\n\007entryId\030\002 \002(\003\022\021\n\tmasterKey\030\003 \001(\014\"\030\n\004Fl",
+      "ag\022\020\n\014FENCE_LEDGER\020\001\"\212\001\n\nAddRequest\022\036\n\004f" +
+      "lag\030d \001(\0162\020.AddRequest.Flag\022\020\n\010ledgerId\030" +
+      "\001 \002(\003\022\017\n\007entryId\030\002 \002(\003\022\021\n\tmasterKey\030\003 \002(" +
+      "\014\022\014\n\004body\030\004 \002(\014\"\030\n\004Flag\022\020\n\014RECOVERY_ADD\020" +
+      "\001\"\264\001\n\010Response\022\037\n\006header\030\001 \002(\0132\017.BKPacke" +
+      "tHeader\022\033\n\006status\030\002 \002(\0162\013.StatusCode\022#\n\014" +
+      "readResponse\030d \001(\0132\r.ReadResponse\022!\n\013add" +
+      "Response\030e \001(\0132\014.AddResponse\022\"\n\014authResp" +
+      "onse\030f \001(\0132\014.AuthMessage\"\\\n\014ReadResponse" +
+      "\022\033\n\006status\030\001 \002(\0162\013.StatusCode\022\020\n\010ledgerI",
+      "d\030\002 \002(\003\022\017\n\007entryId\030\003 \002(\003\022\014\n\004body\030\004 \001(\014\"M" +
+      "\n\013AddResponse\022\033\n\006status\030\001 \002(\0162\013.StatusCo" +
+      "de\022\020\n\010ledgerId\030\002 \002(\003\022\017\n\007entryId\030\003 \002(\003\"0\n" +
+      "\013AuthMessage\022\026\n\016authPluginName\030\001 \002(\t*\t\010\350" +
+      "\007\020\200\200\200\200\002*F\n\017ProtocolVersion\022\017\n\013VERSION_ON" +
+      "E\020\001\022\017\n\013VERSION_TWO\020\002\022\021\n\rVERSION_THREE\020\003*" +
+      "\206\001\n\nStatusCode\022\007\n\003EOK\020\000\022\016\n\tENOLEDGER\020\222\003\022" +
+      "\r\n\010ENOENTRY\020\223\003\022\014\n\007EBADREQ\020\224\003\022\010\n\003EIO\020\365\003\022\010" +
+      "\n\003EUA\020\366\003\022\020\n\013EBADVERSION\020\367\003\022\014\n\007EFENCED\020\370\003" +
+      "\022\016\n\tEREADONLY\020\371\003*c\n\rOperationType\022\016\n\nREA",
+      "D_ENTRY\020\001\022\r\n\tADD_ENTRY\020\002\022\024\n\020RANGE_READ_E" +
+      "NTRY\020\003\022\023\n\017RANGE_ADD_ENTRY\020\004\022\010\n\004AUTH\020\005B\037\n" +
+      "\033org.apache.bookkeeper.protoH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -4718,7 +5437,7 @@ public final class BookkeeperProtocol {
           internal_static_Request_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Request_descriptor,
-              new java.lang.String[] { "Header", "ReadRequest", "AddRequest", },
+              new java.lang.String[] { "Header", "ReadRequest", "AddRequest", "AuthRequest", },
               org.apache.bookkeeper.proto.BookkeeperProtocol.Request.class,
               org.apache.bookkeeper.proto.BookkeeperProtocol.Request.Builder.class);
           internal_static_ReadRequest_descriptor =
@@ -4742,7 +5461,7 @@ public final class BookkeeperProtocol {
           internal_static_Response_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Response_descriptor,
-              new java.lang.String[] { "Header", "Status", "ReadResponse", "AddResponse", },
+              new java.lang.String[] { "Header", "Status", "ReadResponse", "AddResponse", "AuthResponse", },
               org.apache.bookkeeper.proto.BookkeeperProtocol.Response.class,
               org.apache.bookkeeper.proto.BookkeeperProtocol.Response.Builder.class);
           internal_static_ReadResponse_descriptor =
@@ -4761,6 +5480,14 @@ public final class BookkeeperProtocol {
               new java.lang.String[] { "Status", "LedgerId", "EntryId", },
               org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse.class,
               org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse.Builder.class);
+          internal_static_AuthMessage_descriptor =
+            getDescriptor().getMessageTypes().get(7);
+          internal_static_AuthMessage_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_AuthMessage_descriptor,
+              new java.lang.String[] { "AuthPluginName", },
+              org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.class,
+              org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder.class);
           return null;
         }
       };

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 2bd4e9b..0f9feea 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeperClientStats;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -79,6 +80,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.ExtensionRegistry;
 
 /**
  * This class manages all details of connection to a particular bookie. It also
@@ -134,15 +136,27 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     private final ClientConfiguration conf;
 
     private final PerChannelBookieClientPool pcbcPool;
+    private final ClientAuthProvider.Factory authProviderFactory;
+    private final ExtensionRegistry extRegistry;
 
     public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
                                   BookieSocketAddress addr) {
-        this(new ClientConfiguration(), executor, channelFactory, addr, null, NullStatsLogger.INSTANCE, null);
+        this(new ClientConfiguration(), executor, channelFactory, addr, null, NullStatsLogger.INSTANCE, null, null, null);
+    }
+
+    public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
+                                  BookieSocketAddress addr,
+                                  ClientAuthProvider.Factory authProviderFactory,
+                                  ExtensionRegistry extRegistry) {
+        this(new ClientConfiguration(), executor, channelFactory, addr, null, NullStatsLogger.INSTANCE,
+                authProviderFactory, extRegistry, null);
     }
 
     public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor,
                                   ClientSocketChannelFactory channelFactory, BookieSocketAddress addr,
                                   HashedWheelTimer requestTimer, StatsLogger parentStatsLogger,
+                                  ClientAuthProvider.Factory authProviderFactory,
+                                  ExtensionRegistry extRegistry,
                                   PerChannelBookieClientPool pcbcPool) {
         this.conf = conf;
         this.addr = addr;
@@ -153,6 +167,9 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         this.addEntryTimeout = conf.getAddEntryTimeout();
         this.readEntryTimeout = conf.getReadEntryTimeout();
 
+        this.authProviderFactory = authProviderFactory;
+        this.extRegistry = extRegistry;
+
         StringBuilder nameBuilder = new StringBuilder();
         nameBuilder.append(addr.getHostname().replace('.', '_').replace('-', '_'))
             .append("_").append(addr.getPort());
@@ -563,8 +580,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
                     bAddress = c.getRemoteAddress().toString();
                 }
 
-                LOG.debug("Could not write request for reading entry: {} ledger-id: {} bookie: {}",
-                          new Object[]{ readCompletion.entryId, readCompletion.ledgerId, bAddress });
+                LOG.debug("Could not write request for reading entry: {} ledger-id: {} bookie: {} rc: {}",
+                        new Object[]{ readCompletion.entryId, readCompletion.ledgerId, bAddress, rc });
 
                 readCompletion.cb.readEntryComplete(rc, readCompletion.ledgerId, readCompletion.entryId,
                                                     null, readCompletion.ctx);
@@ -594,8 +611,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
                 if(c != null) {
                     bAddress = c.getRemoteAddress().toString();
                 }
-                LOG.debug("Could not write request for adding entry: {} ledger-id: {} bookie: {}",
-                          new Object[] { addCompletion.entryId, addCompletion.ledgerId, bAddress });
+                LOG.debug("Could not write request for adding entry: {} ledger-id: {} bookie: {} rc: {}",
+                          new Object[] { addCompletion.entryId, addCompletion.ledgerId, bAddress, rc });
 
                 addCompletion.cb.writeComplete(rc, addCompletion.ledgerId, addCompletion.entryId,
                                                addr, addCompletion.ctx);
@@ -656,8 +673,9 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
 
         pipeline.addLast("lengthbasedframedecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
         pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
-        pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder());
-        pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.ResponseDecoder());
+        pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder(extRegistry));
+        pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.ResponseDecoder(extRegistry));
+        pipeline.addLast("authHandler", new AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator));
         pipeline.addLast("mainhandler", this);
         return pipeline;
     }
@@ -699,6 +717,16 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
             return;
         }
 
+        if (t instanceof AuthHandler.AuthenticationException) {
+            LOG.error("Error authenticating connection", t);
+            errorOutOutstandingEntries(BKException.Code.UnauthorizedAccessException);
+            Channel c = ctx.getChannel();
+            if (c != null) {
+                closeChannel(c);
+            }
+            return;
+        }
+
         if (t instanceof IOException) {
             // these are thrown when a bookie fails, logging them just pollutes
             // the logs (the failure is logged from the listeners on the write
@@ -739,7 +767,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
                 LOG.debug("Unexpected response received from bookie : " + addr + " for type : " + header.             getOperation() +
                         " and txnId : " + header.getTxnId());
             }
-
         } else {
             long orderingKey = completionValue.ledgerId;
             executor.submitOrdered(orderingKey, new SafeRunnable() {
@@ -748,10 +775,10 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
                     OperationType type = header.getOperation();
                     switch (type) {
                         case ADD_ENTRY:
-                            handleAddResponse(response.getAddResponse(), completionValue);
+                            handleAddResponse(response, completionValue);
                             break;
                         case READ_ENTRY:
-                            handleReadResponse(response.getReadResponse(), completionValue);
+                            handleReadResponse(response, completionValue);
                             break;
                         default:
                             LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring",
@@ -770,13 +797,14 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         }
     }
 
-    void handleAddResponse(AddResponse response, CompletionValue completionValue) {
+    void handleAddResponse(Response response, CompletionValue completionValue) {
         // The completion value should always be an instance of an AddCompletion object when we reach here.
         AddCompletion ac = (AddCompletion)completionValue;
+        AddResponse addResponse = response.getAddResponse();
 
-        long ledgerId = response.getLedgerId();
-        long entryId = response.getEntryId();
-        StatusCode status = response.getStatus();
+        long ledgerId = addResponse.getLedgerId();
+        long entryId = addResponse.getEntryId();
+        StatusCode status = response.getStatus() == StatusCode.EOK ? addResponse.getStatus() : response.getStatus();
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
@@ -796,17 +824,19 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx);
     }
 
-    void handleReadResponse(ReadResponse response, CompletionValue completionValue) {
+    void handleReadResponse(Response response, CompletionValue completionValue) {
         // The completion value should always be an instance of a ReadCompletion object when we reach here.
         ReadCompletion rc = (ReadCompletion)completionValue;
+        ReadResponse readResponse = response.getReadResponse();
+
+        long ledgerId = readResponse.getLedgerId();
+        long entryId = readResponse.getEntryId();
+        StatusCode status = response.getStatus() == StatusCode.EOK ? readResponse.getStatus() : response.getStatus();
 
-        long ledgerId = response.getLedgerId();
-        long entryId = response.getEntryId();
-        StatusCode status = response.getStatus();
         ChannelBuffer buffer = ChannelBuffers.buffer(0);
 
-        if (response.hasBody()) {
-            buffer = ChannelBuffers.copiedBuffer(response.getBody().asReadOnlyByteBuffer());
+        if (readResponse.hasBody()) {
+            buffer = ChannelBuffers.copiedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
         }
 
         if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
index 56ba581..7aeadfc 100644
--- a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
@@ -55,6 +55,8 @@ enum OperationType {
     // Not supported yet.
     RANGE_READ_ENTRY = 3;
     RANGE_ADD_ENTRY = 4;
+
+    AUTH = 5;
 }
 
 /**
@@ -71,6 +73,7 @@ message Request {
     // Requests
     optional ReadRequest readRequest = 100;
     optional AddRequest addRequest = 101;
+    optional AuthMessage authRequest = 102;
 }
 
 message ReadRequest {
@@ -105,7 +108,7 @@ message Response {
     // Response
     optional ReadResponse readResponse = 100;
     optional AddResponse addResponse = 101;
-
+    optional AuthMessage authResponse = 102;
 }
 
 message ReadResponse {
@@ -120,3 +123,12 @@ message AddResponse {
     required int64 ledgerId = 2;
     required int64 entryId = 3;
 }
+
+/**
+ * Extendible message which auth mechanisms
+ * can use to carry their payload.
+ */
+message AuthMessage {
+    required string authPluginName = 1;
+    extensions 1000 to max;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
new file mode 100644
index 0000000..a57bfe9
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
@@ -0,0 +1,654 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.auth;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.proto.TestDataFormats;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import com.google.protobuf.ExtensionRegistry;
+
+public class TestAuth extends BookKeeperClusterTestCase {
+    static final Logger LOG = LoggerFactory.getLogger(TestAuth.class);
+    public static final String TEST_AUTH_PROVIDER_PLUGIN_NAME = "TestAuthProviderPlugin";
+    private static final byte[] PASSWD = "testPasswd".getBytes();
+    private static final byte[] ENTRY = "TestEntry".getBytes();
+
+    public TestAuth() {
+        super(0); // start them later when auth providers are configured
+    }
+
+    // the ledger id is reported through ledgerWritten because the method may throw before returning
+    private void connectAndWriteToBookie(ClientConfiguration conf, AtomicLong ledgerWritten)
+            throws Exception {
+        LOG.info("Connecting to bookie");
+        BookKeeper bkc = new BookKeeper(conf, zkc);
+        LedgerHandle l = bkc.createLedger(1, 1, DigestType.CRC32,
+                PASSWD);
+        ledgerWritten.set(l.getId());
+        l.addEntry(ENTRY);
+        l.close();
+        bkc.close();
+    }
+
+    /**
+     * Count the entries in the ledger. The bookies are reconfigured and
+     * restarted first, so that the ledger can be opened and read.
+     */
+    private int entryCount(long ledgerId, ServerConfiguration bookieConf,
+                           ClientConfiguration clientConf) throws Exception {
+        LOG.info("Counting entries in {}", ledgerId);
+        for (ServerConfiguration conf : bsConfs) {
+            conf.setBookieAuthProviderFactoryClass(
+                    AlwaysSucceedBookieAuthProviderFactory.class.getName());
+        }
+        clientConf.setClientAuthProviderFactoryClass(
+                SendUntilCompleteClientAuthProviderFactory.class.getName());
+        
+        restartBookies();
+
+        BookKeeper bkc = new BookKeeper(clientConf, zkc);
+        LedgerHandle lh = bkc.openLedger(ledgerId, DigestType.CRC32,
+                                         PASSWD);
+        if (lh.getLastAddConfirmed() < 0) {
+            return 0;
+        }
+        Enumeration<LedgerEntry> e = lh.readEntries(0, lh.getLastAddConfirmed());
+        int count = 0;
+        while (e.hasMoreElements()) {
+            count++;
+            assertTrue("Should match what we wrote",
+                       Arrays.equals(e.nextElement().getEntry(), ENTRY));
+        }
+        return count;
+    }
+
+    /**
+     * Test that a connection will authorize with a single message
+     * to the server and a single response.
+     */
+    @Test(timeout=30000)
+    public void testSingleMessageAuth() throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(
+                AlwaysSucceedBookieAuthProviderFactory.class.getName());
+        
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(
+                SendUntilCompleteClientAuthProviderFactory.class.getName());
+        
+        startAndStoreBookie(bookieConf);
+
+        AtomicLong ledgerId = new AtomicLong(-1);
+        connectAndWriteToBookie(clientConf, ledgerId); // should succeed
+
+        assertFalse(ledgerId.get() == -1);
+        assertEquals("Should have entry", 1, entryCount(ledgerId.get(), bookieConf, clientConf));
+    }
+    
+    /**
+     * Test that when the bookie provider sends a failure message
+     * the client will not be able to write
+     */
+    @Test(timeout=30000)
+    public void testSingleMessageAuthFailure() throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(
+                AlwaysFailBookieAuthProviderFactory.class.getName());
+        
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(
+                SendUntilCompleteClientAuthProviderFactory.class.getName());
+        
+        startAndStoreBookie(bookieConf);
+
+        AtomicLong ledgerId = new AtomicLong(-1);
+        try {
+            connectAndWriteToBookie(clientConf, ledgerId); // should fail
+            fail("Shouldn't get this far");
+        } catch (BKException.BKUnauthorizedAccessException bke) {
+            // client shouldn't be able to find enough bookies to
+            // write
+        }
+        assertFalse(ledgerId.get() == -1);
+        assertEquals("Shouldn't have entry", 0, entryCount(ledgerId.get(), bookieConf, clientConf));
+    }
+
+    /**
+     * Test that authentication works when the providers
+     * exchange multiple messages
+     */
+    @Test(timeout=30000)
+    public void testMultiMessageAuth() throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(
+                SucceedAfter3BookieAuthProviderFactory.class.getName());
+        
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(
+                SendUntilCompleteClientAuthProviderFactory.class.getName());
+        
+        AtomicLong ledgerId = new AtomicLong(-1);
+        startAndStoreBookie(bookieConf);
+        connectAndWriteToBookie(clientConf, ledgerId); // should succeed
+
+        assertFalse(ledgerId.get() == -1);
+        assertEquals("Should have entry", 1, entryCount(ledgerId.get(), bookieConf, clientConf));
+    }
+    
+    /**
+     * Test that when the bookie provider sends a failure message
+     * the client will not be able to write
+     */
+    @Test(timeout=30000)
+    public void testMultiMessageAuthFailure() throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(
+                FailAfter3BookieAuthProviderFactory.class.getName());
+        
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(
+                SendUntilCompleteClientAuthProviderFactory.class.getName());
+        
+        startAndStoreBookie(bookieConf);
+
+        AtomicLong ledgerId = new AtomicLong(-1);
+        try {
+            connectAndWriteToBookie(clientConf, ledgerId); // should fail
+            fail("Shouldn't get this far");
+        } catch (BKException.BKUnauthorizedAccessException bke) {
+            // bookie should have sent a negative response before
+            // breaking the connection
+        }
+        assertFalse(ledgerId.get() == -1);
+        assertEquals("Shouldn't have entry", 0, entryCount(ledgerId.get(), bookieConf, clientConf));
+    }
+
+    /**
+     * Test that when the bookie and the client have a different
+     * plugin configured, no messages will get through.
+     */
+    @Test(timeout=30000)
+    public void testDifferentPluginFailure() throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(
+                DifferentPluginBookieAuthProviderFactory.class.getName());
+        
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(
+                SendUntilCompleteClientAuthProviderFactory.class.getName());
+        
+        startAndStoreBookie(bookieConf);
+        AtomicLong ledgerId = new AtomicLong(-1);
+        try {
+            connectAndWriteToBookie(clientConf, ledgerId); // should fail
+            fail("Shouldn't get this far");
+        } catch (BKException.BKUnauthorizedAccessException bke) {
+            // bookie should have sent a negative response before
+            // breaking the connection
+        }
+        assertFalse(ledgerId.get() == -1);
+        assertEquals("Shouldn't have entry", 0, entryCount(ledgerId.get(), bookieConf, clientConf));
+    }
+
+    /**
+     * Test that when the plugin class does exist, but
+     * doesn't implement the interface, we fail predictably
+     */
+    @Test(timeout=30000)
+    public void testExistantButNotValidPlugin() throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(
+                "java.lang.String");
+
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(
+                "java.lang.String");
+        try {
+            startAndStoreBookie(bookieConf);
+            fail("Shouldn't get this far");
+        } catch (RuntimeException e) {
+            // received correct exception
+            assertTrue("Wrong exception thrown",
+                    e.getMessage().contains("not "
+                            + BookieAuthProvider.Factory.class.getName()));
+        }
+
+        try {
+            BookKeeper bkc = new BookKeeper(clientConf, zkc);
+            fail("Shouldn't get this far");
+        } catch (RuntimeException e) {
+            // received correct exception
+            assertTrue("Wrong exception thrown",
+                    e.getMessage().contains("not "
+                            + ClientAuthProvider.Factory.class.getName()));
+        }
+    }
+
+    /**
+     * Test that when the configured plugin class does not exist,
+     * the bookie fails to start and the client cannot be constructed;
+     * both failures must carry a ClassNotFoundException as their cause.
+     */
+    @Test(timeout=30000)
+    public void testNonExistantPlugin() throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(
+                "NonExistantClassNameForTestingAuthPlugins");
+        
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(
+                "NonExistantClassNameForTestingAuthPlugins");
+        try {
+            startAndStoreBookie(bookieConf);
+            fail("Shouldn't get this far");
+        } catch (RuntimeException e) {
+            // assertEquals takes (message, expected, actual)
+            assertEquals("Wrong exception thrown",
+                    ClassNotFoundException.class, e.getCause().getClass());
+        }
+
+        try {
+            BookKeeper bkc = new BookKeeper(clientConf, zkc);
+            fail("Shouldn't get this far");
+        } catch (RuntimeException e) {
+            // assertEquals takes (message, expected, actual)
+            assertEquals("Wrong exception thrown",
+                    ClassNotFoundException.class, e.getCause().getClass());
+        }
+    }
+
+    /**
+     * Test that when the auth plugin on the bookie throws mid-handshake, the
+     * client does not hang, but it cannot write any entry either.
+     */
+    @Test(timeout=30000)
+    public void testCrashDuringAuth() throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(
+                CrashAfter3BookieAuthProviderFactory.class.getName());
+        
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(
+                SendUntilCompleteClientAuthProviderFactory.class.getName());
+
+        startAndStoreBookie(bookieConf);
+
+        AtomicLong ledgerId = new AtomicLong(-1);
+        try {
+            connectAndWriteToBookie(clientConf, ledgerId);
+            fail("Shouldn't get this far");
+        } catch (BKException.BKNotEnoughBookiesException bke) {
+            // bookie won't respond, request will time out, and then
+            // we won't be able to find a replacement
+        }
+        assertFalse(ledgerId.get() == -1); // ledgerId must no longer hold its initial -1
+        assertEquals("Shouldn't have entry", 0, entryCount(ledgerId.get(), bookieConf, clientConf));
+    }
+
+    /**
+     * Test that when a bookie simply stops replying during auth, the client
+     * does not hang, but it cannot write any entry either.
+     */
+    @Test(timeout=30000)
+    public void testCrashType2DuringAuth() throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(
+                CrashType2After3BookieAuthProviderFactory.class.getName());
+        
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(
+                SendUntilCompleteClientAuthProviderFactory.class.getName());
+        crashType2bookieInstance = startAndStoreBookie(bookieConf); // the provider suspends this instance
+
+        AtomicLong ledgerId = new AtomicLong(-1);
+        try {
+            connectAndWriteToBookie(clientConf, ledgerId);
+            fail("Shouldn't get this far");
+        } catch (BKException.BKNotEnoughBookiesException bke) {
+            // bookie won't respond, request will time out, and then
+            // we won't be able to find a replacement
+        }
+        assertFalse(ledgerId.get() == -1); // ledgerId must no longer hold its initial -1
+        assertEquals("Shouldn't have entry", 0, entryCount(ledgerId.get(), bookieConf, clientConf));
+    }
+    /** Starts a bookie with {@code conf} and records both in {@code bs} and {@code bsConfs}; nothing is recorded if the start throws. */
+    BookieServer startAndStoreBookie(ServerConfiguration conf) throws Exception {
+        BookieServer s = startBookie(conf); // start first: a failed start must not leave conf in bsConfs without a server in bs
+        bsConfs.add(conf);
+        bs.add(s);
+        return s;
+    }
+    /** Bookie-side test factory: every auth message is answered with SUCCESS_RESPONSE and completeCb is given OK. */
+    public static class AlwaysSucceedBookieAuthProviderFactory
+        implements BookieAuthProvider.Factory {
+        @Override
+        public String getPluginName() {
+            return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+        }
+
+        @Override
+        public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+            TestDataFormats.registerAllExtensions(registry);
+        }
+
+        @Override
+        public BookieAuthProvider newProvider(InetSocketAddress addr,
+                                              final GenericCallback<Void> completeCb) {
+            return new BookieAuthProvider() {
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+                    // Reply to the client first, then report the handshake as complete.
+                    AuthMessage.Builder builder
+                        = AuthMessage.newBuilder()
+                        .setAuthPluginName(getPluginName());
+                    builder.setExtension(TestDataFormats.messageType, 
+                            TestDataFormats.AuthMessageType.SUCCESS_RESPONSE);
+
+                    cb.operationComplete(BKException.Code.OK, builder.build());
+                    completeCb.operationComplete(BKException.Code.OK, null);
+                }
+            };
+        }
+    }
+    /** Bookie-side test factory: every auth message is answered with FAILURE_RESPONSE and completeCb is given UnauthorizedAccessException. */
+    public static class AlwaysFailBookieAuthProviderFactory
+        implements BookieAuthProvider.Factory {
+        @Override
+        public String getPluginName() {
+            return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+        }
+
+        @Override
+        public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+            TestDataFormats.registerAllExtensions(registry);
+        }
+
+        @Override
+        public BookieAuthProvider newProvider(InetSocketAddress addr,
+                                              final GenericCallback<Void> completeCb) {
+            return new BookieAuthProvider() {
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+                    AuthMessage.Builder builder
+                        = AuthMessage.newBuilder()
+                        .setAuthPluginName(getPluginName());
+                    builder.setExtension(TestDataFormats.messageType, 
+                            TestDataFormats.AuthMessageType.FAILURE_RESPONSE);
+
+                    cb.operationComplete(BKException.Code.OK, builder.build()); // the failure reply itself is delivered with OK
+                    completeCb.operationComplete(
+                            BKException.Code.UnauthorizedAccessException, null);
+                }
+            };
+        }
+    }
+    /** Client-side test factory: keeps sending a PAYLOAD_MESSAGE until the bookie answers SUCCESS_RESPONSE or FAILURE_RESPONSE. */
+    private static class SendUntilCompleteClientAuthProviderFactory
+        implements ClientAuthProvider.Factory {
+        
+        @Override
+        public String getPluginName() {
+            return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+        }
+
+        @Override
+        public void init(ClientConfiguration conf, ExtensionRegistry registry) {
+            TestDataFormats.registerAllExtensions(registry);
+        }
+
+        @Override
+        public ClientAuthProvider newProvider(InetSocketAddress addr,
+                final GenericCallback<Void> completeCb) {
+            AuthMessage.Builder builder
+                = AuthMessage.newBuilder()
+                .setAuthPluginName(getPluginName());
+            builder.setExtension(TestDataFormats.messageType, 
+                                 TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE);
+            final AuthMessage message = builder.build();
+            // The same pre-built payload is sent by init() and re-sent by process().
+            return new ClientAuthProvider() {
+                public void init(GenericCallback<AuthMessage> cb) {
+                    cb.operationComplete(BKException.Code.OK, message);
+                }
+
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+                    if (m.hasExtension(TestDataFormats.messageType)) {
+                        TestDataFormats.AuthMessageType type
+                            = m.getExtension(TestDataFormats.messageType);
+                        if (type == TestDataFormats.AuthMessageType.SUCCESS_RESPONSE) {
+                            completeCb.operationComplete(BKException.Code.OK, null);
+                        } else if (type == TestDataFormats.AuthMessageType.FAILURE_RESPONSE) {
+                            completeCb.operationComplete(BKException.Code.UnauthorizedAccessException, null);
+                        } else {
+                            cb.operationComplete(BKException.Code.OK, message); // any other type: send the payload again
+                        }
+                    } else {
+                        completeCb.operationComplete(BKException.Code.UnauthorizedAccessException, null); // reply lacks the messageType extension
+                    }
+                }
+            };
+        }
+    }
+    /** Bookie-side test factory: answers PAYLOAD_MESSAGE, except that the third message it sees gets SUCCESS_RESPONSE and completes auth with OK. */
+    public static class SucceedAfter3BookieAuthProviderFactory
+        implements BookieAuthProvider.Factory {
+        AtomicInteger numMessages = new AtomicInteger(0); // counts process() calls across all providers of this factory
+
+        @Override
+        public String getPluginName() {
+            return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+        }
+
+        @Override
+        public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+            TestDataFormats.registerAllExtensions(registry);
+        }
+
+        @Override
+        public BookieAuthProvider newProvider(InetSocketAddress addr,
+                                              final GenericCallback<Void> completeCb) {
+            return new BookieAuthProvider() {
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+                    AuthMessage.Builder builder
+                        = AuthMessage.newBuilder()
+                        .setAuthPluginName(getPluginName());
+                    if (numMessages.incrementAndGet() == 3) {
+                        builder.setExtension(TestDataFormats.messageType, 
+                                TestDataFormats.AuthMessageType.SUCCESS_RESPONSE);
+
+                        cb.operationComplete(BKException.Code.OK, builder.build());
+                        completeCb.operationComplete(BKException.Code.OK, null);
+                    } else {
+                        builder.setExtension(TestDataFormats.messageType, 
+                                TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE);
+
+                        cb.operationComplete(BKException.Code.OK, builder.build()); // handshake continues; completeCb is not called
+                    }
+                }
+            };
+        }
+    }
+    /** Bookie-side test factory: answers PAYLOAD_MESSAGE, except that the third message it sees gets FAILURE_RESPONSE and completes auth with UnauthorizedAccessException. */
+    public static class FailAfter3BookieAuthProviderFactory
+        implements BookieAuthProvider.Factory {
+        AtomicInteger numMessages = new AtomicInteger(0); // counts process() calls across all providers of this factory
+
+        @Override
+        public String getPluginName() {
+            return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+        }
+
+        @Override
+        public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+            TestDataFormats.registerAllExtensions(registry);
+        }
+
+        @Override
+        public BookieAuthProvider newProvider(InetSocketAddress addr,
+                                              final GenericCallback<Void> completeCb) {
+            return new BookieAuthProvider() {
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+                    AuthMessage.Builder builder
+                        = AuthMessage.newBuilder()
+                        .setAuthPluginName(getPluginName());
+                    if (numMessages.incrementAndGet() == 3) {
+                        builder.setExtension(TestDataFormats.messageType, 
+                                TestDataFormats.AuthMessageType.FAILURE_RESPONSE);
+
+                        cb.operationComplete(BKException.Code.OK, builder.build());
+                        completeCb.operationComplete(BKException.Code.UnauthorizedAccessException,
+                                                     null);
+                    } else {
+                        builder.setExtension(TestDataFormats.messageType, 
+                                TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE);
+
+                        cb.operationComplete(BKException.Code.OK, builder.build()); // handshake continues; completeCb is not called
+                    }
+                }
+            };
+        }
+    }
+    /** Bookie-side test factory: answers PAYLOAD_MESSAGE, except that the third message it sees makes process() throw a RuntimeException. */
+    public static class CrashAfter3BookieAuthProviderFactory
+        implements BookieAuthProvider.Factory {
+        AtomicInteger numMessages = new AtomicInteger(0); // counts process() calls across all providers of this factory
+
+        @Override
+        public String getPluginName() {
+            return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+        }
+
+        @Override
+        public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+            TestDataFormats.registerAllExtensions(registry);
+        }
+
+        @Override
+        public BookieAuthProvider newProvider(InetSocketAddress addr,
+                                              final GenericCallback<Void> completeCb) {
+            return new BookieAuthProvider() {
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+                    AuthMessage.Builder builder
+                        = AuthMessage.newBuilder()
+                        .setAuthPluginName(getPluginName());
+                    if (numMessages.incrementAndGet() == 3) {
+                        throw new RuntimeException("Do bad things to the bookie"); // neither cb nor completeCb is invoked
+                    } else {
+                        builder.setExtension(TestDataFormats.messageType, 
+                                TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE);
+
+                        cb.operationComplete(BKException.Code.OK, builder.build());
+                    }
+                }
+            };
+        }
+    }
+    // Bookie that CrashType2After3BookieAuthProviderFactory suspends; assigned by testCrashType2DuringAuth.
+    private static BookieServer crashType2bookieInstance = null;
+    public static class CrashType2After3BookieAuthProviderFactory
+        implements BookieAuthProvider.Factory {
+        AtomicInteger numMessages = new AtomicInteger(0); // counts process() calls across all providers of this factory
+
+        @Override
+        public String getPluginName() {
+            return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+        }
+
+        @Override
+        public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+            TestDataFormats.registerAllExtensions(registry);
+        }
+
+        @Override
+        public BookieAuthProvider newProvider(InetSocketAddress addr,
+                                              final GenericCallback<Void> completeCb) {
+            return new BookieAuthProvider() {
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+                    AuthMessage.Builder builder
+                        = AuthMessage.newBuilder()
+                        .setAuthPluginName(getPluginName());
+                    if (numMessages.incrementAndGet() != 3) {
+                        builder.setExtension(TestDataFormats.messageType,
+                                TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE);
+                        cb.operationComplete(BKException.Code.OK, builder.build());
+                        return;
+                    }
+                    // Third message: send no reply at all and suspend the bookie's processing.
+                    crashType2bookieInstance.suspendProcessing();
+                }
+            };
+        }
+    }
+    /** Bookie-side test factory that reports a plugin name different from TEST_AUTH_PROVIDER_PLUGIN_NAME. */
+    public static class DifferentPluginBookieAuthProviderFactory
+        implements BookieAuthProvider.Factory {
+        @Override
+        public String getPluginName() {
+            return "DifferentAuthProviderPlugin";
+        }
+
+        @Override
+        public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+            TestDataFormats.registerAllExtensions(registry);
+        }
+
+        @Override
+        public BookieAuthProvider newProvider(InetSocketAddress addr,
+                                              final GenericCallback<Void> completeCb) {
+            return new BookieAuthProvider() {
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+                    // NOTE(review): replies FAILURE_RESPONSE yet reports OK to completeCb; presumably only the name mismatch matters here - confirm.
+                    AuthMessage.Builder builder
+                        = AuthMessage.newBuilder()
+                        .setAuthPluginName(getPluginName());
+                    builder.setExtension(TestDataFormats.messageType, 
+                            TestDataFormats.AuthMessageType.FAILURE_RESPONSE);
+
+                    cb.operationComplete(BKException.Code.OK, builder.build());
+                    completeCb.operationComplete(BKException.Code.OK, null);
+                }
+            };
+        }
+    }
+
+}


[3/3] bookkeeper git commit: BOOKKEEPER-901: Authentication framework

Posted by si...@apache.org.
BOOKKEEPER-901: Authentication framework

Author: Ivan Kelly <iv...@yahoo-inc.com>

Reviewers: Sijie Guo<si...@apache.org>

Closes #23 from merlimat/authentication-framework and squashes the following commits:

aa01548 [Ivan Kelly] BOOKKEEPER-901: Add an authentication framework
f930fbd [Ivan Kelly] BOOKKEEPER-794 BookkeeperProtocol.Response.status is completely ignored


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/b1c12c0f
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/b1c12c0f
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/b1c12c0f

Branch: refs/heads/master
Commit: b1c12c0f41b7c27b2452fef311f12077d771f431
Parents: e32c388
Author: Ivan Kelly <iv...@yahoo-inc.com>
Authored: Mon Apr 4 23:45:51 2016 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Mon Apr 4 23:45:51 2016 -0700

----------------------------------------------------------------------
 bookkeeper-server/pom.xml                       |   9 +
 .../auth/AuthProviderFactoryFactory.java        | 111 +++
 .../bookkeeper/auth/BookieAuthProvider.java     |  83 ++
 .../bookkeeper/auth/ClientAuthProvider.java     |  89 +++
 .../apache/bookkeeper/client/PendingAddOp.java  |   4 +-
 .../bookkeeper/conf/ClientConfiguration.java    |  53 +-
 .../bookkeeper/conf/ServerConfiguration.java    |  24 +
 .../apache/bookkeeper/proto/AuthHandler.java    | 356 +++++++++
 .../apache/bookkeeper/proto/BookieClient.java   |  32 +-
 .../bookkeeper/proto/BookieNettyServer.java     |  21 +-
 .../bookkeeper/proto/BookieProtoEncoding.java   | 125 ++-
 .../apache/bookkeeper/proto/BookieProtocol.java |  36 +
 .../proto/BookieRequestProcessor.java           |   1 +
 .../bookkeeper/proto/BookkeeperProtocol.java    | 787 ++++++++++++++++++-
 .../proto/PerChannelBookieClient.java           |  70 +-
 .../src/main/proto/BookkeeperProtocol.proto     |  14 +-
 .../org/apache/bookkeeper/auth/TestAuth.java    | 654 +++++++++++++++
 .../proto/TestBackwardCompatCMS42.java          | 239 ++++++
 .../bookkeeper/proto/TestDataFormats.java       | 126 +++
 .../proto/TestPerChannelBookieClient.java       |  25 +-
 .../test/BookKeeperClusterTestCase.java         |  10 +
 .../src/test/proto/TestDataFormats.proto        |  34 +
 22 files changed, 2803 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/pom.xml
----------------------------------------------------------------------
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index a1a74e0..eb67fd8 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -262,6 +262,7 @@
             <!-- exclude generated file //-->
             <exclude>**/DataFormats.java</exclude>
             <exclude>**/BookkeeperProtocol.java</exclude>
+            <exclude>**/TestDataFormats.java</exclude>
           </excludes>
         </configuration>
       </plugin>
@@ -324,6 +325,14 @@
                       <arg value="--java_out=src/main/java" />
                       <arg value="src/main/proto/DataFormats.proto" />
                     </exec>
+                    <exec executable="protoc" failonerror="true">
+                      <arg value="--java_out=src/main/java" />
+                      <arg value="src/main/proto/BookkeeperProtocol.proto" />
+                    </exec>
+                    <exec executable="protoc" failonerror="true">
+                      <arg value="--java_out=src/test/java" />
+                      <arg value="src/test/proto/TestDataFormats.proto" />
+                    </exec>
                   </target>
                 </configuration>
                 <goals>

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java
new file mode 100644
index 0000000..d05c475
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java
@@ -0,0 +1,111 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.auth;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ExtensionRegistry;
+
+
+/** Creates the bookie-side and client-side auth provider factories named in the configuration. */
+public class AuthProviderFactoryFactory {
+    static final Logger LOG = LoggerFactory.getLogger(AuthProviderFactoryFactory.class);
+    /** Returns the factory named by conf.getBookieAuthProviderFactoryClass(), initialised with conf and registry, or a no-op factory if no name is set. */
+    public static BookieAuthProvider.Factory newBookieAuthProviderFactory(ServerConfiguration conf,
+                                                                          ExtensionRegistry registry) throws IOException {
+        String factoryClassName = conf.getBookieAuthProviderFactoryClass();
+
+        if (factoryClassName == null || factoryClassName.length() == 0) {
+            return new NullBookieAuthProviderFactory(); // no plugin configured
+        }
+
+        BookieAuthProvider.Factory factory = ReflectionUtils.newInstance(factoryClassName,
+                                                                         BookieAuthProvider.Factory.class);
+        factory.init(conf, registry);
+        return factory;
+    }
+    /** Returns the factory named by conf.getClientAuthProviderFactoryClass(), initialised with conf and registry, or a no-op factory if no name is set. */
+    public static ClientAuthProvider.Factory newClientAuthProviderFactory(ClientConfiguration conf,
+                                                                          ExtensionRegistry registry) throws IOException {
+        String factoryClassName = conf.getClientAuthProviderFactoryClass();
+
+        if (factoryClassName == null || factoryClassName.length() == 0) {
+            return new NullClientAuthProviderFactory(); // no plugin configured
+        }
+
+        ClientAuthProvider.Factory factory = ReflectionUtils.newInstance(factoryClassName,
+                                                                         ClientAuthProvider.Factory.class);
+        factory.init(conf, registry);
+        return factory;
+    }
+    // Plugin name reported by both no-op factories below.
+    private final static String nullPluginName = "NULLPlugin";
+    /** No-op bookie factory: newProvider reports OK to completeCb at once and returns a provider that ignores every message. */
+    private static class NullBookieAuthProviderFactory implements BookieAuthProvider.Factory {
+        @Override
+        public String getPluginName() {
+            return nullPluginName;
+        }
+
+        @Override
+        public void init(ServerConfiguration conf, ExtensionRegistry registry) {}
+
+        @Override
+        public BookieAuthProvider newProvider(InetSocketAddress addr,
+                                              GenericCallback<Void> completeCb) {
+            completeCb.operationComplete(BKException.Code.OK, null);
+            return new BookieAuthProvider() {
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {}
+            };
+        }
+    }
+    /** No-op client factory: newProvider reports OK to completeCb at once and returns a provider that sends and processes nothing. */
+    private static class NullClientAuthProviderFactory implements ClientAuthProvider.Factory {
+        @Override
+        public String getPluginName() {
+            return nullPluginName;
+        }
+
+        @Override
+        public void init(ClientConfiguration conf, ExtensionRegistry registry) {}
+
+        @Override
+        public ClientAuthProvider newProvider(InetSocketAddress addr,
+                                              GenericCallback<Void> completeCb) {
+            completeCb.operationComplete(BKException.Code.OK, null);
+            return new ClientAuthProvider() {
+                public void init(GenericCallback<AuthMessage> cb) {}
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {}
+            };
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java
new file mode 100644
index 0000000..4fb7d07
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.auth;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+
+import com.google.protobuf.ExtensionRegistry;
+
+/**
+ * Bookie authentication provider interface.
+ * This must be implemented by any party wishing to implement
+ * an authentication mechanism for bookkeeper connections.
+ */
+public interface BookieAuthProvider {
+    interface Factory {
+        /**
+         * Initialize the factory with the server configuration
+         * and protobuf message registry. Implementors must
+         * add any extension messages which contain the auth
+         * payload, so that the server can decode auth messages
+         * it receives from the client.
+         */
+        void init(ServerConfiguration conf,
+                  ExtensionRegistry registry) throws IOException;
+
+        /**
+         * Create a new instance of a bookie auth provider.
+         * Each connection should get its own instance, as they
+         * can hold connection specific state.
+         * The completeCb is used to notify the server that
+         * the authentication handshake is complete.
+         * CompleteCb should be called only once.
+         * If the authentication was successful, BKException.Code.OK
+         * should be passed as the return code. Otherwise, another
+         * error code should be passed.
+         * If authentication fails, the server will close the
+         * connection.
+         * @param addr the address of the client being authenticated
+         * @param completeCb callback to be notified when authentication
+         *                   is complete.
+         */
+        BookieAuthProvider newProvider(InetSocketAddress addr,
+                                       GenericCallback<Void> completeCb);
+
+        /**
+         * Get Auth provider plugin name.
+         * Used as a sanity check to ensure that the bookie and the client
+         * are using the same auth provider.
+         */
+        String getPluginName();
+    }
+
+    /**
+     * Process a request from the client. cb will receive the next
+     * message to be sent to the client. If there are no more messages
+     * to send to the client, cb should not be called, and completeCb
+     * must be called instead.
+     */
+    void process(AuthMessage m, GenericCallback<AuthMessage> cb);
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java
new file mode 100644
index 0000000..fba2264
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java
@@ -0,0 +1,89 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.auth;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+
+import com.google.protobuf.ExtensionRegistry;
+
+/**
+ * Client authentication provider interface.
+ * This must be implemented by any party wishing to implement
+ * an authentication mechanism for bookkeeper connections.
+ */
+public interface ClientAuthProvider {
+    interface Factory {
+        /**
+         * Initialize the factory with the client configuration
+         * and protobuf message registry. Implementors must
+         * add any extension messages which contain the auth
+         * payload, so that the client can decode auth messages
+         * it receives from the server.
+         */
+        void init(ClientConfiguration conf,
+                  ExtensionRegistry registry) throws IOException;
+
+        /**
+         * Create a new instance of a client auth provider.
+         * Each connection should get its own instance, as they
+         * can hold connection specific state.
+         * The completeCb is used to notify the client that
+         * the authentication handshake is complete.
+         * CompleteCb should be called only once.
+         * If the authentication was successful, BKException.Code.OK
+         * should be passed as the return code. Otherwise, another
+         * error code should be passed.
+         * @param addr the address of the socket being authenticated
+         * @param completeCb callback to be notified when authentication
+         *                   is complete.
+         */
+        ClientAuthProvider newProvider(InetSocketAddress addr,
+                                       GenericCallback<Void> completeCb);
+
+        /**
+         * Get Auth provider plugin name.
+         * Used as a sanity check to ensure that the bookie and the client
+         * are using the same auth provider.
+         */
+        String getPluginName();
+    }
+
+    /**
+     * Initiate the authentication. cb will receive the initial
+     * authentication message which should be sent to the server.
+     * cb need not be called if authentication is not required. In
+     * this case, completeCb should be called.
+     */
+    void init(GenericCallback<AuthMessage> cb);
+
+    /**
+     * Process a response from the server. cb will receive the next
+     * message to be sent to the server. If there are no more messages
+     * to send to the server, cb should not be called, and completeCb
+     * must be called instead.
+     */
+    void process(AuthMessage m, GenericCallback<AuthMessage> cb);
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index bc487f6..1946069 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -208,8 +208,8 @@ class PendingAddOp implements WriteCallback, TimerTask {
             lh.handleUnrecoverableErrorDuringAdd(rc);
             return;
         default:
-            LOG.warn("Write did not succeed: L{} E{} on {}",
-                     new Object[] { ledgerId, entryId, addr });
+            LOG.warn("Write did not succeed: L{} E{} on {}, rc = {}",
+                     new Object[] { ledgerId, entryId, addr, rc });
             lh.handleBookieFailure(addr, bookieIndex);
             return;
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index d0750d3..b8d738b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -81,6 +81,9 @@ public class ClientConfiguration extends AbstractConfiguration {
     protected final static String ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats";
     protected final static String TASK_EXECUTION_WARN_TIME_MICROS = "taskExecutionWarnTimeMicros";
 
+    // Client auth provider factory class name
+    protected final static String CLIENT_AUTH_PROVIDER_FACTORY_CLASS = "clientAuthProviderFactoryClass";
+
     /**
      * Construct a default client-side configuration
      */
@@ -700,7 +703,7 @@ public class ClientConfiguration extends AbstractConfiguration {
 
     /**
      * Check if bookie health check is enabled.
-     * 
+     *
      * @return
      */
     public boolean isBookieHealthCheckEnabled() {
@@ -709,15 +712,15 @@ public class ClientConfiguration extends AbstractConfiguration {
 
     /**
      * Enables the bookie health check.
-     * 
+     *
      * <p>
      * If the number of read/write errors for a bookie exceeds {@link #getBookieErrorThresholdPerInterval()} per
      * interval, that bookie is quarantined for {@link #getBookieQuarantineTimeSeconds()} seconds. During this
      * quarantined period, the client will try not to use this bookie when creating new ensembles.
      * </p>
-     * 
+     *
      * By default, the bookie health check is <b>disabled</b>.
-     * 
+     *
      * @return client configuration
      */
     public ClientConfiguration enableBookieHealthCheck() {
@@ -727,7 +730,7 @@ public class ClientConfiguration extends AbstractConfiguration {
 
     /**
      * Get the bookie health check interval in seconds.
-     * 
+     *
      * @return
      */
     public int getBookieHealthCheckIntervalSeconds() {
@@ -736,11 +739,11 @@ public class ClientConfiguration extends AbstractConfiguration {
 
     /**
      * Set the bookie health check interval. Default is 60 seconds.
-     * 
+     *
      * <p>
      * Note: Please {@link #enableBookieHealthCheck()} to use this configuration.
      * </p>
-     * 
+     *
      * @param interval
      * @param unit
      * @return client configuration
@@ -752,7 +755,7 @@ public class ClientConfiguration extends AbstractConfiguration {
 
     /**
      * Get the error threshold for a bookie to be quarantined.
-     * 
+     *
      * @return
      */
     public long getBookieErrorThresholdPerInterval() {
@@ -762,11 +765,11 @@ public class ClientConfiguration extends AbstractConfiguration {
     /**
      * Set the error threshold per interval ({@link #getBookieHealthCheckIntervalSeconds()}) for a bookie before it is
      * quarantined. Default is 100 errors per minute.
-     * 
+     *
      * <p>
      * Note: Please {@link #enableBookieHealthCheck()} to use this configuration.
      * </p>
-     * 
+     *
      * @param threshold
      * @param unit
      * @return client configuration
@@ -778,7 +781,7 @@ public class ClientConfiguration extends AbstractConfiguration {
 
     /**
      * Get the time for which a bookie will be quarantined.
-     * 
+     *
      * @return
      */
     public int getBookieQuarantineTimeSeconds() {
@@ -787,11 +790,11 @@ public class ClientConfiguration extends AbstractConfiguration {
 
     /**
      * Set the time for which a bookie will be quarantined. Default is 30 minutes.
-     * 
+     *
      * <p>
      * Note: Please {@link #enableBookieHealthCheck()} to use this configuration.
      * </p>
-     * 
+     *
      * @param quarantineTime
      * @param unit
      * @return client configuration
@@ -800,4 +803,28 @@ public class ClientConfiguration extends AbstractConfiguration {
         setProperty(BOOKIE_QUARANTINE_TIME_SECONDS, unit.toSeconds(quarantineTime));
         return this;
     }
+
+    /**
+     * Set the client authentication provider factory class name.
+     * If this is not set, no authentication will be used.
+     *
+     * @param factoryClass
+     *          the client authentication provider factory class name
+     * @return client configuration
+     */
+    public ClientConfiguration setClientAuthProviderFactoryClass(
+            String factoryClass) {
+        setProperty(CLIENT_AUTH_PROVIDER_FACTORY_CLASS, factoryClass);
+        return this;
+    }
+
+    /**
+     * Get the client authentication provider factory class name. If this returns null, no authentication will take
+     * place.
+     *
+     * @return the client authentication provider factory class name or null.
+     */
+    public String getClientAuthProviderFactoryClass() {
+        return getString(CLIENT_AUTH_PROVIDER_FACTORY_CLASS, null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 76e5037..d770650 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -122,6 +122,9 @@ public class ServerConfiguration extends AbstractConfiguration {
 
     protected final static String LEDGER_STORAGE_CLASS = "ledgerStorageClass";
 
+    // Bookie auth provider factory class name
+    protected final static String BOOKIE_AUTH_PROVIDER_FACTORY_CLASS = "bookieAuthProviderFactoryClass";
+
     /**
      * Construct a default configuration object
      */
@@ -1566,4 +1569,25 @@ public class ServerConfiguration extends AbstractConfiguration {
         }
     }
 
+    /**
+     * Set the bookie authentication provider factory class name.
+     * If this is not set, no authentication will be used.
+     *
+     * @param factoryClass
+     *          the bookie authentication provider factory class name
+     * @see #getBookieAuthProviderFactoryClass()
+     */
+    public void setBookieAuthProviderFactoryClass(String factoryClass) {
+        setProperty(BOOKIE_AUTH_PROVIDER_FACTORY_CLASS, factoryClass);
+    }
+
+    /**
+     * Get the bookie authentication provider factory class name.
+     * If this returns null, no authentication will take place.
+     *
+     * @return the bookie authentication provider factory class name or null.
+     */
+    public String getBookieAuthProviderFactoryClass() {
+        return getString(BOOKIE_AUTH_PROVIDER_FACTORY_CLASS, null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
new file mode 100644
index 0000000..522bc0b
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -0,0 +1,356 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.DefaultExceptionEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+
+import org.apache.bookkeeper.auth.BookieAuthProvider;
+import org.apache.bookkeeper.auth.ClientAuthProvider;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AuthHandler {
+    static final Logger LOG = LoggerFactory.getLogger(AuthHandler.class);
+
+    static class ServerSideHandler extends SimpleChannelHandler {
+        volatile boolean authenticated = false;
+        final BookieAuthProvider.Factory authProviderFactory;
+        BookieAuthProvider authProvider;
+
+        ServerSideHandler(BookieAuthProvider.Factory authProviderFactory) {
+            this.authProviderFactory = authProviderFactory;
+            authProvider = null;
+        }
+
+        @Override
+        public void channelOpen(ChannelHandlerContext ctx,
+                                ChannelStateEvent e) throws Exception {
+            LOG.info("Channel open {}", ctx.getChannel());
+            SocketAddress remote  = ctx.getChannel().getRemoteAddress();
+            if (remote instanceof InetSocketAddress) {
+                authProvider = authProviderFactory.newProvider((InetSocketAddress)remote,
+                        new AuthHandshakeCompleteCallback());
+            } else {
+                LOG.error("Unknown socket type {} for {}", remote.getClass(), remote);
+            }
+            super.channelOpen(ctx, e);
+        }
+
+        @Override
+        public void messageReceived(ChannelHandlerContext ctx,
+                                    MessageEvent e)
+                throws Exception {
+            if (authProvider == null) {
+                // close the channel: authProvider is only left null
+                // when the remote end is not an InetSocketAddress.
+                // Anything else is strange, and we don't want to deal
+                // with it
+                ctx.getChannel().close();
+                return;
+            }
+
+            Object event = e.getMessage();
+            if (authenticated) {
+                super.messageReceived(ctx, e);
+            } else if (event instanceof BookieProtocol.AuthRequest) { // pre-PB-client
+                BookieProtocol.AuthRequest req = (BookieProtocol.AuthRequest)event;
+                assert (req.getOpCode() == BookieProtocol.AUTH);
+                if (checkAuthPlugin(req.getAuthMessage(), ctx.getChannel())) {
+                    authProvider.process(req.getAuthMessage(),
+                                new AuthResponseCallbackLegacy(req, ctx.getChannel()));
+                } else {
+                    ctx.getChannel().close();
+                }
+            } else if (event instanceof BookieProtocol.Request) {
+                BookieProtocol.Request req = (BookieProtocol.Request)event;
+                if (req.getOpCode() == BookieProtocol.ADDENTRY) {
+                    ctx.getChannel().write(
+                            new BookieProtocol.AddResponse(
+                                    req.getProtocolVersion(), BookieProtocol.EUA,
+                                    req.getLedgerId(), req.getEntryId()));
+                } else if (req.getOpCode() == BookieProtocol.READENTRY) {
+                    ctx.getChannel().write(
+                            new BookieProtocol.ReadResponse(
+                                    req.getProtocolVersion(), BookieProtocol.EUA,
+                                    req.getLedgerId(), req.getEntryId()));
+                } else {
+                    ctx.getChannel().close();
+                }
+            } else if (event instanceof BookkeeperProtocol.Request) { // post-PB-client
+                BookkeeperProtocol.Request req = (BookkeeperProtocol.Request)event;
+                if (req.getHeader().getOperation() == BookkeeperProtocol.OperationType.AUTH
+                        && req.hasAuthRequest()
+                        && checkAuthPlugin(req.getAuthRequest(), ctx.getChannel())) {
+                    authProvider.process(req.getAuthRequest(),
+                                         new AuthResponseCallback(req, ctx.getChannel()));
+                } else {
+                    BookkeeperProtocol.Response.Builder builder
+                        = BookkeeperProtocol.Response.newBuilder()
+                        .setHeader(req.getHeader())
+                        .setStatus(BookkeeperProtocol.StatusCode.EUA);
+
+                    ctx.getChannel().write(builder.build());
+                }
+            } else {
+                // close the channel, junk coming over it
+                ctx.getChannel().close();
+            }
+        }
+
+        /** Returns true iff the peer's auth plugin name matches ours; logs any mismatch. */
+        private boolean checkAuthPlugin(AuthMessage am, final Channel src) {
+            String local = authProviderFactory.getPluginName();
+            if (am.hasAuthPluginName() && am.getAuthPluginName().equals(local)) {
+                return true;
+            }
+            LOG.error("Received message from incompatible auth plugin. Local = {},"
+                      + " Remote = {}, Channel = {}", new Object[] { local, am.getAuthPluginName(), src });
+            return false;
+        }
+
+        static class AuthResponseCallbackLegacy implements GenericCallback<AuthMessage> {
+            final BookieProtocol.AuthRequest req;
+            final Channel channel;
+
+            AuthResponseCallbackLegacy(BookieProtocol.AuthRequest req, Channel channel) {
+                this.req = req;
+                this.channel = channel;
+            }
+
+            public void operationComplete(int rc, AuthMessage newam) {
+                if (rc != BKException.Code.OK) {
+                    LOG.error("Error processing auth message, closing connection");
+                    channel.close();
+                    return;
+                }
+                channel.write(new BookieProtocol.AuthResponse(req.getProtocolVersion(),
+                                                              newam));
+            }
+        }
+
+        static class AuthResponseCallback implements GenericCallback<AuthMessage> {
+            final BookkeeperProtocol.Request req;
+            final Channel channel;
+
+            AuthResponseCallback(BookkeeperProtocol.Request req, Channel channel) {
+                this.req = req;
+                this.channel = channel;
+            }
+
+            public void operationComplete(int rc, AuthMessage newam) {
+                BookkeeperProtocol.Response.Builder builder
+                    = BookkeeperProtocol.Response.newBuilder()
+                    .setHeader(req.getHeader());
+
+                if (rc != BKException.Code.OK) {
+                    LOG.error("Error processing auth message, closing connection");
+
+                    builder.setStatus(BookkeeperProtocol.StatusCode.EUA);
+                    channel.write(builder.build());
+                    channel.close();
+                    return;
+                } else {
+                    builder.setStatus(BookkeeperProtocol.StatusCode.EOK)
+                        .setAuthResponse(newam);
+                    channel.write(builder.build());
+                }
+            }
+        }
+
+        class AuthHandshakeCompleteCallback implements GenericCallback<Void> {
+            @Override
+            public void operationComplete(int rc, Void v) {
+                if (rc == BKException.Code.OK) {
+                    authenticated = true;
+                } else {
+                    LOG.debug("Authentication failed on server side");
+                }
+            }
+        }
+    }
+
+    static class ClientSideHandler extends SimpleChannelHandler {
+        volatile boolean authenticated = false;
+        final ClientAuthProvider.Factory authProviderFactory;
+        ClientAuthProvider authProvider;
+        AtomicLong transactionIdGenerator;
+        Queue<MessageEvent> waitingForAuth = new ConcurrentLinkedQueue<MessageEvent>();
+
+        ClientSideHandler(ClientAuthProvider.Factory authProviderFactory,
+                          AtomicLong transactionIdGenerator) {
+            this.authProviderFactory = authProviderFactory;
+            this.transactionIdGenerator = transactionIdGenerator;
+            authProvider = null;
+        }
+
+        @Override
+        public void channelConnected(ChannelHandlerContext ctx,
+                                     ChannelStateEvent e)
+                throws Exception {
+            SocketAddress remote  = ctx.getChannel().getRemoteAddress();
+            if (remote instanceof InetSocketAddress) {
+                authProvider = authProviderFactory.newProvider((InetSocketAddress)remote,
+                        new AuthHandshakeCompleteCallback(ctx));
+                authProvider.init(new AuthRequestCallback(ctx));
+            } else {
+                LOG.error("Unknown socket type {} for {}", remote.getClass(), remote);
+            }
+            super.channelConnected(ctx, e);
+        }
+
+        @Override
+        public void messageReceived(ChannelHandlerContext ctx,
+                                    MessageEvent e)
+                throws Exception {
+            assert (authProvider != null);
+
+            Object event = e.getMessage();
+
+            if (authenticated) {
+                super.messageReceived(ctx, e);
+            } else if (event instanceof BookkeeperProtocol.Response) {
+                BookkeeperProtocol.Response resp = (BookkeeperProtocol.Response)event;
+                if (resp.getHeader().getOperation() == BookkeeperProtocol.OperationType.AUTH) {
+                    if (resp.getStatus() != BookkeeperProtocol.StatusCode.EOK) {
+                        authenticationError(ctx, resp.getStatus().getNumber());
+                    } else {
+                        assert (resp.hasAuthResponse());
+                        BookkeeperProtocol.AuthMessage am = resp.getAuthResponse();
+                        authProvider.process(am, new AuthRequestCallback(ctx));
+                    }
+                } else {
+                    // else just drop the message,
+                    // we're not authenticated so nothing should be coming through
+                }
+            }
+        }
+
+        @Override
+        public void writeRequested(ChannelHandlerContext ctx,
+                                   MessageEvent e)
+                throws Exception {
+            synchronized (waitingForAuth) {
+                if (authenticated) {
+                    super.writeRequested(ctx, e);
+                } else if (e.getMessage() instanceof BookkeeperProtocol.Request) {
+                    // let auth messages through, queue the rest
+                    BookkeeperProtocol.Request req = (BookkeeperProtocol.Request)e.getMessage();
+                    if (req.getHeader().getOperation()
+                            == BookkeeperProtocol.OperationType.AUTH) {
+                        super.writeRequested(ctx, e);
+                    } else {
+                        waitingForAuth.add(e);
+                    }
+                } // else just drop
+            }
+        }
+
+        long newTxnId() {
+            return transactionIdGenerator.incrementAndGet();
+        }
+
+        void authenticationError(ChannelHandlerContext ctx, int errorCode) {
+            LOG.error("Error processing auth message, erroring connection {}", errorCode);
+            ctx.sendUpstream(new DefaultExceptionEvent(ctx.getChannel(),
+                                     new AuthenticationException(
+                                             "Auth failed with error " + errorCode)));
+        }
+
+        class AuthRequestCallback implements GenericCallback<AuthMessage> {
+            Channel channel;
+            ChannelHandlerContext ctx;
+
+            AuthRequestCallback(ChannelHandlerContext ctx) {
+                this.channel = ctx.getChannel();
+                this.ctx = ctx;
+            }
+
+            public void operationComplete(int rc, AuthMessage newam) {
+                if (rc != BKException.Code.OK) {
+                    authenticationError(ctx, rc);
+                    return;
+                }
+
+                BookkeeperProtocol.BKPacketHeader header
+                    = BookkeeperProtocol.BKPacketHeader.newBuilder()
+                    .setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
+                    .setOperation(BookkeeperProtocol.OperationType.AUTH)
+                    .setTxnId(newTxnId()).build();
+                BookkeeperProtocol.Request.Builder builder
+                    = BookkeeperProtocol.Request.newBuilder()
+                    .setHeader(header)
+                    .setAuthRequest(newam);
+
+                channel.write(builder.build());
+            }
+        }
+
+        /** Completion callback for the client handshake; releases queued writes on success. */
+        class AuthHandshakeCompleteCallback implements GenericCallback<Void> {
+            ChannelHandlerContext ctx;
+            AuthHandshakeCompleteCallback(ChannelHandlerContext ctx) {
+                this.ctx = ctx;
+            }
+
+            @Override
+            public void operationComplete(int rc, Void v) {
+                if (rc == BKException.Code.OK) {
+                    synchronized (waitingForAuth) {
+                        authenticated = true;
+                        // replay, in FIFO order, the requests queued by writeRequested
+                        for (MessageEvent e = waitingForAuth.poll(); e != null; e = waitingForAuth.poll()) {
+                            ctx.sendDownstream(e);
+                        }
+                    }
+                } else {
+                    authenticationError(ctx, rc);
+                    LOG.debug("Authentication failed on client side");
+                }
+            }
+        }
+    }
+
+    static class AuthenticationException extends IOException {
+        AuthenticationException(String reason) {
+            super(reason);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 8a79547..d0052d6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -28,8 +28,11 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
+import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -52,6 +55,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ExtensionRegistry;
 
 /**
  * Implements the client-side part of the BookKeeper protocol.
@@ -60,11 +64,18 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 public class BookieClient implements PerChannelBookieClientFactory {
     static final Logger LOG = LoggerFactory.getLogger(BookieClient.class);
 
-    final OrderedSafeExecutor executor;
-    final ClientSocketChannelFactory channelFactory;
+    // This is global state that should be shared across all BookieClients
+    AtomicLong totalBytesOutstanding = new AtomicLong();
+
+    OrderedSafeExecutor executor;
+    ClientSocketChannelFactory channelFactory;
     final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool> channels =
             new ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool>();
     final HashedWheelTimer requestTimer;
+
+    final private ClientAuthProvider.Factory authProviderFactory;
+    final private ExtensionRegistry registry;
+
     private final ClientConfiguration conf;
     private volatile boolean closed;
     private final ReentrantReadWriteLock closeLock;
@@ -73,17 +84,22 @@ public class BookieClient implements PerChannelBookieClientFactory {
 
     private final long bookieErrorThresholdPerInterval;
 
-    public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor) {
+    public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory,
+            OrderedSafeExecutor executor) throws IOException {
         this(conf, channelFactory, executor, NullStatsLogger.INSTANCE);
     }
 
-    public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor,
-                        StatsLogger statsLogger) {
+    public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory,
+                        OrderedSafeExecutor executor, StatsLogger statsLogger) throws IOException {
         this.conf = conf;
         this.channelFactory = channelFactory;
         this.executor = executor;
         this.closed = false;
         this.closeLock = new ReentrantReadWriteLock();
+
+        this.registry = ExtensionRegistry.newInstance();
+        this.authProviderFactory = AuthProviderFactoryFactory.newClientAuthProviderFactory(conf, registry);
+
         this.statsLogger = statsLogger;
         this.numConnectionsPerBookie = conf.getNumChannelsPerBookie();
         this.requestTimer = new HashedWheelTimer(
@@ -120,8 +136,8 @@ public class BookieClient implements PerChannelBookieClientFactory {
 
     @Override
     public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool) {
-        return new PerChannelBookieClient(conf, executor, channelFactory, address,
-                                          requestTimer, statsLogger, pcbcPool);
+        return new PerChannelBookieClient(conf, executor, channelFactory, address, requestTimer, statsLogger,
+                authProviderFactory, registry, pcbcPool);
     }
 
     private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr, Object key) {
@@ -133,7 +149,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
                     return null;
                 }
                 PerChannelBookieClientPool newClientPool =
-                        new DefaultPerChannelBookieClientPool(this, addr, numConnectionsPerBookie);
+                    new DefaultPerChannelBookieClientPool(this, addr, numConnectionsPerBookie);
                 PerChannelBookieClientPool oldClientPool = channels.putIfAbsent(addr, newClientPool);
                 if (null == oldClientPool) {
                     clientPool = newClientPool;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index b623998..bb1b207 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -28,6 +28,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.auth.BookieAuthProvider;
+import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.processor.RequestProcessor;
 import org.apache.zookeeper.KeeperException;
 import org.jboss.netty.bootstrap.ServerBootstrap;
@@ -48,6 +50,7 @@ import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.protobuf.ExtensionRegistry;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -66,11 +69,21 @@ class BookieNettyServer {
     Object suspensionLock = new Object();
     boolean suspended = false;
 
+    final BookieAuthProvider.Factory authProviderFactory;
+    final BookieProtoEncoding.ResponseEncoder responseEncoder;
+    final BookieProtoEncoding.RequestDecoder requestDecoder;
+
     BookieNettyServer(ServerConfiguration conf, RequestProcessor processor)
             throws IOException, KeeperException, InterruptedException, BookieException  {
         this.conf = conf;
         this.requestProcessor = processor;
 
+        ExtensionRegistry registry = ExtensionRegistry.newInstance();
+        authProviderFactory = AuthProviderFactoryFactory.newBookieAuthProviderFactory(conf, registry);
+
+        responseEncoder = new BookieProtoEncoding.ResponseEncoder(registry);
+        requestDecoder = new BookieProtoEncoding.RequestDecoder(registry);
+
         ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
         String base = "bookie-" + conf.getBookiePort() + "-netty";
         serverChannelFactory = new NioServerSocketChannelFactory(
@@ -140,11 +153,15 @@ class BookieNettyServer {
                              new LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4));
             pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
 
-            pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.RequestDecoder());
-            pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.ResponseEncoder());
+            pipeline.addLast("bookieProtoDecoder", requestDecoder);
+            pipeline.addLast("bookieProtoEncoder", responseEncoder);
+            pipeline.addLast("bookieAuthHandler",
+                             new AuthHandler.ServerSideHandler(authProviderFactory));
+
             SimpleChannelHandler requestHandler = isRunning.get() ?
                     new BookieRequestHandler(conf, requestProcessor, allChannels)
                     : new RejectRequestHandler();
+
             pipeline.addLast("bookieRequestHandler", requestHandler);
             return pipeline;
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 6ece56e..683a6fb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -20,29 +20,26 @@
  */
 package org.apache.bookkeeper.proto;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ExtensionRegistry;
 import com.google.protobuf.InvalidProtocolBufferException;
+
 import org.jboss.netty.buffer.ChannelBufferFactory;
 import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
-
 import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
 import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
 import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class BookieProtoEncoding {
     private final static Logger LOG = LoggerFactory.getLogger(BookieProtoEncoding.class);
 
-    static final EnDecoder REQ_PREV3 = new RequestEnDeCoderPreV3();
-    static final EnDecoder REP_PREV3 = new ResponseEnDeCoderPreV3();
-    static final EnDecoder REQ_V3 = new RequestEnDecoderV3();
-    static final EnDecoder REP_V3 = new ResponseEnDecoderV3();
-
     static interface EnDecoder {
 
         /**
@@ -68,6 +65,12 @@ public class BookieProtoEncoding {
     }
 
     static class RequestEnDeCoderPreV3 implements EnDecoder {
+        final ExtensionRegistry extensionRegistry;
+
+        RequestEnDeCoderPreV3(ExtensionRegistry extensionRegistry) {
+            this.extensionRegistry = extensionRegistry;
+        }
+
         @Override
         public Object encode(Object msg, ChannelBufferFactory bufferFactory)
                 throws Exception {
@@ -83,8 +86,7 @@ public class BookieProtoEncoding {
                 buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt());
                 buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
                 return ChannelBuffers.wrappedBuffer(buf, ar.getData());
-            } else {
-                assert(r instanceof BookieProtocol.ReadRequest);
+            } else if (r instanceof BookieProtocol.ReadRequest) {
                 int totalHeaderSize = 4 // for request type
                     + 8 // for ledgerId
                     + 8; // for entryId
@@ -101,6 +103,19 @@ public class BookieProtoEncoding {
                 }
 
                 return buf;
+            } else if (r instanceof BookieProtocol.AuthRequest) {
+                BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthRequest)r).getAuthMessage();
+                int totalHeaderSize = 4; // for request type
+                int totalSize = totalHeaderSize + am.getSerializedSize();
+                ChannelBuffer buf = bufferFactory.getBuffer(totalSize);
+                buf.writeInt(new PacketHeader(r.getProtocolVersion(),
+                                              r.getOpCode(),
+                                              r.getFlags()).toInt());
+                ChannelBufferOutputStream bufStream = new ChannelBufferOutputStream(buf);
+                am.writeTo(bufStream);
+                return buf;
+            } else {
+                return msg;
             }
         }
 
@@ -141,12 +156,23 @@ public class BookieProtoEncoding {
                 } else {
                     return new BookieProtocol.ReadRequest(h.getVersion(), ledgerId, entryId, flags);
                 }
+            case BookieProtocol.AUTH:
+                BookkeeperProtocol.AuthMessage.Builder builder
+                    = BookkeeperProtocol.AuthMessage.newBuilder();
+                builder.mergeFrom(new ChannelBufferInputStream(packet), extensionRegistry);
+                return new BookieProtocol.AuthRequest(h.getVersion(), builder.build());
             }
             return packet;
         }
     }
 
     static class ResponseEnDeCoderPreV3 implements EnDecoder {
+        final ExtensionRegistry extensionRegistry;
+
+        ResponseEnDeCoderPreV3(ExtensionRegistry extensionRegistry) {
+            this.extensionRegistry = extensionRegistry;
+        }
+
         @Override
         public Object encode(Object msg, ChannelBufferFactory bufferFactory)
                 throws Exception {
@@ -157,12 +183,13 @@ public class BookieProtoEncoding {
             ChannelBuffer buf = bufferFactory.getBuffer(24);
             buf.writeInt(new PacketHeader(r.getProtocolVersion(),
                                           r.getOpCode(), (short)0).toInt());
-            buf.writeInt(r.getErrorCode());
-            buf.writeLong(r.getLedgerId());
-            buf.writeLong(r.getEntryId());
 
             ServerStats.getInstance().incrementPacketsSent();
             if (msg instanceof BookieProtocol.ReadResponse) {
+                buf.writeInt(r.getErrorCode());
+                buf.writeLong(r.getLedgerId());
+                buf.writeLong(r.getEntryId());
+
                 BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse)r;
                 if (rr.hasData()) {
                     return ChannelBuffers.wrappedBuffer(buf,
@@ -171,7 +198,15 @@ public class BookieProtoEncoding {
                     return buf;
                 }
             } else if (msg instanceof BookieProtocol.AddResponse) {
+                buf.writeInt(r.getErrorCode());
+                buf.writeLong(r.getLedgerId());
+                buf.writeLong(r.getEntryId());
+
                 return buf;
+            } else if (msg instanceof BookieProtocol.AuthResponse) {
+                BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse)r).getAuthMessage();
+                return ChannelBuffers.wrappedBuffer(buf,
+                        ChannelBuffers.wrappedBuffer(am.toByteArray()));
             } else {
                 LOG.error("Cannot encode unknown response type {}", msg.getClass().getName());
                 return msg;
@@ -180,19 +215,23 @@ public class BookieProtoEncoding {
         @Override
         public Object decode(ChannelBuffer buffer)
                 throws Exception {
-            final int rc;
-            final long ledgerId, entryId;
+            int rc;
+            long ledgerId, entryId;
             final PacketHeader header;
 
             header = PacketHeader.fromInt(buffer.readInt());
-            rc = buffer.readInt();
-            ledgerId = buffer.readLong();
-            entryId = buffer.readLong();
 
             switch (header.getOpCode()) {
             case BookieProtocol.ADDENTRY:
+                rc = buffer.readInt();
+                ledgerId = buffer.readLong();
+                entryId = buffer.readLong();
                 return new BookieProtocol.AddResponse(header.getVersion(), rc, ledgerId, entryId);
             case BookieProtocol.READENTRY:
+                rc = buffer.readInt();
+                ledgerId = buffer.readLong();
+                entryId = buffer.readLong();
+
                 if (rc == BookieProtocol.EOK) {
                     return new BookieProtocol.ReadResponse(header.getVersion(), rc,
                                                            ledgerId, entryId, buffer.slice());
@@ -200,6 +239,13 @@ public class BookieProtoEncoding {
                     return new BookieProtocol.ReadResponse(header.getVersion(), rc,
                                                            ledgerId, entryId);
                 }
+            case BookieProtocol.AUTH:
+                ChannelBufferInputStream bufStream = new ChannelBufferInputStream(buffer);
+                BookkeeperProtocol.AuthMessage.Builder builder
+                    = BookkeeperProtocol.AuthMessage.newBuilder();
+                builder.mergeFrom(bufStream, extensionRegistry);
+                BookkeeperProtocol.AuthMessage am = builder.build();
+                return new BookieProtocol.AuthResponse(header.getVersion(), am);
             default:
                 return buffer;
             }
@@ -207,10 +253,16 @@ public class BookieProtoEncoding {
     }
 
     static class RequestEnDecoderV3 implements EnDecoder {
+        final ExtensionRegistry extensionRegistry;
+
+        RequestEnDecoderV3(ExtensionRegistry extensionRegistry) {
+            this.extensionRegistry = extensionRegistry;
+        }
 
         @Override
         public Object decode(ChannelBuffer packet) throws Exception {
-            return BookkeeperProtocol.Request.parseFrom(new ChannelBufferInputStream(packet));
+            return BookkeeperProtocol.Request.parseFrom(new ChannelBufferInputStream(packet),
+                                                        extensionRegistry);
         }
 
         @Override
@@ -222,10 +274,16 @@ public class BookieProtoEncoding {
     }
 
     static class ResponseEnDecoderV3 implements EnDecoder {
+        final ExtensionRegistry extensionRegistry;
+
+        ResponseEnDecoderV3(ExtensionRegistry extensionRegistry) {
+            this.extensionRegistry = extensionRegistry;
+        }
 
         @Override
         public Object decode(ChannelBuffer packet) throws Exception {
-            return BookkeeperProtocol.Response.parseFrom(new ChannelBufferInputStream(packet));
+            return BookkeeperProtocol.Response.parseFrom(new ChannelBufferInputStream(packet),
+                                                         extensionRegistry);
         }
 
         @Override
@@ -238,6 +296,14 @@ public class BookieProtoEncoding {
 
     public static class RequestEncoder extends OneToOneEncoder {
 
+        final EnDecoder REQ_PREV3;
+        final EnDecoder REQ_V3;
+
+        RequestEncoder(ExtensionRegistry extensionRegistry) {
+            REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry);
+            REQ_V3 = new RequestEnDecoderV3(extensionRegistry);
+        }
+
         @Override
         protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
                 throws Exception {
@@ -256,6 +322,13 @@ public class BookieProtoEncoding {
     }
 
     public static class RequestDecoder extends OneToOneDecoder {
+        final EnDecoder REQ_PREV3;
+        final EnDecoder REQ_V3;
+
+        RequestDecoder(ExtensionRegistry extensionRegistry) {
+            REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry);
+            REQ_V3 = new RequestEnDecoderV3(extensionRegistry);
+        }
 
         @Override
         protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
@@ -283,6 +356,13 @@ public class BookieProtoEncoding {
     }
 
     public static class ResponseEncoder extends OneToOneEncoder {
+        final EnDecoder REP_PREV3;
+        final EnDecoder REP_V3;
+
+        ResponseEncoder(ExtensionRegistry extensionRegistry) {
+            REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
+            REP_V3 = new ResponseEnDecoderV3(extensionRegistry);
+        }
 
         @Override
         protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
@@ -302,6 +382,13 @@ public class BookieProtoEncoding {
     }
 
     public static class ResponseDecoder extends OneToOneDecoder {
+        final EnDecoder REP_PREV3;
+        final EnDecoder REP_V3;
+
+        ResponseDecoder(ExtensionRegistry extensionRegistry) {
+            REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
+            REP_V3 = new ResponseEnDecoderV3(extensionRegistry);
+        }
 
         @Override
         protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 4dd26d6..2ce5ed8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -24,6 +24,8 @@ package org.apache.bookkeeper.proto;
 import org.jboss.netty.buffer.ChannelBuffer;
 import java.nio.ByteBuffer;
 
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+
 /**
  * The packets of the Bookie protocol all have a 4-byte integer indicating the
  * type of request or response at the very beginning of the packet followed by a
@@ -133,6 +135,13 @@ public interface BookieProtocol {
     public static final byte READENTRY = 2;
 
     /**
+     * Auth message. This code is for passing auth messages between the auth
+     * providers on the client and bookie. The message payload is determined
+     * by the auth providers themselves.
+     */
+    public static final byte AUTH = 3;
+
+    /**
      * The error code that indicates success
      */
     public static final int EOK = 0;
@@ -273,6 +282,19 @@ public interface BookieProtocol {
         }
     }
 
+    static class AuthRequest extends Request {
+        final AuthMessage authMessage;
+
+        AuthRequest(byte protocolVersion, AuthMessage authMessage) {
+            super(protocolVersion, AUTH, -1, -1, FLAG_NONE, null);
+            this.authMessage = authMessage;
+        }
+
+        AuthMessage getAuthMessage() {
+            return authMessage;
+        }
+    }
+
     static class Response {
         final byte protocolVersion;
         final byte opCode;
@@ -343,4 +365,18 @@ public interface BookieProtocol {
             super(protocolVersion, ADDENTRY, errorCode, ledgerId, entryId);
         }
     }
+
+    static class AuthResponse extends Response {
+        final AuthMessage authMessage;
+
+        AuthResponse(byte protocolVersion, AuthMessage authMessage) {
+            super(protocolVersion, AUTH, EOK, -1, -1);
+            this.authMessage = authMessage;
+        }
+
+        AuthMessage getAuthMessage() {
+            return authMessage;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 9fec15f..1608328 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -124,6 +124,7 @@ public class BookieRequestProcessor implements RequestProcessor {
                     processReadRequestV3(r, c);
                     break;
                 default:
+                    LOG.info("Unknown operation type {}", header.getOperation());
                     BookkeeperProtocol.Response.Builder response =
                             BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
                             .setStatus(BookkeeperProtocol.StatusCode.EBADREQ);