You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/01/31 01:14:31 UTC

[2/2] bookkeeper git commit: BOOKKEEPER-959: ClientAuthProvider and BookieAuthProvider Public API used Protobuf Shaded classes

BOOKKEEPER-959: ClientAuthProvider and BookieAuthProvider Public API used Protobuf Shaded classes

This fix removes the explicit usage of protobuf from the ClientAuthProvider and BookieAuthProvider APIs, since the protobuf library is shaded and relocated in the distributed public version of BookKeeper.

Author: eolivelli <eo...@gmail.com>

Reviewers: Sijie Guo <si...@apache.org>

Closes #67 from eolivelli/BOOKKEEPER-959


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/13d668f2
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/13d668f2
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/13d668f2

Branch: refs/heads/master
Commit: 13d668f2208bf472e4938cfdfd2de912eaa11275
Parents: 26b09ab
Author: eolivelli <eo...@gmail.com>
Authored: Mon Jan 30 17:14:22 2017 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Mon Jan 30 17:14:22 2017 -0800

----------------------------------------------------------------------
 bookkeeper-server/pom.xml                       |   5 -
 .../apache/bookkeeper/auth/AuthCallbacks.java   |  28 ++
 .../auth/AuthProviderFactoryFactory.java        |  52 +--
 .../org/apache/bookkeeper/auth/AuthToken.java   |  44 +++
 .../bookkeeper/auth/BookKeeperPrincipal.java    |  72 ++++
 .../bookkeeper/auth/BookieAuthProvider.java     |  26 +-
 .../bookkeeper/auth/ClientAuthProvider.java     |  29 +-
 .../bookkeeper/bookie/BookieConnectionPeer.java |  30 ++
 .../bookkeeper/client/ClientConnectionPeer.java |  30 ++
 .../bookkeeper/conf/AbstractConfiguration.java  |  28 ++
 .../bookkeeper/conf/ClientConfiguration.java    |  45 ++-
 .../apache/bookkeeper/proto/AuthHandler.java    | 139 ++++---
 .../apache/bookkeeper/proto/BookieClient.java   |   3 +-
 .../bookkeeper/proto/BookieNettyServer.java     |  80 +++-
 .../proto/BookieRequestProcessor.java           |  20 +
 .../bookkeeper/proto/BookkeeperProtocol.java    | 138 +++++--
 .../apache/bookkeeper/proto/ConnectionPeer.java |  63 +++
 .../proto/PerChannelBookieClient.java           |  48 ++-
 .../apache/bookkeeper/replication/Auditor.java  |   5 +-
 .../src/main/proto/BookkeeperProtocol.proto     |   6 +-
 .../src/main/resources/findbugsExclude.xml      |   5 +
 .../org/apache/bookkeeper/auth/TestAuth.java    | 381 +++++++++++++------
 .../proto/TestBackwardCompatCMS42.java          |  45 +--
 .../bookkeeper/proto/TestDataFormats.java       | 149 --------
 .../proto/TestPerChannelBookieClient.java       |   2 +-
 .../replication/AuthAutoRecoveryTest.java       | 111 ++++++
 .../src/test/proto/TestDataFormats.proto        |  34 --
 27 files changed, 1115 insertions(+), 503 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/pom.xml
----------------------------------------------------------------------
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index b3d6970..25de77a 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -279,7 +279,6 @@
             <!-- exclude generated file //-->
             <exclude>**/DataFormats.java</exclude>
             <exclude>**/BookkeeperProtocol.java</exclude>
-            <exclude>**/TestDataFormats.java</exclude>
           </excludes>
         </configuration>
       </plugin>
@@ -346,10 +345,6 @@
                       <arg value="--java_out=src/main/java" />
                       <arg value="src/main/proto/BookkeeperProtocol.proto" />
                     </exec>
-                    <exec executable="protoc" failonerror="true">
-                      <arg value="--java_out=src/test/java" />
-                      <arg value="src/test/proto/TestDataFormats.proto" />
-                    </exec>
                   </target>
                 </configuration>
                 <goals>

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthCallbacks.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthCallbacks.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthCallbacks.java
new file mode 100644
index 0000000..200420a
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthCallbacks.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.bookkeeper.auth;
+
+/**
+ * Callbacks for AuthProviders
+ */
+public abstract class AuthCallbacks {
+
+    public interface GenericCallback<T> {
+
+        void operationComplete(int rc, T result);
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java
index d05c475..7254903 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java
@@ -21,40 +21,34 @@
 package org.apache.bookkeeper.auth;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
 import org.apache.bookkeeper.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ExtensionRegistry;
-
+import org.apache.bookkeeper.client.ClientConnectionPeer;
+import org.apache.bookkeeper.bookie.BookieConnectionPeer;
 
 
 public class AuthProviderFactoryFactory {
     static Logger LOG = LoggerFactory.getLogger(AuthProviderFactoryFactory.class);
 
-    public static BookieAuthProvider.Factory newBookieAuthProviderFactory(ServerConfiguration conf,
-                                                                          ExtensionRegistry registry) throws IOException {
+    public static BookieAuthProvider.Factory newBookieAuthProviderFactory(ServerConfiguration conf) throws IOException {
         String factoryClassName = conf.getBookieAuthProviderFactoryClass();
 
         if (factoryClassName == null || factoryClassName.length() == 0) {
-            return new NullBookieAuthProviderFactory();
+            return new AuthenticationDisabledAuthProviderFactory();
         }
 
         BookieAuthProvider.Factory factory = ReflectionUtils.newInstance(factoryClassName,
                                                                          BookieAuthProvider.Factory.class);
-        factory.init(conf, registry);
+        factory.init(conf);
         return factory;
     }
 
-    public static ClientAuthProvider.Factory newClientAuthProviderFactory(ClientConfiguration conf,
-                                                                          ExtensionRegistry registry) throws IOException {
+    public static ClientAuthProvider.Factory newClientAuthProviderFactory(ClientConfiguration conf) throws IOException {
         String factoryClassName = conf.getClientAuthProviderFactoryClass();
 
         if (factoryClassName == null || factoryClassName.length() == 0) {
@@ -63,27 +57,32 @@ public class AuthProviderFactoryFactory {
 
         ClientAuthProvider.Factory factory = ReflectionUtils.newInstance(factoryClassName,
                                                                          ClientAuthProvider.Factory.class);
-        factory.init(conf, registry);
+        factory.init(conf);
         return factory;
     }
 
-    private final static String nullPluginName = "NULLPlugin";
+    public final static String authenticationDisabledPluginName = "AuthDisabledPlugin";
 
-    private static class NullBookieAuthProviderFactory implements BookieAuthProvider.Factory {
+    private static class AuthenticationDisabledAuthProviderFactory implements BookieAuthProvider.Factory {
         @Override
         public String getPluginName() {
-            return nullPluginName;
+            return authenticationDisabledPluginName;
         }
 
         @Override
-        public void init(ServerConfiguration conf, ExtensionRegistry registry) {}
+        public void init(ServerConfiguration conf) {}
 
         @Override
-        public BookieAuthProvider newProvider(InetSocketAddress addr,
-                                              GenericCallback<Void> completeCb) {
+        public BookieAuthProvider newProvider(BookieConnectionPeer addr,
+                                              AuthCallbacks.GenericCallback<Void> completeCb) {
             completeCb.operationComplete(BKException.Code.OK, null);
             return new BookieAuthProvider() {
-                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {}
+                public void process(AuthToken m, AuthCallbacks.GenericCallback<AuthToken> cb) {
+                    // any authentication request from a client is answered with a standard response
+                    // and the connection is bound to the ANONYMOUS principal
+                    addr.setAuthorizedId(BookKeeperPrincipal.ANONYMOUS);
+                    cb.operationComplete(BKException.Code.OK, AuthToken.NULL);
+                }
             };
         }
     }
@@ -91,19 +90,20 @@ public class AuthProviderFactoryFactory {
     private static class NullClientAuthProviderFactory implements ClientAuthProvider.Factory {
         @Override
         public String getPluginName() {
-            return nullPluginName;
+            return authenticationDisabledPluginName;
         }
 
         @Override
-        public void init(ClientConfiguration conf, ExtensionRegistry registry) {}
+        public void init(ClientConfiguration conf) {}
 
         @Override
-        public ClientAuthProvider newProvider(InetSocketAddress addr,
-                                              GenericCallback<Void> completeCb) {
+        public ClientAuthProvider newProvider(ClientConnectionPeer addr,
+                                              AuthCallbacks.GenericCallback<Void> completeCb) {
+            addr.setAuthorizedId(BookKeeperPrincipal.ANONYMOUS);
             completeCb.operationComplete(BKException.Code.OK, null);
             return new ClientAuthProvider() {
-                public void init(GenericCallback<AuthMessage> cb) {}
-                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {}
+                public void init(AuthCallbacks.GenericCallback<AuthToken> cb) {}
+                public void process(AuthToken m, AuthCallbacks.GenericCallback<AuthToken> cb) {}
             };
         }
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthToken.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthToken.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthToken.java
new file mode 100644
index 0000000..f4d0fab
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthToken.java
@@ -0,0 +1,44 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.auth;
+
+/**
+ * A generic payload for Authentication Messages
+ */
+public class AuthToken {
+
+    public static final AuthToken NULL = wrap(new byte[0]);
+
+    private final byte[] data;
+
+    private AuthToken(byte[] data) {
+        this.data = data;
+    }
+
+    public static AuthToken wrap(byte[] data) {
+        return new AuthToken(data);
+    }
+
+    public byte[] getData() {
+        return data;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookKeeperPrincipal.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookKeeperPrincipal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookKeeperPrincipal.java
new file mode 100644
index 0000000..7f6e582
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookKeeperPrincipal.java
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.auth;
+
+import java.util.Objects;
+
+/**
+ * A Principal is the user bound to the connection
+ */
+public class BookKeeperPrincipal {
+
+    private final String name;
+
+    public static final BookKeeperPrincipal ANONYMOUS = new BookKeeperPrincipal("ANONYMOUS");
+
+    public BookKeeperPrincipal(String name) {
+        this.name = name;
+    }
+
+    public final String getName() {
+        return name;
+    }
+
+    @Override
+    public String toString() {
+        return "BookKeeperPrincipal{" + name + '}';
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 7;
+        hash = 31 * hash + Objects.hashCode(this.name);
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        final BookKeeperPrincipal other = (BookKeeperPrincipal) obj;
+        if (!Objects.equals(this.name, other.name)) {
+            return false;
+        }
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java
index 4fb7d07..84577c7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java
@@ -21,13 +21,10 @@
 package org.apache.bookkeeper.auth;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
-
-import com.google.protobuf.ExtensionRegistry;
+import org.apache.bookkeeper.bookie.BookieConnectionPeer;
 
 /**
  * Bookie authentication provider interface.
@@ -43,8 +40,7 @@ public interface BookieAuthProvider {
          * payload, so that the server can decode auth messages
          * it receives from the client.
          */
-        void init(ServerConfiguration conf,
-                  ExtensionRegistry registry) throws IOException;
+        void init(ServerConfiguration conf) throws IOException;
 
         /**
          * Create a new instance of a bookie auth provider.
@@ -58,12 +54,12 @@ public interface BookieAuthProvider {
          * error code should be passed.
          * If authentication fails, the server will close the
          * connection.
-         * @param addr the address of the client being authenticated
+         * @param connection a handle to the connection
          * @param completeCb callback to be notified when authentication
          *                   is complete.
          */
-        BookieAuthProvider newProvider(InetSocketAddress addr,
-                                       GenericCallback<Void> completeCb);
+        BookieAuthProvider newProvider(BookieConnectionPeer connection,
+                                       AuthCallbacks.GenericCallback<Void> completeCb);
 
         /**
          * Get Auth provider plugin name.
@@ -71,6 +67,11 @@ public interface BookieAuthProvider {
          * are using the same auth provider.
          */
         String getPluginName();
+
+        /**
+        * Release resources
+        */
+        default void close() {}
     }
 
     /**
@@ -79,5 +80,10 @@ public interface BookieAuthProvider {
      * to send to the client, cb should not be called, and completeCb
      * must be called instead.
      */
-    void process(AuthMessage m, GenericCallback<AuthMessage> cb);
+    void process(AuthToken m, AuthCallbacks.GenericCallback<AuthToken> cb);
+
+    /**
+     * Release resources
+     */
+    default void close() {}
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java
index fba2264..b24b1b4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java
@@ -21,13 +21,9 @@
 package org.apache.bookkeeper.auth;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 
 import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
-
-import com.google.protobuf.ExtensionRegistry;
+import org.apache.bookkeeper.client.ClientConnectionPeer;
 
 /**
  * Client authentication provider interface.
@@ -43,8 +39,7 @@ public interface ClientAuthProvider {
          * payload, so that the client can decode auth messages
          * it receives from the server.
          */
-        void init(ClientConfiguration conf,
-                  ExtensionRegistry registry) throws IOException;
+        void init(ClientConfiguration conf) throws IOException;
 
         /**
          * Create a new instance of a client auth provider.
@@ -56,12 +51,12 @@ public interface ClientAuthProvider {
          * If the authentication was successful, BKException.Code.OK
          * should be passed as the return code. Otherwise, another
          * error code should be passed.
-         * @param addr the address of the socket being authenticated
+         * @param connection a handle to the connection
          * @param completeCb callback to be notified when authentication
          *                   is complete.
          */
-        ClientAuthProvider newProvider(InetSocketAddress addr,
-                                       GenericCallback<Void> completeCb);
+        ClientAuthProvider newProvider(ClientConnectionPeer connection,
+                                       AuthCallbacks.GenericCallback<Void> completeCb);
 
         /**
          * Get Auth provider plugin name.
@@ -69,6 +64,11 @@ public interface ClientAuthProvider {
          * are using the same auth provider.
          */
         String getPluginName();
+
+        /**
+        * Release resources
+        */
+        default void close() {}
     }
 
     /**
@@ -77,7 +77,7 @@ public interface ClientAuthProvider {
      * cb may not be called if authentication is not requires. In
      * this case, completeCb should be called.
      */
-    void init(GenericCallback<AuthMessage> cb);
+    void init(AuthCallbacks.GenericCallback<AuthToken> cb);
 
     /**
      * Process a response from the server. cb will receive the next
@@ -85,5 +85,10 @@ public interface ClientAuthProvider {
      * to send to the server, cb should not be called, and completeCb
      * must be called instead.
      */
-    void process(AuthMessage m, GenericCallback<AuthMessage> cb);
+    void process(AuthToken m, AuthCallbacks.GenericCallback<AuthToken> cb);
+
+    /**
+     * Release resources
+     */
+    default void close() {}
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieConnectionPeer.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieConnectionPeer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieConnectionPeer.java
new file mode 100644
index 0000000..f7b917d
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieConnectionPeer.java
@@ -0,0 +1,30 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import org.apache.bookkeeper.proto.ConnectionPeer;
+
+/**
+ * Represents the connection to a BookKeeper client, from the Bookie side 
+ */
+public interface BookieConnectionPeer extends ConnectionPeer {
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientConnectionPeer.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientConnectionPeer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientConnectionPeer.java
new file mode 100644
index 0000000..322cba9
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientConnectionPeer.java
@@ -0,0 +1,30 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import org.apache.bookkeeper.proto.ConnectionPeer;
+
+/**
+ * Represents the connection to a Bookie, from the client side
+ */
+public interface ClientConnectionPeer extends ConnectionPeer {
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index adf08ff..7df41fc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -18,6 +18,7 @@
 package org.apache.bookkeeper.conf;
 
 import java.net.URL;
+import static org.apache.bookkeeper.conf.ClientConfiguration.CLIENT_AUTH_PROVIDER_FACTORY_CLASS;
 
 import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.commons.configuration.Configuration;
@@ -66,6 +67,9 @@ public abstract class AbstractConfiguration extends CompositeConfiguration {
     protected final static String METASTORE_IMPL_CLASS = "metastoreImplClass";
     protected final static String METASTORE_MAX_ENTRIES_PER_SCAN = "metastoreMaxEntriesPerScan";
 
+    // Client auth provider factory class name. It must be configured on Bookies for the Auditor
+    protected final static String CLIENT_AUTH_PROVIDER_FACTORY_CLASS = "clientAuthProviderFactoryClass";
+
     protected AbstractConfiguration() {
         super();
         if (READ_SYSTEM_PROPERTIES) {
@@ -256,4 +260,28 @@ public abstract class AbstractConfiguration extends CompositeConfiguration {
             return (Feature)getProperty(configProperty);
         }
     }
+
+    /**
+     * Set the client authentication provider factory class name.
+     * If this is not set, no authentication will be used
+     *
+     * @param factoryClass
+     *          the client authentication provider factory class name
+     * @return client configuration
+     */
+    public AbstractConfiguration setClientAuthProviderFactoryClass(
+            String factoryClass) {
+        setProperty(CLIENT_AUTH_PROVIDER_FACTORY_CLASS, factoryClass);
+        return this;
+    }
+
+    /**
+     * Get the client authentication provider factory class name. If this returns null, no authentication will take
+     * place.
+     *
+     * @return the client authentication provider factory class name or null.
+     */
+    public String getClientAuthProviderFactoryClass() {
+        return getString(CLIENT_AUTH_PROVIDER_FACTORY_CLASS, null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 8e76bb7..7353c3f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
+import org.apache.bookkeeper.replication.Auditor;
 import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.lang.StringUtils;
@@ -86,8 +87,18 @@ public class ClientConfiguration extends AbstractConfiguration {
     protected final static String ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats";
     protected final static String TASK_EXECUTION_WARN_TIME_MICROS = "taskExecutionWarnTimeMicros";
 
-    // Client auth provider factory class name
-    protected final static String CLIENT_AUTH_PROVIDER_FACTORY_CLASS = "clientAuthProviderFactoryClass";
+    // Role of the client
+    protected final static String CLIENT_ROLE = "clientRole";
+
+    /**
+     * This client will act as a standard client
+     */
+    public final static String CLIENT_ROLE_STANDARD = "standard";
+
+    /**
+     * This client will act as a system client, like the {@link Auditor}
+     */
+    public final static String CLIENT_ROLE_SYSTEM = "system";
 
     /**
      * Construct a default client-side configuration
@@ -896,26 +907,32 @@ public class ClientConfiguration extends AbstractConfiguration {
     }
 
     /**
-     * Set the client authentication provider factory class name.
-     * If this is not set, no authentication will be used
+     * Set the client role
      *
-     * @param factoryClass
-     *          the client authentication provider factory class name
+     * @param role defines how the client will act
      * @return client configuration
      */
-    public ClientConfiguration setClientAuthProviderFactoryClass(
-            String factoryClass) {
-        setProperty(CLIENT_AUTH_PROVIDER_FACTORY_CLASS, factoryClass);
+    public ClientConfiguration setClientRole(String role) {
+        if (role == null) {
+            throw new NullPointerException();
+        }
+        switch (role) {
+            case CLIENT_ROLE_STANDARD:
+            case CLIENT_ROLE_SYSTEM:
+                break;
+            default:
+                throw new IllegalArgumentException("invalid role "+role);
+        }
+        setProperty(CLIENT_ROLE, role);
         return this;
     }
 
     /**
-     * Get the client authentication provider factory class name. If this returns null, no authentication will take
-     * place.
+     * Get the role of the client
      *
-     * @return the client authentication provider factory class name or null.
+     * @return the type of client
      */
-    public String getClientAuthProviderFactoryClass() {
-        return getString(CLIENT_AUTH_PROVIDER_FACTORY_CLASS, null);
+    public String getClientRole() {
+        return getString(CLIENT_ROLE, CLIENT_ROLE_STANDARD);
     }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index dd51a68..75dced5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -20,13 +20,15 @@
  */
 package org.apache.bookkeeper.proto;
 
+import com.google.protobuf.ByteString;
 import java.io.IOException;
-import java.net.Inet4Address;
-import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.auth.AuthCallbacks;
+import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
+import org.apache.bookkeeper.auth.AuthToken;
 
 import org.apache.bookkeeper.auth.BookieAuthProvider;
 import org.apache.bookkeeper.auth.ClientAuthProvider;
@@ -39,9 +41,10 @@ import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.DefaultExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.local.LocalChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.bookkeeper.client.ClientConnectionPeer;
+import org.apache.bookkeeper.bookie.BookieConnectionPeer;
 
 class AuthHandler {
     static final Logger LOG = LoggerFactory.getLogger(AuthHandler.class);
@@ -49,32 +52,31 @@ class AuthHandler {
     static class ServerSideHandler extends SimpleChannelHandler {
         volatile boolean authenticated = false;
         final BookieAuthProvider.Factory authProviderFactory;
+        final BookieConnectionPeer connectionPeer;
         BookieAuthProvider authProvider;
 
-        ServerSideHandler(BookieAuthProvider.Factory authProviderFactory) {
+        ServerSideHandler(BookieConnectionPeer connectionPeer, BookieAuthProvider.Factory authProviderFactory) {
             this.authProviderFactory = authProviderFactory;
+            this.connectionPeer = connectionPeer;
             authProvider = null;
         }
 
         @Override
         public void channelOpen(ChannelHandlerContext ctx,
                                 ChannelStateEvent e) throws Exception {
-            LOG.info("Channel open {}", ctx.getChannel());
-            SocketAddress remote  = ctx.getChannel().getRemoteAddress();
-            if (remote instanceof InetSocketAddress) {
-                authProvider = authProviderFactory.newProvider((InetSocketAddress)remote,
-                        new AuthHandshakeCompleteCallback());
-            } else if (ctx.getChannel() instanceof LocalChannel) {
-                authProvider = authProviderFactory.newProvider(new InetSocketAddress(Inet4Address.getLocalHost(), 0),
-                        new AuthHandshakeCompleteCallback());
-            } else {
-                LOG.error("Unknown channel ({}) or socket type {} for {}",
-                        new Object[] { ctx.getChannel(), remote != null ? remote.getClass() : null, remote });
-            }
+            authProvider = authProviderFactory.newProvider(connectionPeer, new AuthHandshakeCompleteCallback());
             super.channelOpen(ctx, e);
         }
 
         @Override
+        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+            if (authProvider != null) {
+                authProvider.close();
+            }
+            super.channelClosed(ctx, e);
+        }
+
+        @Override
         public void messageReceived(ChannelHandlerContext ctx,
                                     MessageEvent e)
                 throws Exception {
@@ -94,7 +96,11 @@ class AuthHandler {
                 BookieProtocol.AuthRequest req = (BookieProtocol.AuthRequest)event;
                 assert (req.getOpCode() == BookieProtocol.AUTH);
                 if (checkAuthPlugin(req.getAuthMessage(), ctx.getChannel())) {
-                    authProvider.process(req.getAuthMessage(),
+                    byte[] payload = req
+                        .getAuthMessage()
+                        .getPayload()
+                        .toByteArray();
+                    authProvider.process(AuthToken.wrap(payload),
                                 new AuthResponseCallbackLegacy(req, ctx.getChannel()));
                 } else {
                     ctx.getChannel().close();
@@ -119,8 +125,12 @@ class AuthHandler {
                 if (req.getHeader().getOperation() == BookkeeperProtocol.OperationType.AUTH
                         && req.hasAuthRequest()
                         && checkAuthPlugin(req.getAuthRequest(), ctx.getChannel())) {
-                    authProvider.process(req.getAuthRequest(),
-                                         new AuthResponseCallback(req, ctx.getChannel()));
+                    byte[] payload = req
+                        .getAuthRequest()
+                        .getPayload()
+                        .toByteArray();
+                    authProvider.process(AuthToken.wrap(payload),
+                                         new AuthResponseCallback(req, ctx.getChannel(), authProviderFactory.getPluginName()));
                 } else {
                     BookkeeperProtocol.Response.Builder builder
                         = BookkeeperProtocol.Response.newBuilder()
@@ -146,7 +156,7 @@ class AuthHandler {
             return true;
         }
 
-        static class AuthResponseCallbackLegacy implements GenericCallback<AuthMessage> {
+        static class AuthResponseCallbackLegacy implements AuthCallbacks.GenericCallback<AuthToken> {
             final BookieProtocol.AuthRequest req;
             final Channel channel;
 
@@ -155,27 +165,35 @@ class AuthHandler {
                 this.channel = channel;
             }
 
-            public void operationComplete(int rc, AuthMessage newam) {
+            public void operationComplete(int rc, AuthToken newam) {
                 if (rc != BKException.Code.OK) {
                     LOG.error("Error processing auth message, closing connection");
                     channel.close();
                     return;
                 }
+                AuthMessage message =
+                    AuthMessage
+                        .newBuilder()
+                        .setAuthPluginName(req.authMessage.getAuthPluginName())
+                        .setPayload(ByteString.copyFrom(newam.getData()))
+                        .build();
                 channel.write(new BookieProtocol.AuthResponse(req.getProtocolVersion(),
-                                                              newam));
+                                                              message));
             }
         }
 
-        static class AuthResponseCallback implements GenericCallback<AuthMessage> {
+        static class AuthResponseCallback implements AuthCallbacks.GenericCallback<AuthToken> {
             final BookkeeperProtocol.Request req;
             final Channel channel;
+            final String pluginName;
 
-            AuthResponseCallback(BookkeeperProtocol.Request req, Channel channel) {
+            AuthResponseCallback(BookkeeperProtocol.Request req, Channel channel, String pluginName) {
                 this.req = req;
                 this.channel = channel;
+                this.pluginName = pluginName;
             }
 
-            public void operationComplete(int rc, AuthMessage newam) {
+            public void operationComplete(int rc, AuthToken newam) {
                 BookkeeperProtocol.Response.Builder builder
                     = BookkeeperProtocol.Response.newBuilder()
                     .setHeader(req.getHeader());
@@ -188,18 +206,25 @@ class AuthHandler {
                     channel.close();
                     return;
                 } else {
+                    AuthMessage message =
+                        AuthMessage
+                            .newBuilder()
+                            .setAuthPluginName(pluginName)
+                            .setPayload(ByteString.copyFrom(newam.getData()))
+                            .build();
                     builder.setStatus(BookkeeperProtocol.StatusCode.EOK)
-                        .setAuthResponse(newam);
+                        .setAuthResponse(message);
                     channel.write(builder.build());
                 }
             }
         }
 
-        class AuthHandshakeCompleteCallback implements GenericCallback<Void> {
+        class AuthHandshakeCompleteCallback implements AuthCallbacks.GenericCallback<Void> {
             @Override
             public void operationComplete(int rc, Void v) {
                 if (rc == BKException.Code.OK) {
                     authenticated = true;
+                    LOG.info("Authentication success on server side");
                 } else {
                     LOG.debug("Authentication failed on server side");
                 }
@@ -211,13 +236,16 @@ class AuthHandler {
         volatile boolean authenticated = false;
         final ClientAuthProvider.Factory authProviderFactory;
         ClientAuthProvider authProvider;
-        AtomicLong transactionIdGenerator;
-        Queue<MessageEvent> waitingForAuth = new ConcurrentLinkedQueue<MessageEvent>();
+        final AtomicLong transactionIdGenerator;
+        final Queue<MessageEvent> waitingForAuth = new ConcurrentLinkedQueue<MessageEvent>();
+        final ClientConnectionPeer connectionPeer;
 
         ClientSideHandler(ClientAuthProvider.Factory authProviderFactory,
-                          AtomicLong transactionIdGenerator) {
+                          AtomicLong transactionIdGenerator,
+                          ClientConnectionPeer connectionPeer) {
             this.authProviderFactory = authProviderFactory;
             this.transactionIdGenerator = transactionIdGenerator;
+            this.connectionPeer = connectionPeer;
             authProvider = null;
         }
 
@@ -225,22 +253,19 @@ class AuthHandler {
         public void channelConnected(ChannelHandlerContext ctx,
                                      ChannelStateEvent e)
                 throws Exception {
-            SocketAddress remote  = ctx.getChannel().getRemoteAddress();
-            if (remote instanceof InetSocketAddress) {
-                authProvider = authProviderFactory.newProvider((InetSocketAddress)remote,
-                        new AuthHandshakeCompleteCallback(ctx));
-            } else if (ctx.getChannel() instanceof LocalChannel) {
-                authProvider = authProviderFactory.newProvider(new InetSocketAddress(Inet4Address.getLocalHost(), 0),
+            authProvider = authProviderFactory.newProvider(connectionPeer,
                         new AuthHandshakeCompleteCallback(ctx));
-            } else {
-                LOG.error("Unknown channel ({}) or socket type {} for {}",
-                        new Object[] { ctx.getChannel(), remote != null ? remote.getClass() : null, remote });
-            }
+            authProvider.init(new AuthRequestCallback(ctx, authProviderFactory.getPluginName()));
+
+            super.channelConnected(ctx, e);
+        }
 
+        @Override
+        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
             if (authProvider != null) {
-                authProvider.init(new AuthRequestCallback(ctx));
+                authProvider.close();
             }
-            super.channelConnected(ctx, e);
+            super.channelClosed(ctx, e);
         }
 
         @Override
@@ -261,7 +286,18 @@ class AuthHandler {
                     } else {
                         assert (resp.hasAuthResponse());
                         BookkeeperProtocol.AuthMessage am = resp.getAuthResponse();
-                        authProvider.process(am, new AuthRequestCallback(ctx));
+                        if (AuthProviderFactoryFactory.authenticationDisabledPluginName.equals(am.getAuthPluginName())){
+                            // The peer replied with the "authentication disabled" plugin name: skip the
+                            // handshake and complete it with OK. SLF4J only substitutes "{}", never "{0}".
+                            SocketAddress remote  = ctx.getChannel().getRemoteAddress();
+                            LOG.info("Authentication is not enabled. "
+                                + "Considering this client {} authenticated", remote);
+                            new AuthHandshakeCompleteCallback(ctx).operationComplete(BKException.Code.OK, null);
+                            return;
+                        }
+                        byte[] payload = am.getPayload().toByteArray();
+                        authProvider.process(AuthToken.wrap(payload), new AuthRequestCallback(ctx,
+                            authProviderFactory.getPluginName()));
                     }
                 } else {
                     // else just drop the message,
@@ -301,20 +337,27 @@ class AuthHandler {
                                              "Auth failed with error " + errorCode)));
         }
 
-        class AuthRequestCallback implements GenericCallback<AuthMessage> {
+        class AuthRequestCallback implements AuthCallbacks.GenericCallback<AuthToken> {
             Channel channel;
             ChannelHandlerContext ctx;
+            String pluginName;
 
-            AuthRequestCallback(ChannelHandlerContext ctx) {
+            AuthRequestCallback(ChannelHandlerContext ctx, String pluginName) {
                 this.channel = ctx.getChannel();
                 this.ctx = ctx;
+                this.pluginName = pluginName;
             }
 
-            public void operationComplete(int rc, AuthMessage newam) {
+            public void operationComplete(int rc, AuthToken newam) {
                 if (rc != BKException.Code.OK) {
                     authenticationError(ctx, rc);
                     return;
                 }
+                AuthMessage message = AuthMessage
+                    .newBuilder()
+                    .setAuthPluginName(pluginName)
+                    .setPayload(ByteString.copyFrom(newam.getData()))
+                    .build();
 
                 BookkeeperProtocol.BKPacketHeader header
                     = BookkeeperProtocol.BKPacketHeader.newBuilder()
@@ -324,13 +367,13 @@ class AuthHandler {
                 BookkeeperProtocol.Request.Builder builder
                     = BookkeeperProtocol.Request.newBuilder()
                     .setHeader(header)
-                    .setAuthRequest(newam);
+                    .setAuthRequest(message);
 
                 channel.write(builder.build());
             }
         }
 
-        class AuthHandshakeCompleteCallback implements GenericCallback<Void> {
+        class AuthHandshakeCompleteCallback implements AuthCallbacks.GenericCallback<Void> {
             ChannelHandlerContext ctx;
             AuthHandshakeCompleteCallback(ChannelHandlerContext ctx) {
                 this.ctx = ctx;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index d0052d6..9b0865a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -98,7 +98,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
         this.closeLock = new ReentrantReadWriteLock();
 
         this.registry = ExtensionRegistry.newInstance();
-        this.authProviderFactory = AuthProviderFactoryFactory.newClientAuthProviderFactory(conf, registry);
+        this.authProviderFactory = AuthProviderFactoryFactory.newClientAuthProviderFactory(conf);
 
         this.statsLogger = statsLogger;
         this.numConnectionsPerBookie = conf.getNumChannelsPerBookie();
@@ -295,6 +295,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
                 pool.close(true);
             }
             channels.clear();
+            authProviderFactory.close();
         } finally {
             closeLock.writeLock().unlock();
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index 5fcc64e..924c887 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -44,13 +44,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import com.google.protobuf.ExtensionRegistry;
 import com.google.common.annotations.VisibleForTesting;
+import java.net.SocketAddress;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import org.apache.bookkeeper.auth.BookKeeperPrincipal;
+import org.apache.bookkeeper.bookie.BookieConnectionPeer;
 
 /**
  * Netty server for serving bookie requests
  */
 class BookieNettyServer {
+
     private final static Logger LOG = LoggerFactory.getLogger(BookieNettyServer.class);
 
     final static int maxMessageSize = 0xfffff;
@@ -67,12 +73,12 @@ class BookieNettyServer {
     final BookieProtoEncoding.RequestDecoder requestDecoder;
 
     BookieNettyServer(ServerConfiguration conf, RequestProcessor processor)
-            throws IOException, KeeperException, InterruptedException, BookieException {
+        throws IOException, KeeperException, InterruptedException, BookieException {
         this.conf = conf;
         this.requestProcessor = processor;
 
         ExtensionRegistry registry = ExtensionRegistry.newInstance();
-        authProviderFactory = AuthProviderFactoryFactory.newBookieAuthProviderFactory(conf, registry);
+        authProviderFactory = AuthProviderFactoryFactory.newBookieAuthProviderFactory(conf);
 
         responseEncoder = new BookieProtoEncoding.ResponseEncoder(registry);
         requestDecoder = new BookieProtoEncoding.RequestDecoder(registry);
@@ -130,6 +136,64 @@ class BookieNettyServer {
         for (ChannelManager channel : channels) {
             channel.close();
         }
+        authProviderFactory.close();
+    }
+
+    /** Remembers a connection's channel and exposes it to auth plugins as a BookieConnectionPeer. */
+    class BookieSideConnectionPeerContextHandler extends SimpleChannelHandler {
+
+        final BookieConnectionPeer connectionPeer;
+        // Assigned in channelBound; null until then, hence the null checks below.
+        volatile Channel channel;
+        volatile BookKeeperPrincipal authorizedId = BookKeeperPrincipal.ANONYMOUS;
+
+        public BookieSideConnectionPeerContextHandler() {
+            this.connectionPeer = new BookieConnectionPeer() {
+                /** @return the remote address, or null when no channel has been recorded yet. */
+                @Override
+                public SocketAddress getRemoteAddr() {
+                    Channel c = channel;
+                    return c != null ? c.getRemoteAddress() : null;
+                }
+
+                @Override
+                public Collection<Object> getProtocolPrincipals() {
+                    return Collections.emptyList();
+                }
+
+                @Override
+                public void disconnect() {
+                    // Read the volatile field once so the closed and the logged channel agree.
+                    Channel c = channel;
+                    if (c != null) {
+                        c.close();
+                    }
+                    LOG.info("authplugin disconnected channel {}", c);
+                }
+
+                @Override
+                public BookKeeperPrincipal getAuthorizedId() {
+                    return authorizedId;
+                }
+
+                @Override
+                public void setAuthorizedId(BookKeeperPrincipal principal) {
+                    LOG.info("connection {} authenticated as {}", channel, principal);
+                    authorizedId = principal;
+                }
+            };
+        }
+
+        public BookieConnectionPeer getConnectionPeer() {
+            return connectionPeer;
+        }
+
+        @Override
+        public void channelBound(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+            channel = ctx.getChannel();
+            // Forward the event upstream; without this call it would stop at this handler.
+            super.channelBound(ctx, e);
+        }
     }
 
     class BookiePipelineFactory implements ChannelPipelineFactory {
@@ -140,21 +204,23 @@ class BookieNettyServer {
                     suspensionLock.wait();
                 }
             }
+            BookieSideConnectionPeerContextHandler contextHandler = new BookieSideConnectionPeerContextHandler();
             ChannelPipeline pipeline = Channels.pipeline();
             pipeline.addLast("lengthbaseddecoder",
-                    new LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4));
+                new LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4));
             pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
 
             pipeline.addLast("bookieProtoDecoder", requestDecoder);
             pipeline.addLast("bookieProtoEncoder", responseEncoder);
             pipeline.addLast("bookieAuthHandler",
-                    new AuthHandler.ServerSideHandler(authProviderFactory));
+                new AuthHandler.ServerSideHandler(contextHandler.getConnectionPeer(), authProviderFactory));
 
             SimpleChannelHandler requestHandler = isRunning.get()
-                    ? new BookieRequestHandler(conf, requestProcessor, allChannels)
-                    : new RejectRequestHandler();
+                ? new BookieRequestHandler(conf, requestProcessor, allChannels)
+                : new RejectRequestHandler();
 
             pipeline.addLast("bookieRequestHandler", requestHandler);
+            pipeline.addLast("contextHandler", contextHandler);
             return pipeline;
         }
     }
@@ -198,7 +264,7 @@ class BookieNettyServer {
             }
             CleanupChannelGroup other = (CleanupChannelGroup) o;
             return other.closed.get() == closed.get()
-                    && super.equals(other);
+                && super.equals(other);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 4dec39a..7b227fa 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -20,6 +20,13 @@
  */
 package org.apache.bookkeeper.proto;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ByteString;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
+import org.apache.bookkeeper.auth.AuthToken;
+
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.processor.RequestProcessor;
@@ -115,6 +122,19 @@ public class BookieRequestProcessor implements RequestProcessor {
                 case READ_ENTRY:
                     processReadRequestV3(r, c);
                     break;
+                case AUTH:
+                    LOG.info("Ignoring auth operation from client {}",c.getRemoteAddress());
+                    BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage
+                        .newBuilder()
+                        .setAuthPluginName(AuthProviderFactoryFactory.authenticationDisabledPluginName)
+                        .setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
+                        .build();
+                    BookkeeperProtocol.Response.Builder authResponse =
+                            BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
+                            .setStatus(BookkeeperProtocol.StatusCode.EOK)
+                            .setAuthResponse(message);
+                    c.write(authResponse.build());
+                    break;
                 default:
                     LOG.info("Unknown operation type {}", header.getOperation());
                     BookkeeperProtocol.Response.Builder response =

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
index 9156751..5fedfff 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
@@ -6386,8 +6386,7 @@ public final class BookkeeperProtocol {
 
   public interface AuthMessageOrBuilder extends
       // @@protoc_insertion_point(interface_extends:AuthMessage)
-      com.google.protobuf.GeneratedMessage.
-          ExtendableMessageOrBuilder<AuthMessage> {
+      com.google.protobuf.MessageOrBuilder {
 
     /**
      * <code>required string authPluginName = 1;</code>
@@ -6402,23 +6401,25 @@ public final class BookkeeperProtocol {
      */
     com.google.protobuf.ByteString
         getAuthPluginNameBytes();
+
+    /**
+     * <code>required bytes payload = 2;</code>
+     */
+    boolean hasPayload();
+    /**
+     * <code>required bytes payload = 2;</code>
+     */
+    com.google.protobuf.ByteString getPayload();
   }
   /**
    * Protobuf type {@code AuthMessage}
-   *
-   * <pre>
-   **
-   * Extendible message which auth mechanisms
-   * can use to carry their payload.
-   * </pre>
    */
   public static final class AuthMessage extends
-      com.google.protobuf.GeneratedMessage.ExtendableMessage<
-        AuthMessage> implements
+      com.google.protobuf.GeneratedMessage implements
       // @@protoc_insertion_point(message_implements:AuthMessage)
       AuthMessageOrBuilder {
     // Use AuthMessage.newBuilder() to construct.
-    private AuthMessage(com.google.protobuf.GeneratedMessage.ExtendableBuilder<org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, ?> builder) {
+    private AuthMessage(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
       super(builder);
       this.unknownFields = builder.getUnknownFields();
     }
@@ -6468,6 +6469,11 @@ public final class BookkeeperProtocol {
               authPluginName_ = bs;
               break;
             }
+            case 18: {
+              bitField0_ |= 0x00000002;
+              payload_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -6550,8 +6556,24 @@ public final class BookkeeperProtocol {
       }
     }
 
+    public static final int PAYLOAD_FIELD_NUMBER = 2;
+    private com.google.protobuf.ByteString payload_;
+    /**
+     * <code>required bytes payload = 2;</code>
+     */
+    public boolean hasPayload() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * <code>required bytes payload = 2;</code>
+     */
+    public com.google.protobuf.ByteString getPayload() {
+      return payload_;
+    }
+
     private void initFields() {
       authPluginName_ = "";
+      payload_ = com.google.protobuf.ByteString.EMPTY;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -6563,7 +6585,7 @@ public final class BookkeeperProtocol {
         memoizedIsInitialized = 0;
         return false;
       }
-      if (!extensionsAreInitialized()) {
+      if (!hasPayload()) {
         memoizedIsInitialized = 0;
         return false;
       }
@@ -6574,13 +6596,12 @@ public final class BookkeeperProtocol {
     public void writeTo(com.google.protobuf.CodedOutputStream output)
                         throws java.io.IOException {
       getSerializedSize();
-      com.google.protobuf.GeneratedMessage
-        .ExtendableMessage<org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage>.ExtensionWriter extensionWriter =
-          newExtensionWriter();
       if (((bitField0_ & 0x00000001) == 0x00000001)) {
         output.writeBytes(1, getAuthPluginNameBytes());
       }
-      extensionWriter.writeUntil(536870912, output);
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, payload_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -6594,7 +6615,10 @@ public final class BookkeeperProtocol {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(1, getAuthPluginNameBytes());
       }
-      size += extensionsSerializedSize();
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, payload_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -6675,16 +6699,9 @@ public final class BookkeeperProtocol {
     }
     /**
      * Protobuf type {@code AuthMessage}
-     *
-     * <pre>
-     **
-     * Extendible message which auth mechanisms
-     * can use to carry their payload.
-     * </pre>
      */
     public static final class Builder extends
-        com.google.protobuf.GeneratedMessage.ExtendableBuilder<
-          org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, Builder> implements
+        com.google.protobuf.GeneratedMessage.Builder<Builder> implements
         // @@protoc_insertion_point(builder_implements:AuthMessage)
         org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder {
       public static final com.google.protobuf.Descriptors.Descriptor
@@ -6721,6 +6738,8 @@ public final class BookkeeperProtocol {
         super.clear();
         authPluginName_ = "";
         bitField0_ = (bitField0_ & ~0x00000001);
+        payload_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000002);
         return this;
       }
 
@@ -6753,6 +6772,10 @@ public final class BookkeeperProtocol {
           to_bitField0_ |= 0x00000001;
         }
         result.authPluginName_ = authPluginName_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.payload_ = payload_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -6774,7 +6797,9 @@ public final class BookkeeperProtocol {
           authPluginName_ = other.authPluginName_;
           onChanged();
         }
-        this.mergeExtensionFields(other);
+        if (other.hasPayload()) {
+          setPayload(other.getPayload());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -6784,7 +6809,7 @@ public final class BookkeeperProtocol {
           
           return false;
         }
-        if (!extensionsAreInitialized()) {
+        if (!hasPayload()) {
           
           return false;
         }
@@ -6886,6 +6911,41 @@ public final class BookkeeperProtocol {
         return this;
       }
 
+      private com.google.protobuf.ByteString payload_ = com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>required bytes payload = 2;</code>
+       */
+      public boolean hasPayload() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>required bytes payload = 2;</code>
+       */
+      public com.google.protobuf.ByteString getPayload() {
+        return payload_;
+      }
+      /**
+       * <code>required bytes payload = 2;</code>
+       */
+      public Builder setPayload(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        payload_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required bytes payload = 2;</code>
+       */
+      public Builder clearPayload() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        payload_ = getDefaultInstance().getPayload();
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:AuthMessage)
     }
 
@@ -6968,17 +7028,17 @@ public final class BookkeeperProtocol {
       "\022\033\n\006status\030\001 \002(\0162\013.StatusCode\022\020\n\010ledgerI",
       "d\030\002 \002(\003\022\017\n\007entryId\030\003 \002(\003\022\014\n\004body\030\004 \001(\014\"M" +
       "\n\013AddResponse\022\033\n\006status\030\001 \002(\0162\013.StatusCo" +
-      "de\022\020\n\010ledgerId\030\002 \002(\003\022\017\n\007entryId\030\003 \002(\003\"0\n" +
-      "\013AuthMessage\022\026\n\016authPluginName\030\001 \002(\t*\t\010\350" +
-      "\007\020\200\200\200\200\002*F\n\017ProtocolVersion\022\017\n\013VERSION_ON" +
-      "E\020\001\022\017\n\013VERSION_TWO\020\002\022\021\n\rVERSION_THREE\020\003*" +
-      "\206\001\n\nStatusCode\022\007\n\003EOK\020\000\022\016\n\tENOLEDGER\020\222\003\022" +
-      "\r\n\010ENOENTRY\020\223\003\022\014\n\007EBADREQ\020\224\003\022\010\n\003EIO\020\365\003\022\010" +
-      "\n\003EUA\020\366\003\022\020\n\013EBADVERSION\020\367\003\022\014\n\007EFENCED\020\370\003" +
-      "\022\016\n\tEREADONLY\020\371\003*c\n\rOperationType\022\016\n\nREA",
-      "D_ENTRY\020\001\022\r\n\tADD_ENTRY\020\002\022\024\n\020RANGE_READ_E" +
-      "NTRY\020\003\022\023\n\017RANGE_ADD_ENTRY\020\004\022\010\n\004AUTH\020\005B\037\n" +
-      "\033org.apache.bookkeeper.protoH\001"
+      "de\022\020\n\010ledgerId\030\002 \002(\003\022\017\n\007entryId\030\003 \002(\003\"6\n" +
+      "\013AuthMessage\022\026\n\016authPluginName\030\001 \002(\t\022\017\n\007" +
+      "payload\030\002 \002(\014*F\n\017ProtocolVersion\022\017\n\013VERS" +
+      "ION_ONE\020\001\022\017\n\013VERSION_TWO\020\002\022\021\n\rVERSION_TH" +
+      "REE\020\003*\206\001\n\nStatusCode\022\007\n\003EOK\020\000\022\016\n\tENOLEDG" +
+      "ER\020\222\003\022\r\n\010ENOENTRY\020\223\003\022\014\n\007EBADREQ\020\224\003\022\010\n\003EI" +
+      "O\020\365\003\022\010\n\003EUA\020\366\003\022\020\n\013EBADVERSION\020\367\003\022\014\n\007EFEN" +
+      "CED\020\370\003\022\016\n\tEREADONLY\020\371\003*c\n\rOperationType\022",
+      "\016\n\nREAD_ENTRY\020\001\022\r\n\tADD_ENTRY\020\002\022\024\n\020RANGE_" +
+      "READ_ENTRY\020\003\022\023\n\017RANGE_ADD_ENTRY\020\004\022\010\n\004AUT" +
+      "H\020\005B\037\n\033org.apache.bookkeeper.protoH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
         new com.google.protobuf.Descriptors.FileDescriptor.    InternalDescriptorAssigner() {
@@ -7039,7 +7099,7 @@ public final class BookkeeperProtocol {
     internal_static_AuthMessage_fieldAccessorTable = new
       com.google.protobuf.GeneratedMessage.FieldAccessorTable(
         internal_static_AuthMessage_descriptor,
-        new java.lang.String[] { "AuthPluginName", });
+        new java.lang.String[] { "AuthPluginName", "Payload", });
   }
 
   // @@protoc_insertion_point(outer_class_scope)

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ConnectionPeer.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ConnectionPeer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ConnectionPeer.java
new file mode 100644
index 0000000..11a1eb5
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ConnectionPeer.java
@@ -0,0 +1,63 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import java.net.SocketAddress;
+import java.util.Collection;
+import org.apache.bookkeeper.auth.BookKeeperPrincipal;
+
+/**
+ * Represents the connection to a BookKeeper client, from the Bookie side.
+ */
+public interface ConnectionPeer {
+
+    /**
+     * Address from which the connection originated
+     * @return the remote socket address of the connection
+     */
+    public SocketAddress getRemoteAddr();
+
+    /**
+     * Additional principals bound to the connection, like TLS certificates
+     * @return the additional principals bound to the connection
+     */
+    public Collection<Object> getProtocolPrincipals();
+
+    /**
+     * Utility function to be used from AuthProviders to drop the connection
+     */
+    public void disconnect();
+
+    /**
+     * Returns the user which is bound to the connection
+     * @return the principal or null if no auth takes place
+     * or the auth plugin did not call {@link #setAuthorizedId(org.apache.bookkeeper.auth.BookKeeperPrincipal)}
+     * @see  #setAuthorizedId(org.apache.bookkeeper.auth.BookKeeperPrincipal)
+     */
+    public BookKeeperPrincipal getAuthorizedId();
+
+    /**
+     * Assign a principal to the current connection
+     * @param principal the id of the user
+     * @see #getAuthorizedId()
+     */
+    public void setAuthorizedId(BookKeeperPrincipal principal);
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 274fd36..a4fb761 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -67,7 +67,6 @@ import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelHandler;
 import org.jboss.netty.channel.local.DefaultLocalClientChannelFactory;
-import org.jboss.netty.channel.local.LocalAddress;
 import org.jboss.netty.channel.local.LocalClientChannelFactory;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
@@ -85,7 +84,10 @@ import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ExtensionRegistry;
 import java.net.SocketAddress;
+import java.util.Collection;
+import org.apache.bookkeeper.auth.BookKeeperPrincipal;
 import org.jboss.netty.channel.ChannelFactory;
+import org.apache.bookkeeper.client.ClientConnectionPeer;
 
 /**
  * This class manages all details of connection to a particular bookie. It also
@@ -131,6 +133,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     private volatile Queue<GenericCallback<PerChannelBookieClient>> pendingOps =
             new ArrayDeque<GenericCallback<PerChannelBookieClient>>();
     volatile Channel channel = null;
+    private final ClientConnectionPeer connectionPeer;
+    private volatile BookKeeperPrincipal authorizedId = BookKeeperPrincipal.ANONYMOUS;
 
     enum ConnectionState {
         DISCONNECTED, CONNECTING, CONNECTED, CLOSED
@@ -192,6 +196,45 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         addTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_ADD);
 
         this.pcbcPool = pcbcPool;
+
+        this.connectionPeer = new ClientConnectionPeer() {
+
+            @Override
+            public SocketAddress getRemoteAddr() {
+                Channel c = channel;
+                if (c != null) {
+                    return c.getRemoteAddress();
+                } else {
+                    return null;
+                }
+            }
+
+            @Override
+            public Collection<Object> getProtocolPrincipals() {
+                return Collections.emptyList();
+            }
+
+            @Override
+            public void disconnect() {
+                Channel c = channel;
+                if (c != null) {
+                    c.close();
+                }
+                LOG.info("authplugin disconnected channel {}", channel);
+            }
+
+            @Override
+            public void setAuthorizedId(BookKeeperPrincipal principal) {
+                authorizedId = principal;
+                LOG.info("connection {} authenticated as {}", channel, principal);
+            }
+
+            @Override
+            public BookKeeperPrincipal getAuthorizedId() {
+                return authorizedId;
+            }
+
+        };
     }
 
     private void completeOperation(GenericCallback<PerChannelBookieClient> op, int rc) {
@@ -687,7 +730,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
         pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder(extRegistry));
         pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.ResponseDecoder(extRegistry));
-        pipeline.addLast("authHandler", new AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator));
+        pipeline.addLast("authHandler", new AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator,
+            connectionPeer));
         pipeline.addLast("mainhandler", this);
         return pipeline;
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 5a4bdcc..037987b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -145,7 +145,10 @@ public class Auditor implements BookiesListener {
             this.ledgerUnderreplicationManager = ledgerManagerFactory
                     .newLedgerUnderreplicationManager();
 
-            this.bkc = new BookKeeper(new ClientConfiguration(conf), zkc);
+            ClientConfiguration clientConfiguration = new ClientConfiguration(conf);
+            clientConfiguration.setClientRole(ClientConfiguration.CLIENT_ROLE_SYSTEM);
+            LOG.info("AuthProvider used by the Auditor is "+clientConfiguration.getClientAuthProviderFactoryClass());
+            this.bkc = new BookKeeper(clientConfiguration, zkc);
             this.admin = new BookKeeperAdmin(bkc, statsLogger);
         } catch (CompatibilityException ce) {
             throw new UnavailableException(

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
index 7aeadfc..aabf80b 100644
--- a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
@@ -124,11 +124,7 @@ message AddResponse {
     required int64 entryId = 3;
 }
 
-/**
- * Extendible message which auth mechanisms
- * can use to carry their payload.
- */
 message AuthMessage {
     required string authPluginName = 1;
-    extensions 1000 to max;
+    required bytes payload = 2;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/13d668f2/bookkeeper-server/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/resources/findbugsExclude.xml b/bookkeeper-server/src/main/resources/findbugsExclude.xml
index fa94708..e8dad26 100644
--- a/bookkeeper-server/src/main/resources/findbugsExclude.xml
+++ b/bookkeeper-server/src/main/resources/findbugsExclude.xml
@@ -37,6 +37,11 @@
     <Method name="getBuffer" />
     <Bug pattern="EI_EXPOSE_REP" />
   </Match>
+  <Match>
+    <Class name="org.apache.bookkeeper.auth.AuthToken" />
+    <Method name="getData" />
+    <Bug pattern="EI_EXPOSE_REP" />
+  </Match>
   <And>
     <Bug category="MT_CORRECTNESS"/>
     <Class name="~org.apache.bookkeeper.util.collections\.[^.]+"/>