You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/04/05 08:46:02 UTC

[2/3] bookkeeper git commit: BOOKKEEPER-901: Authentication framework

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
index 57d3503..7fbb2cd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
@@ -176,12 +176,14 @@ public final class BookkeeperProtocol {
     ADD_ENTRY(1, 2),
     RANGE_READ_ENTRY(2, 3),
     RANGE_ADD_ENTRY(3, 4),
+    AUTH(4, 5),
     ;
     
     public static final int READ_ENTRY_VALUE = 1;
     public static final int ADD_ENTRY_VALUE = 2;
     public static final int RANGE_READ_ENTRY_VALUE = 3;
     public static final int RANGE_ADD_ENTRY_VALUE = 4;
+    public static final int AUTH_VALUE = 5;
     
     
     public final int getNumber() { return value; }
@@ -192,6 +194,7 @@ public final class BookkeeperProtocol {
         case 2: return ADD_ENTRY;
         case 3: return RANGE_READ_ENTRY;
         case 4: return RANGE_ADD_ENTRY;
+        case 5: return AUTH;
         default: return null;
       }
     }
@@ -222,7 +225,7 @@ public final class BookkeeperProtocol {
     }
     
     private static final OperationType[] VALUES = {
-      READ_ENTRY, ADD_ENTRY, RANGE_READ_ENTRY, RANGE_ADD_ENTRY, 
+      READ_ENTRY, ADD_ENTRY, RANGE_READ_ENTRY, RANGE_ADD_ENTRY, AUTH, 
     };
     
     public static OperationType valueOf(
@@ -756,6 +759,11 @@ public final class BookkeeperProtocol {
     boolean hasAddRequest();
     org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest getAddRequest();
     org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequestOrBuilder getAddRequestOrBuilder();
+    
+    // optional .AuthMessage authRequest = 102;
+    boolean hasAuthRequest();
+    org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthRequest();
+    org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthRequestOrBuilder();
   }
   public static final class Request extends
       com.google.protobuf.GeneratedMessage
@@ -825,10 +833,24 @@ public final class BookkeeperProtocol {
       return addRequest_;
     }
     
+    // optional .AuthMessage authRequest = 102;
+    public static final int AUTHREQUEST_FIELD_NUMBER = 102;
+    private org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage authRequest_;
+    public boolean hasAuthRequest() { // presence test for the optional authRequest field (tag 102)
+      return ((bitField0_ & 0x00000008) == 0x00000008); // true only when mask 0x00000008 is set in bitField0_
+    }
+    public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthRequest() { // plain getter; does not consult hasAuthRequest()
+      return authRequest_;
+    }
+    public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthRequestOrBuilder() { // same authRequest_ reference, typed as AuthMessageOrBuilder
+      return authRequest_;
+    }
+    
     private void initFields() {
       header_ = org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.getDefaultInstance();
       readRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest.getDefaultInstance();
       addRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest.getDefaultInstance();
+      authRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance(); // added by this change: authRequest_ starts as AuthMessage's default instance
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -855,6 +877,12 @@ public final class BookkeeperProtocol {
           return false;
         }
       }
+      if (hasAuthRequest()) {
+        if (!getAuthRequest().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -871,6 +899,9 @@ public final class BookkeeperProtocol {
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeMessage(101, addRequest_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeMessage(102, authRequest_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -892,6 +923,10 @@ public final class BookkeeperProtocol {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(101, addRequest_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(102, authRequest_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1011,6 +1046,7 @@ public final class BookkeeperProtocol {
           getHeaderFieldBuilder();
           getReadRequestFieldBuilder();
           getAddRequestFieldBuilder();
+          getAuthRequestFieldBuilder();
         }
       }
       private static Builder create() {
@@ -1037,6 +1073,12 @@ public final class BookkeeperProtocol {
           addRequestBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000004);
+        if (authRequestBuilder_ == null) {
+          authRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+        } else {
+          authRequestBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
       
@@ -1099,6 +1141,14 @@ public final class BookkeeperProtocol {
         } else {
           result.addRequest_ = addRequestBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        if (authRequestBuilder_ == null) {
+          result.authRequest_ = authRequest_;
+        } else {
+          result.authRequest_ = authRequestBuilder_.build();
+        }
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1124,6 +1174,9 @@ public final class BookkeeperProtocol {
         if (other.hasAddRequest()) {
           mergeAddRequest(other.getAddRequest());
         }
+        if (other.hasAuthRequest()) {
+          mergeAuthRequest(other.getAuthRequest());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1149,6 +1202,12 @@ public final class BookkeeperProtocol {
             return false;
           }
         }
+        if (hasAuthRequest()) {
+          if (!getAuthRequest().isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
       
@@ -1202,6 +1261,15 @@ public final class BookkeeperProtocol {
               setAddRequest(subBuilder.buildPartial());
               break;
             }
+            case 818: {
+              org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder subBuilder = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.newBuilder();
+              if (hasAuthRequest()) {
+                subBuilder.mergeFrom(getAuthRequest());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setAuthRequest(subBuilder.buildPartial());
+              break;
+            }
           }
         }
       }
@@ -1478,6 +1546,96 @@ public final class BookkeeperProtocol {
         return addRequestBuilder_;
       }
       
+      // optional .AuthMessage authRequest = 102;
+      private org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage authRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder> authRequestBuilder_;
+      public boolean hasAuthRequest() { // builder-side presence test for authRequest
+        return ((bitField0_ & 0x00000008) == 0x00000008); // checks the builder's own bitField0_, mask 0x00000008
+      }
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthRequest() { // current authRequest value, from whichever holder is active
+        if (authRequestBuilder_ == null) { // no field builder created yet: the value lives in authRequest_
+          return authRequest_;
+        } else {
+          return authRequestBuilder_.getMessage(); // field builder exists: it owns the value
+        }
+      }
+      public Builder setAuthRequest(org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage value) { // stores value as authRequest and marks the field present; returns this for chaining
+        if (authRequestBuilder_ == null) {
+          if (value == null) { // explicit null rejection happens only on this no-field-builder path
+            throw new NullPointerException();
+          }
+          authRequest_ = value;
+          onChanged();
+        } else {
+          authRequestBuilder_.setMessage(value); // NOTE(review): null handling on this path is up to SingleFieldBuilder.setMessage - not visible here
+        }
+        bitField0_ |= 0x00000008; // set the authRequest presence bit on both paths
+        return this;
+      }
+      public Builder setAuthRequest( // overload taking an AuthMessage.Builder; the builder is built immediately
+          org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder builderForValue) {
+        if (authRequestBuilder_ == null) {
+          authRequest_ = builderForValue.build(); // AuthMessage.Builder.build() throws if the message is not initialized
+          onChanged();
+        } else {
+          authRequestBuilder_.setMessage(builderForValue.build());
+        }
+        bitField0_ |= 0x00000008; // set the authRequest presence bit on both paths
+        return this;
+      }
+      public Builder mergeAuthRequest(org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage value) { // merges value into the current authRequest, or adopts it when nothing was set
+        if (authRequestBuilder_ == null) {
+          if (((bitField0_ & 0x00000008) == 0x00000008) && // field already present ...
+              authRequest_ != org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance()) { // ... and not the default instance (reference comparison, not equals())
+            authRequest_ =
+              org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.newBuilder(authRequest_).mergeFrom(value).buildPartial(); // copy current value, merge value in, no initialization check
+          } else {
+            authRequest_ = value; // nothing meaningful to merge into: take value as-is
+          }
+          onChanged();
+        } else {
+          authRequestBuilder_.mergeFrom(value); // field builder exists: delegate the merge to it
+        }
+        bitField0_ |= 0x00000008; // field is present after any merge
+        return this;
+      }
+      public Builder clearAuthRequest() { // resets authRequest to the default instance and marks it absent; returns this
+        if (authRequestBuilder_ == null) {
+          authRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+          onChanged();
+        } else {
+          authRequestBuilder_.clear(); // field builder exists: clear the value it holds
+        }
+        bitField0_ = (bitField0_ & ~0x00000008); // drop the authRequest presence bit
+        return this;
+      }
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder getAuthRequestBuilder() { // hands out a nested builder for authRequest
+        bitField0_ |= 0x00000008; // asking for the nested builder already marks the field as present
+        onChanged();
+        return getAuthRequestFieldBuilder().getBuilder(); // creates the field builder on first use
+      }
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthRequestOrBuilder() { // view of authRequest that does not create a field builder
+        if (authRequestBuilder_ != null) {
+          return authRequestBuilder_.getMessageOrBuilder(); // field builder exists: it decides whether a message or a builder is returned
+        } else {
+          return authRequest_;
+        }
+      }
+      private com.google.protobuf.SingleFieldBuilder< // lazily creates and returns the SingleFieldBuilder for authRequest
+          org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder> 
+          getAuthRequestFieldBuilder() {
+        if (authRequestBuilder_ == null) { // first call only
+          authRequestBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+              org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder>(
+                  authRequest_, // seeded with the value held so far
+                  getParentForChildren(),
+                  isClean());
+          authRequest_ = null; // the other accessors test authRequestBuilder_ first, so they read from the field builder from now on
+        }
+        return authRequestBuilder_;
+      }
+      
       // @@protoc_insertion_point(builder_scope:Request)
     }
     
@@ -2792,6 +2950,11 @@ public final class BookkeeperProtocol {
     boolean hasAddResponse();
     org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse getAddResponse();
     org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponseOrBuilder getAddResponseOrBuilder();
+    
+    // optional .AuthMessage authResponse = 102;
+    boolean hasAuthResponse();
+    org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthResponse();
+    org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthResponseOrBuilder();
   }
   public static final class Response extends
       com.google.protobuf.GeneratedMessage
@@ -2871,11 +3034,25 @@ public final class BookkeeperProtocol {
       return addResponse_;
     }
     
+    // optional .AuthMessage authResponse = 102;
+    public static final int AUTHRESPONSE_FIELD_NUMBER = 102;
+    private org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage authResponse_;
+    public boolean hasAuthResponse() { // presence test for the optional authResponse field (tag 102)
+      return ((bitField0_ & 0x00000010) == 0x00000010); // true only when mask 0x00000010 is set in bitField0_
+    }
+    public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthResponse() { // plain getter; does not consult hasAuthResponse()
+      return authResponse_;
+    }
+    public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthResponseOrBuilder() { // same authResponse_ reference, typed as AuthMessageOrBuilder
+      return authResponse_;
+    }
+    
     private void initFields() {
       header_ = org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.getDefaultInstance();
       status_ = org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode.EOK;
       readResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse.getDefaultInstance();
       addResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse.getDefaultInstance();
+      authResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance(); // added by this change: authResponse_ starts as AuthMessage's default instance
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -2906,6 +3083,12 @@ public final class BookkeeperProtocol {
           return false;
         }
       }
+      if (hasAuthResponse()) {
+        if (!getAuthResponse().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -2925,6 +3108,9 @@ public final class BookkeeperProtocol {
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
         output.writeMessage(101, addResponse_);
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeMessage(102, authResponse_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -2950,6 +3136,10 @@ public final class BookkeeperProtocol {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(101, addResponse_);
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(102, authResponse_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -3069,6 +3259,7 @@ public final class BookkeeperProtocol {
           getHeaderFieldBuilder();
           getReadResponseFieldBuilder();
           getAddResponseFieldBuilder();
+          getAuthResponseFieldBuilder();
         }
       }
       private static Builder create() {
@@ -3097,6 +3288,12 @@ public final class BookkeeperProtocol {
           addResponseBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000008);
+        if (authResponseBuilder_ == null) {
+          authResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+        } else {
+          authResponseBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000010);
         return this;
       }
       
@@ -3163,6 +3360,14 @@ public final class BookkeeperProtocol {
         } else {
           result.addResponse_ = addResponseBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        if (authResponseBuilder_ == null) {
+          result.authResponse_ = authResponse_;
+        } else {
+          result.authResponse_ = authResponseBuilder_.build();
+        }
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -3191,6 +3396,9 @@ public final class BookkeeperProtocol {
         if (other.hasAddResponse()) {
           mergeAddResponse(other.getAddResponse());
         }
+        if (other.hasAuthResponse()) {
+          mergeAuthResponse(other.getAuthResponse());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -3220,6 +3428,12 @@ public final class BookkeeperProtocol {
             return false;
           }
         }
+        if (hasAuthResponse()) {
+          if (!getAuthResponse().isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
       
@@ -3284,6 +3498,15 @@ public final class BookkeeperProtocol {
               setAddResponse(subBuilder.buildPartial());
               break;
             }
+            case 818: {
+              org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder subBuilder = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.newBuilder();
+              if (hasAuthResponse()) {
+                subBuilder.mergeFrom(getAuthResponse());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setAuthResponse(subBuilder.buildPartial());
+              break;
+            }
           }
         }
       }
@@ -3584,6 +3807,96 @@ public final class BookkeeperProtocol {
         return addResponseBuilder_;
       }
       
+      // optional .AuthMessage authResponse = 102;
+      private org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage authResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder> authResponseBuilder_;
+      public boolean hasAuthResponse() { // builder-side presence test for authResponse
+        return ((bitField0_ & 0x00000010) == 0x00000010); // checks the builder's own bitField0_, mask 0x00000010
+      }
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthResponse() { // current authResponse value, from whichever holder is active
+        if (authResponseBuilder_ == null) { // no field builder created yet: the value lives in authResponse_
+          return authResponse_;
+        } else {
+          return authResponseBuilder_.getMessage(); // field builder exists: it owns the value
+        }
+      }
+      public Builder setAuthResponse(org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage value) { // stores value as authResponse and marks the field present; returns this for chaining
+        if (authResponseBuilder_ == null) {
+          if (value == null) { // explicit null rejection happens only on this no-field-builder path
+            throw new NullPointerException();
+          }
+          authResponse_ = value;
+          onChanged();
+        } else {
+          authResponseBuilder_.setMessage(value); // NOTE(review): null handling on this path is up to SingleFieldBuilder.setMessage - not visible here
+        }
+        bitField0_ |= 0x00000010; // set the authResponse presence bit on both paths
+        return this;
+      }
+      public Builder setAuthResponse( // overload taking an AuthMessage.Builder; the builder is built immediately
+          org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder builderForValue) {
+        if (authResponseBuilder_ == null) {
+          authResponse_ = builderForValue.build(); // AuthMessage.Builder.build() throws if the message is not initialized
+          onChanged();
+        } else {
+          authResponseBuilder_.setMessage(builderForValue.build());
+        }
+        bitField0_ |= 0x00000010; // set the authResponse presence bit on both paths
+        return this;
+      }
+      public Builder mergeAuthResponse(org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage value) { // merges value into the current authResponse, or adopts it when nothing was set
+        if (authResponseBuilder_ == null) {
+          if (((bitField0_ & 0x00000010) == 0x00000010) && // field already present ...
+              authResponse_ != org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance()) { // ... and not the default instance (reference comparison, not equals())
+            authResponse_ =
+              org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.newBuilder(authResponse_).mergeFrom(value).buildPartial(); // copy current value, merge value in, no initialization check
+          } else {
+            authResponse_ = value; // nothing meaningful to merge into: take value as-is
+          }
+          onChanged();
+        } else {
+          authResponseBuilder_.mergeFrom(value); // field builder exists: delegate the merge to it
+        }
+        bitField0_ |= 0x00000010; // field is present after any merge
+        return this;
+      }
+      public Builder clearAuthResponse() { // resets authResponse to the default instance and marks it absent; returns this
+        if (authResponseBuilder_ == null) {
+          authResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+          onChanged();
+        } else {
+          authResponseBuilder_.clear(); // field builder exists: clear the value it holds
+        }
+        bitField0_ = (bitField0_ & ~0x00000010); // drop the authResponse presence bit
+        return this;
+      }
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder getAuthResponseBuilder() { // hands out a nested builder for authResponse
+        bitField0_ |= 0x00000010; // asking for the nested builder already marks the field as present
+        onChanged();
+        return getAuthResponseFieldBuilder().getBuilder(); // creates the field builder on first use
+      }
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthResponseOrBuilder() { // view of authResponse that does not create a field builder
+        if (authResponseBuilder_ != null) {
+          return authResponseBuilder_.getMessageOrBuilder(); // field builder exists: it decides whether a message or a builder is returned
+        } else {
+          return authResponse_;
+        }
+      }
+      private com.google.protobuf.SingleFieldBuilder< // lazily creates and returns the SingleFieldBuilder for authResponse
+          org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder> 
+          getAuthResponseFieldBuilder() {
+        if (authResponseBuilder_ == null) { // first call only
+          authResponseBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+              org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder>(
+                  authResponse_, // seeded with the value held so far
+                  getParentForChildren(),
+                  isClean());
+          authResponse_ = null; // the other accessors test authResponseBuilder_ first, so they read from the field builder from now on
+        }
+        return authResponseBuilder_;
+      }
+      
       // @@protoc_insertion_point(builder_scope:Response)
     }
     
@@ -4625,6 +4938,404 @@ public final class BookkeeperProtocol {
     // @@protoc_insertion_point(class_scope:AddResponse)
   }
   
+  public interface AuthMessageOrBuilder extends // accessor interface implemented by both AuthMessage and AuthMessage.Builder
+      com.google.protobuf.GeneratedMessage.
+          ExtendableMessageOrBuilder<AuthMessage> {
+    
+    // required string authPluginName = 1;
+    boolean hasAuthPluginName(); // whether authPluginName has been set
+    String getAuthPluginName(); // the authPluginName value as a String
+  }
+  public static final class AuthMessage extends
+      com.google.protobuf.GeneratedMessage.ExtendableMessage<
+        AuthMessage> implements AuthMessageOrBuilder {
+    // Use AuthMessage.newBuilder() to construct.
+    private AuthMessage(Builder builder) { // private: instances are created by Builder.buildPartial()
+      super(builder);
+    }
+    private AuthMessage(boolean noInit) {} // empty body, argument ignored; presumably used to create defaultInstance - its initializer is not visible here
+    
+    private static final AuthMessage defaultInstance;
+    public static AuthMessage getDefaultInstance() { // returns the single static defaultInstance
+      return defaultInstance;
+    }
+    
+    public AuthMessage getDefaultInstanceForType() { // instance-level counterpart of getDefaultInstance(); returns the same static object
+      return defaultInstance;
+    }
+    
+    public static final com.google.protobuf.Descriptors.Descriptor // descriptor for AuthMessage
+        getDescriptor() {
+      return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_AuthMessage_descriptor; // static field on the outer BookkeeperProtocol class
+    }
+    
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable // field accessor table for AuthMessage
+        internalGetFieldAccessorTable() {
+      return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_AuthMessage_fieldAccessorTable; // static field on the outer BookkeeperProtocol class
+    }
+    
+    private int bitField0_;
+    // required string authPluginName = 1;
+    public static final int AUTHPLUGINNAME_FIELD_NUMBER = 1;
+    private java.lang.Object authPluginName_;
+    public boolean hasAuthPluginName() { // presence test for the required authPluginName field (tag 1)
+      return ((bitField0_ & 0x00000001) == 0x00000001); // true only when mask 0x00000001 is set in bitField0_
+    }
+    public String getAuthPluginName() { // returns authPluginName as a String, decoding from bytes when needed
+      java.lang.Object ref = authPluginName_; // the field holds either a String or a ByteString
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) { // cache the decoded String only when the bytes are valid UTF-8
+          authPluginName_ = s; // later calls then take the String branch above
+        }
+        return s; // the decoded String is returned whether or not it was cached
+      }
+    }
+    private com.google.protobuf.ByteString getAuthPluginNameBytes() { // returns authPluginName as a ByteString; used by writeTo() and getSerializedSize()
+      java.lang.Object ref = authPluginName_; // the field holds either a String or a ByteString
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        authPluginName_ = b; // the field now holds the UTF-8 bytes instead of the String
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    private void initFields() { // assigns the initial field values
+      authPluginName_ = ""; // authPluginName starts as the empty string
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() { // true when authPluginName is set and extensionsAreInitialized() holds; the answer is memoized
+      byte isInitialized = memoizedIsInitialized; // -1 = not computed yet, 0 = false, 1 = true
+      if (isInitialized != -1) return isInitialized == 1; // reuse the cached answer
+      
+      if (!hasAuthPluginName()) { // the required field is missing
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!extensionsAreInitialized()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output) // serializes this message to output: field 1, then extensions, then unknown fields
+                        throws java.io.IOException {
+      getSerializedSize(); // result discarded; the first call fills memoizedSerializedSize
+      com.google.protobuf.GeneratedMessage
+        .ExtendableMessage<org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage>.ExtensionWriter extensionWriter =
+          newExtensionWriter();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) { // authPluginName is written only when its presence bit is set
+        output.writeBytes(1, getAuthPluginNameBytes());
+      }
+      extensionWriter.writeUntil(536870912, output); // 536870912 == 1 << 29; presumably above every valid field number, so all extensions are written - confirm against the protobuf docs
+      getUnknownFields().writeTo(output);
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() { // size computed from field 1, extensions and unknown fields; memoized after the first call
+      int size = memoizedSerializedSize;
+      if (size != -1) return size; // -1 means not computed yet
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) { // authPluginName counts only when present, mirroring writeTo()
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(1, getAuthPluginNameBytes());
+      }
+      size += extensionsSerializedSize();
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size; // cache for later calls
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace() // adds nothing of its own; forwards to the superclass implementation
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom( // parse from a ByteString; buildParsed() rejects an uninitialized result
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom( // ByteString variant that passes an extension registry to mergeFrom
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom(byte[] data) // parse from a byte array
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom( // byte-array variant that passes an extension registry to mergeFrom
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom(java.io.InputStream input) // parse from an InputStream; declares IOException instead
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom( // InputStream variant that passes an extension registry to mergeFrom
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseDelimitedFrom(java.io.InputStream input) // parse one message via mergeDelimitedFrom; may return null
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed(); // throws if the parsed message is not initialized
+      } else {
+        return null; // mergeDelimitedFrom returned false: no message was merged
+      }
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseDelimitedFrom( // same as above, passing an extension registry to mergeDelimitedFrom
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed(); // throws if the parsed message is not initialized
+      } else {
+        return null; // mergeDelimitedFrom returned false: no message was merged
+      }
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom( // parse from a CodedInputStream; buildParsed() rejects an uninitialized result
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom( // CodedInputStream variant that passes an extension registry to mergeFrom
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); } // fresh, empty builder
+    public Builder newBuilderForType() { return newBuilder(); } // instance-level equivalent of newBuilder()
+    public static Builder newBuilder(org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage prototype) { // fresh builder with prototype merged into it
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); } // fresh builder with this message merged into it
+    
+    @java.lang.Override
+    protected Builder newBuilderForType( // creates a builder attached to the given BuilderParent
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.ExtendableBuilder<
+          org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, Builder> implements org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor // returns the same descriptor field as AuthMessage.getDescriptor()
+          getDescriptor() {
+        return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_AuthMessage_descriptor;
+      }
+      
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable // returns the same accessor-table field the AuthMessage class returns
+          internalGetFieldAccessorTable() {
+        return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_AuthMessage_fieldAccessorTable;
+      }
+      
+      // Construct using org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.newBuilder()
+      private Builder() { // parentless builder; reached through create()
+        maybeForceBuilderInitialization();
+      }
+      
+      private Builder(BuilderParent parent) { // builder attached to a parent; used by AuthMessage.newBuilderForType(BuilderParent)
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() { // effectively a no-op for AuthMessage
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { // empty body: this builder has no field builders to pre-create
+        }
+      }
+      private static Builder create() { // factory behind AuthMessage.newBuilder() and clone()
+        return new Builder();
+      }
+      
+      public Builder clear() { // resets the builder; returns this for chaining
+        super.clear();
+        authPluginName_ = ""; // back to the empty string
+        bitField0_ = (bitField0_ & ~0x00000001); // authPluginName is no longer marked present
+        return this;
+      }
+      
+      public Builder clone() { // copies the builder through a partial build, so the required-field check is not applied
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public com.google.protobuf.Descriptors.Descriptor // instance-level descriptor accessor; forwards to AuthMessage.getDescriptor()
+          getDescriptorForType() {
+        return org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDescriptor();
+      }
+      
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getDefaultInstanceForType() {
+        return org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+      }
+      
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage build() {
+        org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage buildPartial() {
+        org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage result = new org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.authPluginName_ = authPluginName_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage) {
+          return mergeFrom((org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+      
+      public Builder mergeFrom(org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage other) {
+        if (other == org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance()) return this;
+        if (other.hasAuthPluginName()) {
+          setAuthPluginName(other.getAuthPluginName());
+        }
+        this.mergeExtensionFields(other);
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasAuthPluginName()) {
+          
+          return false;
+        }
+        if (!extensionsAreInitialized()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder(
+            this.getUnknownFields());
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              this.setUnknownFields(unknownFields.build());
+              onChanged();
+              return this;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                this.setUnknownFields(unknownFields.build());
+                onChanged();
+                return this;
+              }
+              break;
+            }
+            case 10: {
+              bitField0_ |= 0x00000001;
+              authPluginName_ = input.readBytes();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required string authPluginName = 1;
+      private java.lang.Object authPluginName_ = "";
+      public boolean hasAuthPluginName() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public String getAuthPluginName() {
+        java.lang.Object ref = authPluginName_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          authPluginName_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setAuthPluginName(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        authPluginName_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearAuthPluginName() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        authPluginName_ = getDefaultInstance().getAuthPluginName();
+        onChanged();
+        return this;
+      }
+      void setAuthPluginName(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000001;
+        authPluginName_ = value;
+        onChanged();
+      }
+      
+      // @@protoc_insertion_point(builder_scope:AuthMessage)
+    }
+    
+    static {
+      defaultInstance = new AuthMessage(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:AuthMessage)
+  }
+  
   private static com.google.protobuf.Descriptors.Descriptor
     internal_static_BKPacketHeader_descriptor;
   private static
@@ -4660,6 +5371,11 @@ public final class BookkeeperProtocol {
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
       internal_static_AddResponse_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_AuthMessage_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_AuthMessage_fieldAccessorTable;
   
   public static com.google.protobuf.Descriptors.FileDescriptor
       getDescriptor() {
@@ -4672,33 +5388,36 @@ public final class BookkeeperProtocol {
       "\n\'src/main/proto/BookkeeperProtocol.prot" +
       "o\"e\n\016BKPacketHeader\022!\n\007version\030\001 \002(\0162\020.P" +
       "rotocolVersion\022!\n\toperation\030\002 \002(\0162\016.Oper" +
-      "ationType\022\r\n\005txnId\030\003 \002(\004\"n\n\007Request\022\037\n\006h" +
-      "eader\030\001 \002(\0132\017.BKPacketHeader\022!\n\013readRequ" +
-      "est\030d \001(\0132\014.ReadRequest\022\037\n\naddRequest\030e " +
-      "\001(\0132\013.AddRequest\"~\n\013ReadRequest\022\037\n\004flag\030" +
-      "d \001(\0162\021.ReadRequest.Flag\022\020\n\010ledgerId\030\001 \002" +
-      "(\003\022\017\n\007entryId\030\002 \002(\003\022\021\n\tmasterKey\030\003 \001(\014\"\030" +
-      "\n\004Flag\022\020\n\014FENCE_LEDGER\020\001\"\212\001\n\nAddRequest\022",
-      "\036\n\004flag\030d \001(\0162\020.AddRequest.Flag\022\020\n\010ledge" +
-      "rId\030\001 \002(\003\022\017\n\007entryId\030\002 \002(\003\022\021\n\tmasterKey\030" +
-      "\003 \002(\014\022\014\n\004body\030\004 \002(\014\"\030\n\004Flag\022\020\n\014RECOVERY_" +
-      "ADD\020\001\"\220\001\n\010Response\022\037\n\006header\030\001 \002(\0132\017.BKP" +
-      "acketHeader\022\033\n\006status\030\002 \002(\0162\013.StatusCode" +
-      "\022#\n\014readResponse\030d \001(\0132\r.ReadResponse\022!\n" +
-      "\013addResponse\030e \001(\0132\014.AddResponse\"\\\n\014Read" +
-      "Response\022\033\n\006status\030\001 \002(\0162\013.StatusCode\022\020\n" +
-      "\010ledgerId\030\002 \002(\003\022\017\n\007entryId\030\003 \002(\003\022\014\n\004body" +
-      "\030\004 \001(\014\"M\n\013AddResponse\022\033\n\006status\030\001 \002(\0162\013.",
-      "StatusCode\022\020\n\010ledgerId\030\002 \002(\003\022\017\n\007entryId\030" +
-      "\003 \002(\003*F\n\017ProtocolVersion\022\017\n\013VERSION_ONE\020" +
-      "\001\022\017\n\013VERSION_TWO\020\002\022\021\n\rVERSION_THREE\020\003*\206\001" +
-      "\n\nStatusCode\022\007\n\003EOK\020\000\022\016\n\tENOLEDGER\020\222\003\022\r\n" +
-      "\010ENOENTRY\020\223\003\022\014\n\007EBADREQ\020\224\003\022\010\n\003EIO\020\365\003\022\010\n\003" +
-      "EUA\020\366\003\022\020\n\013EBADVERSION\020\367\003\022\014\n\007EFENCED\020\370\003\022\016" +
-      "\n\tEREADONLY\020\371\003*Y\n\rOperationType\022\016\n\nREAD_" +
-      "ENTRY\020\001\022\r\n\tADD_ENTRY\020\002\022\024\n\020RANGE_READ_ENT" +
-      "RY\020\003\022\023\n\017RANGE_ADD_ENTRY\020\004B\037\n\033org.apache." +
-      "bookkeeper.protoH\001"
+      "ationType\022\r\n\005txnId\030\003 \002(\004\"\221\001\n\007Request\022\037\n\006" +
+      "header\030\001 \002(\0132\017.BKPacketHeader\022!\n\013readReq" +
+      "uest\030d \001(\0132\014.ReadRequest\022\037\n\naddRequest\030e" +
+      " \001(\0132\013.AddRequest\022!\n\013authRequest\030f \001(\0132\014" +
+      ".AuthMessage\"~\n\013ReadRequest\022\037\n\004flag\030d \001(" +
+      "\0162\021.ReadRequest.Flag\022\020\n\010ledgerId\030\001 \002(\003\022\017" +
+      "\n\007entryId\030\002 \002(\003\022\021\n\tmasterKey\030\003 \001(\014\"\030\n\004Fl",
+      "ag\022\020\n\014FENCE_LEDGER\020\001\"\212\001\n\nAddRequest\022\036\n\004f" +
+      "lag\030d \001(\0162\020.AddRequest.Flag\022\020\n\010ledgerId\030" +
+      "\001 \002(\003\022\017\n\007entryId\030\002 \002(\003\022\021\n\tmasterKey\030\003 \002(" +
+      "\014\022\014\n\004body\030\004 \002(\014\"\030\n\004Flag\022\020\n\014RECOVERY_ADD\020" +
+      "\001\"\264\001\n\010Response\022\037\n\006header\030\001 \002(\0132\017.BKPacke" +
+      "tHeader\022\033\n\006status\030\002 \002(\0162\013.StatusCode\022#\n\014" +
+      "readResponse\030d \001(\0132\r.ReadResponse\022!\n\013add" +
+      "Response\030e \001(\0132\014.AddResponse\022\"\n\014authResp" +
+      "onse\030f \001(\0132\014.AuthMessage\"\\\n\014ReadResponse" +
+      "\022\033\n\006status\030\001 \002(\0162\013.StatusCode\022\020\n\010ledgerI",
+      "d\030\002 \002(\003\022\017\n\007entryId\030\003 \002(\003\022\014\n\004body\030\004 \001(\014\"M" +
+      "\n\013AddResponse\022\033\n\006status\030\001 \002(\0162\013.StatusCo" +
+      "de\022\020\n\010ledgerId\030\002 \002(\003\022\017\n\007entryId\030\003 \002(\003\"0\n" +
+      "\013AuthMessage\022\026\n\016authPluginName\030\001 \002(\t*\t\010\350" +
+      "\007\020\200\200\200\200\002*F\n\017ProtocolVersion\022\017\n\013VERSION_ON" +
+      "E\020\001\022\017\n\013VERSION_TWO\020\002\022\021\n\rVERSION_THREE\020\003*" +
+      "\206\001\n\nStatusCode\022\007\n\003EOK\020\000\022\016\n\tENOLEDGER\020\222\003\022" +
+      "\r\n\010ENOENTRY\020\223\003\022\014\n\007EBADREQ\020\224\003\022\010\n\003EIO\020\365\003\022\010" +
+      "\n\003EUA\020\366\003\022\020\n\013EBADVERSION\020\367\003\022\014\n\007EFENCED\020\370\003" +
+      "\022\016\n\tEREADONLY\020\371\003*c\n\rOperationType\022\016\n\nREA",
+      "D_ENTRY\020\001\022\r\n\tADD_ENTRY\020\002\022\024\n\020RANGE_READ_E" +
+      "NTRY\020\003\022\023\n\017RANGE_ADD_ENTRY\020\004\022\010\n\004AUTH\020\005B\037\n" +
+      "\033org.apache.bookkeeper.protoH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -4718,7 +5437,7 @@ public final class BookkeeperProtocol {
           internal_static_Request_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Request_descriptor,
-              new java.lang.String[] { "Header", "ReadRequest", "AddRequest", },
+              new java.lang.String[] { "Header", "ReadRequest", "AddRequest", "AuthRequest", },
               org.apache.bookkeeper.proto.BookkeeperProtocol.Request.class,
               org.apache.bookkeeper.proto.BookkeeperProtocol.Request.Builder.class);
           internal_static_ReadRequest_descriptor =
@@ -4742,7 +5461,7 @@ public final class BookkeeperProtocol {
           internal_static_Response_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Response_descriptor,
-              new java.lang.String[] { "Header", "Status", "ReadResponse", "AddResponse", },
+              new java.lang.String[] { "Header", "Status", "ReadResponse", "AddResponse", "AuthResponse", },
               org.apache.bookkeeper.proto.BookkeeperProtocol.Response.class,
               org.apache.bookkeeper.proto.BookkeeperProtocol.Response.Builder.class);
           internal_static_ReadResponse_descriptor =
@@ -4761,6 +5480,14 @@ public final class BookkeeperProtocol {
               new java.lang.String[] { "Status", "LedgerId", "EntryId", },
               org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse.class,
               org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse.Builder.class);
+          internal_static_AuthMessage_descriptor =
+            getDescriptor().getMessageTypes().get(7);
+          internal_static_AuthMessage_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_AuthMessage_descriptor,
+              new java.lang.String[] { "AuthPluginName", },
+              org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.class,
+              org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder.class);
           return null;
         }
       };

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 2bd4e9b..0f9feea 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeperClientStats;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -79,6 +80,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.ExtensionRegistry;
 
 /**
  * This class manages all details of connection to a particular bookie. It also
@@ -134,15 +136,27 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
     private final ClientConfiguration conf;
 
     private final PerChannelBookieClientPool pcbcPool;
+    private final ClientAuthProvider.Factory authProviderFactory;
+    private final ExtensionRegistry extRegistry;
 
     public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
                                   BookieSocketAddress addr) {
-        this(new ClientConfiguration(), executor, channelFactory, addr, null, NullStatsLogger.INSTANCE, null);
+        this(new ClientConfiguration(), executor, channelFactory, addr, null, NullStatsLogger.INSTANCE, null, null, null);
+    }
+
+    public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
+                                  BookieSocketAddress addr,
+                                  ClientAuthProvider.Factory authProviderFactory,
+                                  ExtensionRegistry extRegistry) {
+        this(new ClientConfiguration(), executor, channelFactory, addr, null, NullStatsLogger.INSTANCE,
+                authProviderFactory, extRegistry, null);
     }
 
     public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor,
                                   ClientSocketChannelFactory channelFactory, BookieSocketAddress addr,
                                   HashedWheelTimer requestTimer, StatsLogger parentStatsLogger,
+                                  ClientAuthProvider.Factory authProviderFactory,
+                                  ExtensionRegistry extRegistry,
                                   PerChannelBookieClientPool pcbcPool) {
         this.conf = conf;
         this.addr = addr;
@@ -153,6 +167,9 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         this.addEntryTimeout = conf.getAddEntryTimeout();
         this.readEntryTimeout = conf.getReadEntryTimeout();
 
+        this.authProviderFactory = authProviderFactory;
+        this.extRegistry = extRegistry;
+
         StringBuilder nameBuilder = new StringBuilder();
         nameBuilder.append(addr.getHostname().replace('.', '_').replace('-', '_'))
             .append("_").append(addr.getPort());
@@ -563,8 +580,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
                     bAddress = c.getRemoteAddress().toString();
                 }
 
-                LOG.debug("Could not write request for reading entry: {} ledger-id: {} bookie: {}",
-                          new Object[]{ readCompletion.entryId, readCompletion.ledgerId, bAddress });
+                LOG.debug("Could not write request for reading entry: {} ledger-id: {} bookie: {} rc: {}",
+                        new Object[]{ readCompletion.entryId, readCompletion.ledgerId, bAddress, rc });
 
                 readCompletion.cb.readEntryComplete(rc, readCompletion.ledgerId, readCompletion.entryId,
                                                     null, readCompletion.ctx);
@@ -594,8 +611,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
                 if(c != null) {
                     bAddress = c.getRemoteAddress().toString();
                 }
-                LOG.debug("Could not write request for adding entry: {} ledger-id: {} bookie: {}",
-                          new Object[] { addCompletion.entryId, addCompletion.ledgerId, bAddress });
+                LOG.debug("Could not write request for adding entry: {} ledger-id: {} bookie: {} rc: {}",
+                          new Object[] { addCompletion.entryId, addCompletion.ledgerId, bAddress, rc });
 
                 addCompletion.cb.writeComplete(rc, addCompletion.ledgerId, addCompletion.entryId,
                                                addr, addCompletion.ctx);
@@ -656,8 +673,9 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
 
         pipeline.addLast("lengthbasedframedecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
         pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
-        pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder());
-        pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.ResponseDecoder());
+        pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder(extRegistry));
+        pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.ResponseDecoder(extRegistry));
+        pipeline.addLast("authHandler", new AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator));
         pipeline.addLast("mainhandler", this);
         return pipeline;
     }
@@ -699,6 +717,16 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
             return;
         }
 
+        if (t instanceof AuthHandler.AuthenticationException) {
+            LOG.error("Error authenticating connection", t);
+            errorOutOutstandingEntries(BKException.Code.UnauthorizedAccessException);
+            Channel c = ctx.getChannel();
+            if (c != null) {
+                closeChannel(c);
+            }
+            return;
+        }
+
         if (t instanceof IOException) {
             // these are thrown when a bookie fails, logging them just pollutes
             // the logs (the failure is logged from the listeners on the write
@@ -739,7 +767,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
                 LOG.debug("Unexpected response received from bookie : " + addr + " for type : " + header.             getOperation() +
                         " and txnId : " + header.getTxnId());
             }
-
         } else {
             long orderingKey = completionValue.ledgerId;
             executor.submitOrdered(orderingKey, new SafeRunnable() {
@@ -748,10 +775,10 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
                     OperationType type = header.getOperation();
                     switch (type) {
                         case ADD_ENTRY:
-                            handleAddResponse(response.getAddResponse(), completionValue);
+                            handleAddResponse(response, completionValue);
                             break;
                         case READ_ENTRY:
-                            handleReadResponse(response.getReadResponse(), completionValue);
+                            handleReadResponse(response, completionValue);
                             break;
                         default:
                             LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring",
@@ -770,13 +797,14 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         }
     }
 
-    void handleAddResponse(AddResponse response, CompletionValue completionValue) {
+    void handleAddResponse(Response response, CompletionValue completionValue) {
         // The completion value should always be an instance of an AddCompletion object when we reach here.
         AddCompletion ac = (AddCompletion)completionValue;
+        AddResponse addResponse = response.getAddResponse();
 
-        long ledgerId = response.getLedgerId();
-        long entryId = response.getEntryId();
-        StatusCode status = response.getStatus();
+        long ledgerId = addResponse.getLedgerId();
+        long entryId = addResponse.getEntryId();
+        StatusCode status = response.getStatus() == StatusCode.EOK ? addResponse.getStatus() : response.getStatus();
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
@@ -796,17 +824,19 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
         ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx);
     }
 
-    void handleReadResponse(ReadResponse response, CompletionValue completionValue) {
+    void handleReadResponse(Response response, CompletionValue completionValue) {
         // The completion value should always be an instance of a ReadCompletion object when we reach here.
         ReadCompletion rc = (ReadCompletion)completionValue;
+        ReadResponse readResponse = response.getReadResponse();
+
+        long ledgerId = readResponse.getLedgerId();
+        long entryId = readResponse.getEntryId();
+        StatusCode status = response.getStatus() == StatusCode.EOK ? readResponse.getStatus() : response.getStatus();
 
-        long ledgerId = response.getLedgerId();
-        long entryId = response.getEntryId();
-        StatusCode status = response.getStatus();
         ChannelBuffer buffer = ChannelBuffers.buffer(0);
 
-        if (response.hasBody()) {
-            buffer = ChannelBuffers.copiedBuffer(response.getBody().asReadOnlyByteBuffer());
+        if (readResponse.hasBody()) {
+            buffer = ChannelBuffers.copiedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
         }
 
         if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
index 56ba581..7aeadfc 100644
--- a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
@@ -55,6 +55,8 @@ enum OperationType {
     // Not supported yet.
     RANGE_READ_ENTRY = 3;
     RANGE_ADD_ENTRY = 4;
+
+    AUTH = 5;
 }
 
 /**
@@ -71,6 +73,7 @@ message Request {
     // Requests
     optional ReadRequest readRequest = 100;
     optional AddRequest addRequest = 101;
+    optional AuthMessage authRequest = 102;
 }
 
 message ReadRequest {
@@ -105,7 +108,7 @@ message Response {
     // Response
     optional ReadResponse readResponse = 100;
     optional AddResponse addResponse = 101;
-
+    optional AuthMessage authResponse = 102;
 }
 
 message ReadResponse {
@@ -120,3 +123,12 @@ message AddResponse {
     required int64 ledgerId = 2;
     required int64 entryId = 3;
 }
+
+/**
+ * Extensible message that auth mechanisms
+ * can use to carry their payload.
+ */
+message AuthMessage {
+    required string authPluginName = 1;
+    extensions 1000 to max;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
new file mode 100644
index 0000000..a57bfe9
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
@@ -0,0 +1,654 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.auth;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.proto.TestDataFormats;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import com.google.protobuf.ExtensionRegistry;
+
+public class TestAuth extends BookKeeperClusterTestCase {
+    static final Logger LOG = LoggerFactory.getLogger(TestAuth.class);
+    public static final String TEST_AUTH_PROVIDER_PLUGIN_NAME = "TestAuthProviderPlugin";
+    private static final byte[] PASSWD = "testPasswd".getBytes();
+    private static final byte[] ENTRY = "TestEntry".getBytes();
+
+    public TestAuth() {
+        super(0); // start them later when auth providers are configured
+    }
+
+    // the ledger id is reported via ledgerWritten because the method may throw before returning
+    private void connectAndWriteToBookie(ClientConfiguration conf, AtomicLong ledgerWritten)
+            throws Exception {
+        LOG.info("Connecting to bookie");
+        BookKeeper bkc = new BookKeeper(conf, zkc);
+        LedgerHandle l = bkc.createLedger(1, 1, DigestType.CRC32,
+                PASSWD);
+        ledgerWritten.set(l.getId());
+        l.addEntry(ENTRY);
+        l.close();
+        bkc.close();
+    }
+
+    /**
+     * Count the entries in the ledger. The bookies are first restarted with
+     * an always-succeeding auth provider so that the ledger can be read.
+     */
+    private int entryCount(long ledgerId, ServerConfiguration bookieConf,
+                           ClientConfiguration clientConf) throws Exception {
+        LOG.info("Counting entries in {}", ledgerId);
+        for (ServerConfiguration conf : bsConfs) {
+            conf.setBookieAuthProviderFactoryClass(
+                    AlwaysSucceedBookieAuthProviderFactory.class.getName());
+        }
+        clientConf.setClientAuthProviderFactoryClass(
+                SendUntilCompleteClientAuthProviderFactory.class.getName());
+        
+        restartBookies();
+
+        BookKeeper bkc = new BookKeeper(clientConf, zkc);
+        LedgerHandle lh = bkc.openLedger(ledgerId, DigestType.CRC32,
+                                         PASSWD);
+        if (lh.getLastAddConfirmed() < 0) {
+            return 0;
+        }
+        Enumeration<LedgerEntry> e = lh.readEntries(0, lh.getLastAddConfirmed());
+        int count = 0;
+        while (e.hasMoreElements()) {
+            count++;
+            assertTrue("Should match what we wrote",
+                       Arrays.equals(e.nextElement().getEntry(), ENTRY));
+        }
+        return count;
+    }
+
+    /**
+     * Test that a connection will authenticate with a single message
+     * to the server and a single response.
+     */
+    @Test(timeout=30000)
+    public void testSingleMessageAuth() throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(
+                AlwaysSucceedBookieAuthProviderFactory.class.getName());
+        
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(
+                SendUntilCompleteClientAuthProviderFactory.class.getName());
+        
+        startAndStoreBookie(bookieConf);
+
+        AtomicLong ledgerId = new AtomicLong(-1);
+        connectAndWriteToBookie(clientConf, ledgerId); // should succeed
+
+        assertFalse(ledgerId.get() == -1);
+        assertEquals("Should have entry", 1, entryCount(ledgerId.get(), bookieConf, clientConf));
+    }
+    
+    /**
+     * Test that when the bookie provider sends a failure message
+     * the client will not be able to write
+     */
+    @Test(timeout=30000)
+    public void testSingleMessageAuthFailure() throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(
+                AlwaysFailBookieAuthProviderFactory.class.getName());
+        
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(
+                SendUntilCompleteClientAuthProviderFactory.class.getName());
+        
+        startAndStoreBookie(bookieConf);
+
+        AtomicLong ledgerId = new AtomicLong(-1);
+        try {
+            connectAndWriteToBookie(clientConf, ledgerId); // should fail
+            fail("Shouldn't get this far");
+        } catch (BKException.BKUnauthorizedAccessException bke) {
+            // client shouldn't be able to find enough bookies to
+            // write
+        }
+        assertFalse(ledgerId.get() == -1);
+        assertEquals("Shouldn't have entry", 0, entryCount(ledgerId.get(), bookieConf, clientConf));
+    }
+
+    /**
+     * Test that authentication works when the providers
+     * exchange multiple messages before the bookie reports success.
+     */
+    @Test(timeout=30000)
+    public void testMultiMessageAuth() throws Exception {
+        final String bookieFactory = SucceedAfter3BookieAuthProviderFactory.class.getName();
+        final String clientFactory = SendUntilCompleteClientAuthProviderFactory.class.getName();
+
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(bookieFactory);
+
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(clientFactory);
+
+        startAndStoreBookie(bookieConf);
+
+        AtomicLong ledgerId = new AtomicLong(-1);
+        connectAndWriteToBookie(clientConf, ledgerId); // should succeed
+
+        // A ledger id must have been recorded and exactly one entry stored.
+        final long id = ledgerId.get();
+        assertTrue(id != -1);
+        assertEquals("Should have entry", 1, entryCount(id, bookieConf, clientConf));
+    }
+    
+    /**
+     * Test that when the bookie provider sends a failure message
+     * after a multi-message exchange, the client is not able to write.
+     */
+    @Test(timeout=30000)
+    public void testMultiMessageAuthFailure() throws Exception {
+        final String bookieFactory = FailAfter3BookieAuthProviderFactory.class.getName();
+        final String clientFactory = SendUntilCompleteClientAuthProviderFactory.class.getName();
+
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(bookieFactory);
+
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(clientFactory);
+
+        startAndStoreBookie(bookieConf);
+
+        AtomicLong ledgerId = new AtomicLong(-1);
+        try {
+            connectAndWriteToBookie(clientConf, ledgerId); // should fail
+            fail("Shouldn't get this far");
+        } catch (BKException.BKUnauthorizedAccessException expected) {
+            // bookie should have sent a negative response before
+            // breaking the connection
+        }
+
+        // A ledger id must have been recorded, but no entry may be stored.
+        final long id = ledgerId.get();
+        assertTrue(id != -1);
+        assertEquals("Shouldn't have entry", 0, entryCount(id, bookieConf, clientConf));
+    }
+
+    /**
+     * Test that when the bookie and the client have different
+     * plugins configured, no messages get through.
+     */
+    @Test(timeout=30000)
+    public void testDifferentPluginFailure() throws Exception {
+        final String bookieFactory = DifferentPluginBookieAuthProviderFactory.class.getName();
+        final String clientFactory = SendUntilCompleteClientAuthProviderFactory.class.getName();
+
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(bookieFactory);
+
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(clientFactory);
+
+        startAndStoreBookie(bookieConf);
+
+        AtomicLong ledgerId = new AtomicLong(-1);
+        try {
+            connectAndWriteToBookie(clientConf, ledgerId); // should fail
+            fail("Shouldn't get this far");
+        } catch (BKException.BKUnauthorizedAccessException expected) {
+            // bookie should have sent a negative response before
+            // breaking the connection
+        }
+
+        // A ledger id must have been recorded, but no entry may be stored.
+        final long id = ledgerId.get();
+        assertTrue(id != -1);
+        assertEquals("Shouldn't have entry", 0, entryCount(id, bookieConf, clientConf));
+    }
+
+    /**
+     * Test that when the plugin class does exist, but
+     * doesn't implement the factory interface, we fail predictably:
+     * starting the bookie and constructing the client must each throw
+     * a RuntimeException whose message names the expected interface.
+     */
+    @Test(timeout=30000)
+    public void testExistantButNotValidPlugin() throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(
+                "java.lang.String");
+
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(
+                "java.lang.String");
+        try {
+            startAndStoreBookie(bookieConf);
+            fail("Shouldn't get this far");
+        } catch (RuntimeException e) {
+            // received correct exception; a missing message is reported as
+            // an assertion failure rather than a NullPointerException
+            assertTrue("Wrong exception thrown",
+                    e.getMessage() != null
+                    && e.getMessage().contains("not "
+                            + BookieAuthProvider.Factory.class.getName()));
+        }
+
+        try {
+            // If construction unexpectedly succeeds, close the client so
+            // the failing test does not leak it.
+            new BookKeeper(clientConf, zkc).close();
+            fail("Shouldn't get this far");
+        } catch (RuntimeException e) {
+            // received correct exception
+            assertTrue("Wrong exception thrown",
+                    e.getMessage() != null
+                    && e.getMessage().contains("not "
+                            + ClientAuthProvider.Factory.class.getName()));
+        }
+    }
+
+    /**
+     * Test that when the plugin class does not exist,
+     * the bookie will not start and the client will
+     * break. Both failures must be a RuntimeException whose
+     * cause is a ClassNotFoundException.
+     */
+    @Test(timeout=30000)
+    public void testNonExistantPlugin() throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(
+                "NonExistantClassNameForTestingAuthPlugins");
+
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(
+                "NonExistantClassNameForTestingAuthPlugins");
+        try {
+            startAndStoreBookie(bookieConf);
+            fail("Shouldn't get this far");
+        } catch (RuntimeException e) {
+            // received correct exception; check the cause exists first so a
+            // cause-less exception fails the assertion instead of throwing NPE
+            assertTrue("Wrong exception thrown", e.getCause() != null);
+            // JUnit order is (message, expected, actual)
+            assertEquals("Wrong exception thrown",
+                    ClassNotFoundException.class, e.getCause().getClass());
+        }
+
+        try {
+            // If construction unexpectedly succeeds, close the client so
+            // the failing test does not leak it.
+            new BookKeeper(clientConf, zkc).close();
+            fail("Shouldn't get this far");
+        } catch (RuntimeException e) {
+            // received correct exception
+            assertTrue("Wrong exception thrown", e.getCause() != null);
+            assertEquals("Wrong exception thrown",
+                    ClassNotFoundException.class, e.getCause().getClass());
+        }
+    }
+
+    /**
+     * Test that when the plugin on the bookie crashes, the client does
+     * not hang, and it cannot write either.
+     */
+    @Test(timeout=30000)
+    public void testCrashDuringAuth() throws Exception {
+        final String bookieFactory = CrashAfter3BookieAuthProviderFactory.class.getName();
+        final String clientFactory = SendUntilCompleteClientAuthProviderFactory.class.getName();
+
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(bookieFactory);
+
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(clientFactory);
+
+        startAndStoreBookie(bookieConf);
+
+        AtomicLong ledgerId = new AtomicLong(-1);
+        try {
+            connectAndWriteToBookie(clientConf, ledgerId);
+            fail("Shouldn't get this far");
+        } catch (BKException.BKNotEnoughBookiesException expected) {
+            // bookie won't respond, request will time out, and then
+            // we won't be able to find a replacement
+        }
+
+        // A ledger id must have been recorded, but no entry may be stored.
+        final long id = ledgerId.get();
+        assertTrue(id != -1);
+        assertEquals("Shouldn't have entry", 0, entryCount(id, bookieConf, clientConf));
+    }
+
+    /**
+     * Test that when a bookie simply stops replying during auth, the
+     * client does not hang, and it cannot write either.
+     */
+    @Test(timeout=30000)
+    public void testCrashType2DuringAuth() throws Exception {
+        final String bookieFactory = CrashType2After3BookieAuthProviderFactory.class.getName();
+        final String clientFactory = SendUntilCompleteClientAuthProviderFactory.class.getName();
+
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(bookieFactory);
+
+        ClientConfiguration clientConf = newClientConfiguration();
+        clientConf.setClientAuthProviderFactoryClass(clientFactory);
+
+        // Publish the server so the bookie-side provider can suspend it.
+        crashType2bookieInstance = startAndStoreBookie(bookieConf);
+
+        AtomicLong ledgerId = new AtomicLong(-1);
+        try {
+            connectAndWriteToBookie(clientConf, ledgerId);
+            fail("Shouldn't get this far");
+        } catch (BKException.BKNotEnoughBookiesException expected) {
+            // bookie won't respond, request will time out, and then
+            // we won't be able to find a replacement
+        }
+
+        // A ledger id must have been recorded, but no entry may be stored.
+        final long id = ledgerId.get();
+        assertTrue(id != -1);
+        assertEquals("Shouldn't have entry", 0, entryCount(id, bookieConf, clientConf));
+    }
+
+    BookieServer startAndStoreBookie(ServerConfiguration conf) throws Exception {
+        bsConfs.add(conf);
+        BookieServer s = startBookie(conf);
+        bs.add(s);
+        return s;
+    }
+
+    /**
+     * Bookie-side test factory whose providers answer every auth message
+     * with a SUCCESS_RESPONSE and then report OK on the completion callback.
+     */
+    public static class AlwaysSucceedBookieAuthProviderFactory
+        implements BookieAuthProvider.Factory {
+        @Override
+        public String getPluginName() {
+            return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+        }
+
+        @Override
+        public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+            // make the test message-type extension known to the registry
+            TestDataFormats.registerAllExtensions(registry);
+        }
+
+        @Override
+        public BookieAuthProvider newProvider(InetSocketAddress addr,
+                                              final GenericCallback<Void> completeCb) {
+            return new BookieAuthProvider() {
+                @Override
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+                    AuthMessage.Builder reply = AuthMessage.newBuilder()
+                            .setAuthPluginName(getPluginName());
+                    reply.setExtension(TestDataFormats.messageType,
+                            TestDataFormats.AuthMessageType.SUCCESS_RESPONSE);
+
+                    // reply to the client first, then mark auth as complete
+                    cb.operationComplete(BKException.Code.OK, reply.build());
+                    completeCb.operationComplete(BKException.Code.OK, null);
+                }
+            };
+        }
+    }
+
+    /**
+     * Bookie-side test factory whose providers answer every auth message
+     * with a FAILURE_RESPONSE and then report UnauthorizedAccessException
+     * on the completion callback.
+     */
+    public static class AlwaysFailBookieAuthProviderFactory
+        implements BookieAuthProvider.Factory {
+        @Override
+        public String getPluginName() {
+            return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+        }
+
+        @Override
+        public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+            // make the test message-type extension known to the registry
+            TestDataFormats.registerAllExtensions(registry);
+        }
+
+        @Override
+        public BookieAuthProvider newProvider(InetSocketAddress addr,
+                                              final GenericCallback<Void> completeCb) {
+            return new BookieAuthProvider() {
+                @Override
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+                    AuthMessage.Builder reply = AuthMessage.newBuilder()
+                            .setAuthPluginName(getPluginName());
+                    reply.setExtension(TestDataFormats.messageType,
+                            TestDataFormats.AuthMessageType.FAILURE_RESPONSE);
+
+                    // the failure message itself is delivered with code OK;
+                    // the rejection is signalled on the completion callback
+                    cb.operationComplete(BKException.Code.OK, reply.build());
+                    completeCb.operationComplete(
+                            BKException.Code.UnauthorizedAccessException, null);
+                }
+            };
+        }
+    }
+
+    /**
+     * Client-side test factory. Its providers send a PAYLOAD_MESSAGE to
+     * open the exchange and resend it for every bookie message that is
+     * neither a success nor a failure response.
+     */
+    private static class SendUntilCompleteClientAuthProviderFactory
+        implements ClientAuthProvider.Factory {
+
+        @Override
+        public String getPluginName() {
+            return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+        }
+
+        @Override
+        public void init(ClientConfiguration conf, ExtensionRegistry registry) {
+            // make the test message-type extension known to the registry
+            TestDataFormats.registerAllExtensions(registry);
+        }
+
+        @Override
+        public ClientAuthProvider newProvider(InetSocketAddress addr,
+                final GenericCallback<Void> completeCb) {
+            // The same payload message is reused for every send.
+            AuthMessage.Builder payload = AuthMessage.newBuilder()
+                    .setAuthPluginName(getPluginName());
+            payload.setExtension(TestDataFormats.messageType,
+                                 TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE);
+            final AuthMessage message = payload.build();
+
+            return new ClientAuthProvider() {
+                @Override
+                public void init(GenericCallback<AuthMessage> cb) {
+                    cb.operationComplete(BKException.Code.OK, message);
+                }
+
+                @Override
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+                    // a message without the test extension is treated as a rejection
+                    if (!m.hasExtension(TestDataFormats.messageType)) {
+                        completeCb.operationComplete(BKException.Code.UnauthorizedAccessException, null);
+                        return;
+                    }
+
+                    final TestDataFormats.AuthMessageType kind
+                        = m.getExtension(TestDataFormats.messageType);
+                    if (kind == TestDataFormats.AuthMessageType.SUCCESS_RESPONSE) {
+                        completeCb.operationComplete(BKException.Code.OK, null);
+                        return;
+                    }
+                    if (kind == TestDataFormats.AuthMessageType.FAILURE_RESPONSE) {
+                        completeCb.operationComplete(BKException.Code.UnauthorizedAccessException, null);
+                        return;
+                    }
+
+                    // anything else: keep the exchange going
+                    cb.operationComplete(BKException.Code.OK, message);
+                }
+            };
+        }
+    }
+
+    /**
+     * Bookie-side test factory whose providers reply with PAYLOAD_MESSAGE
+     * until the third message counted by this factory, which is answered
+     * with SUCCESS_RESPONSE and completes the authentication with OK.
+     * The counter lives on the factory, so it is shared by every provider
+     * the factory creates.
+     */
+    public static class SucceedAfter3BookieAuthProviderFactory
+        implements BookieAuthProvider.Factory {
+        AtomicInteger numMessages = new AtomicInteger(0);
+
+        @Override
+        public String getPluginName() {
+            return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+        }
+
+        @Override
+        public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+            // make the test message-type extension known to the registry
+            TestDataFormats.registerAllExtensions(registry);
+        }
+
+        @Override
+        public BookieAuthProvider newProvider(InetSocketAddress addr,
+                                              final GenericCallback<Void> completeCb) {
+            return new BookieAuthProvider() {
+                @Override
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+                    final boolean thirdMessage = numMessages.incrementAndGet() == 3;
+                    final TestDataFormats.AuthMessageType replyType = thirdMessage
+                            ? TestDataFormats.AuthMessageType.SUCCESS_RESPONSE
+                            : TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE;
+
+                    AuthMessage.Builder reply = AuthMessage.newBuilder()
+                            .setAuthPluginName(getPluginName());
+                    reply.setExtension(TestDataFormats.messageType, replyType);
+                    cb.operationComplete(BKException.Code.OK, reply.build());
+
+                    // only the third message finishes the exchange
+                    if (thirdMessage) {
+                        completeCb.operationComplete(BKException.Code.OK, null);
+                    }
+                }
+            };
+        }
+    }
+
+    /**
+     * Bookie-side test factory whose providers reply with PAYLOAD_MESSAGE
+     * until the third message counted by this factory, which is answered
+     * with FAILURE_RESPONSE and completes the authentication with
+     * UnauthorizedAccessException. The counter lives on the factory, so it
+     * is shared by every provider the factory creates.
+     */
+    public static class FailAfter3BookieAuthProviderFactory
+        implements BookieAuthProvider.Factory {
+        AtomicInteger numMessages = new AtomicInteger(0);
+
+        @Override
+        public String getPluginName() {
+            return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+        }
+
+        @Override
+        public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+            // make the test message-type extension known to the registry
+            TestDataFormats.registerAllExtensions(registry);
+        }
+
+        @Override
+        public BookieAuthProvider newProvider(InetSocketAddress addr,
+                                              final GenericCallback<Void> completeCb) {
+            return new BookieAuthProvider() {
+                @Override
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+                    final boolean thirdMessage = numMessages.incrementAndGet() == 3;
+                    final TestDataFormats.AuthMessageType replyType = thirdMessage
+                            ? TestDataFormats.AuthMessageType.FAILURE_RESPONSE
+                            : TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE;
+
+                    AuthMessage.Builder reply = AuthMessage.newBuilder()
+                            .setAuthPluginName(getPluginName());
+                    reply.setExtension(TestDataFormats.messageType, replyType);
+                    cb.operationComplete(BKException.Code.OK, reply.build());
+
+                    // only the third message ends the exchange, as a rejection
+                    if (thirdMessage) {
+                        completeCb.operationComplete(BKException.Code.UnauthorizedAccessException,
+                                                     null);
+                    }
+                }
+            };
+        }
+    }
+
+    /**
+     * Bookie-side test factory whose providers reply with PAYLOAD_MESSAGE
+     * except on the third message counted by this factory, where the
+     * provider throws a RuntimeException instead of replying.
+     */
+    public static class CrashAfter3BookieAuthProviderFactory
+        implements BookieAuthProvider.Factory {
+        AtomicInteger numMessages = new AtomicInteger(0);
+
+        @Override
+        public String getPluginName() {
+            return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+        }
+
+        @Override
+        public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+            // make the test message-type extension known to the registry
+            TestDataFormats.registerAllExtensions(registry);
+        }
+
+        @Override
+        public BookieAuthProvider newProvider(InetSocketAddress addr,
+                                              final GenericCallback<Void> completeCb) {
+            return new BookieAuthProvider() {
+                @Override
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+                    // third message: simulate a plugin crash, no reply is sent
+                    if (numMessages.incrementAndGet() == 3) {
+                        throw new RuntimeException("Do bad things to the bookie");
+                    }
+
+                    AuthMessage.Builder reply = AuthMessage.newBuilder()
+                            .setAuthPluginName(getPluginName());
+                    reply.setExtension(TestDataFormats.messageType,
+                            TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE);
+                    cb.operationComplete(BKException.Code.OK, reply.build());
+                }
+            };
+        }
+    }
+
+    // Server that CrashType2After3BookieAuthProviderFactory suspends;
+    // assigned by testCrashType2DuringAuth after the bookie is started.
+    private static BookieServer crashType2bookieInstance = null;
+
+    /**
+     * Bookie-side test factory whose providers reply with PAYLOAD_MESSAGE
+     * except on the third message counted by this factory, where the
+     * provider sends no reply and calls suspendProcessing() on
+     * crashType2bookieInstance instead.
+     */
+    public static class CrashType2After3BookieAuthProviderFactory
+        implements BookieAuthProvider.Factory {
+        AtomicInteger numMessages = new AtomicInteger(0);
+
+        @Override
+        public String getPluginName() {
+            return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+        }
+
+        @Override
+        public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+            // make the test message-type extension known to the registry
+            TestDataFormats.registerAllExtensions(registry);
+        }
+
+        @Override
+        public BookieAuthProvider newProvider(InetSocketAddress addr,
+                                              final GenericCallback<Void> completeCb) {
+            return new BookieAuthProvider() {
+                @Override
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+                    // third message: go silent instead of replying
+                    if (numMessages.incrementAndGet() == 3) {
+                        crashType2bookieInstance.suspendProcessing();
+                        return;
+                    }
+
+                    AuthMessage.Builder reply = AuthMessage.newBuilder()
+                            .setAuthPluginName(getPluginName());
+                    reply.setExtension(TestDataFormats.messageType,
+                            TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE);
+                    cb.operationComplete(BKException.Code.OK, reply.build());
+                }
+            };
+        }
+    }
+
+    /**
+     * Bookie-side test factory that registers under a plugin name different
+     * from the one the test client factory uses. Its providers answer with
+     * a FAILURE_RESPONSE and report OK on the completion callback.
+     */
+    public static class DifferentPluginBookieAuthProviderFactory
+        implements BookieAuthProvider.Factory {
+        @Override
+        public String getPluginName() {
+            return "DifferentAuthProviderPlugin";
+        }
+
+        @Override
+        public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+            // make the test message-type extension known to the registry
+            TestDataFormats.registerAllExtensions(registry);
+        }
+
+        @Override
+        public BookieAuthProvider newProvider(InetSocketAddress addr,
+                                              final GenericCallback<Void> completeCb) {
+            return new BookieAuthProvider() {
+                @Override
+                public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+                    AuthMessage.Builder reply = AuthMessage.newBuilder()
+                            .setAuthPluginName(getPluginName());
+                    reply.setExtension(TestDataFormats.messageType,
+                            TestDataFormats.AuthMessageType.FAILURE_RESPONSE);
+
+                    cb.operationComplete(BKException.Code.OK, reply.build());
+                    // NOTE(review): a failure response paired with an OK
+                    // completion looks inconsistent - confirm it is intended.
+                    completeCb.operationComplete(BKException.Code.OK, null);
+                }
+            };
+        }
+    }
+
+}