You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/04/05 08:46:01 UTC
[1/3] bookkeeper git commit: BOOKKEEPER-901: Authentication framework
Repository: bookkeeper
Updated Branches:
refs/heads/master e32c38890 -> b1c12c0f4
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
new file mode 100644
index 0000000..fe87ac9
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
@@ -0,0 +1,239 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ExtensionRegistry;
+
+import org.apache.bookkeeper.auth.ClientAuthProvider;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.PerChannelBookieClient;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.bookkeeper.auth.TestAuth;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+
+import org.apache.bookkeeper.proto.BookieProtocol.*;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests the bookie's authentication handshake when it is driven with
+ * {@link BookieProtocol} request/response objects ({@code AuthRequest},
+ * {@code AuthResponse}, {@code ReadRequest}) written directly to a Netty
+ * channel by {@link CompatClient42}.
+ *
+ * <p>Each test starts its own bookie configured with one of the bookie auth
+ * provider factories declared in {@link TestAuth}.
+ * NOTE(review): the class name suggests this emulates an older ("CMS42")
+ * client's wire protocol -- confirm against the project history.
+ */
+public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase {
+    static final Logger LOG = LoggerFactory.getLogger(TestBackwardCompatCMS42.class);
+
+    /** Number of auth requests the multi-message tests send before checking the final verdict. */
+    private static final int HANDSHAKE_ROUNDS = 3;
+
+    /** Registry with the {@link TestDataFormats} extensions registered; shared by the auth factory and all clients. */
+    ExtensionRegistry extRegistry = ExtensionRegistry.newInstance();
+    ClientAuthProvider.Factory authProvider;
+    ClientSocketChannelFactory channelFactory
+        = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+                                            Executors.newCachedThreadPool());
+    OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder().numThreads(1).name("TestBackwardCompatClient")
+        .build();
+
+    /**
+     * Registers the test extensions and creates the client auth provider
+     * factory from a default {@link ClientConfiguration}.
+     *
+     * @throws Exception if the auth provider factory cannot be created
+     */
+    public TestBackwardCompatCMS42() throws Exception {
+        // presumably "0" means the base class starts no bookies itself; each
+        // test starts its own via startAndStoreBookie -- TODO confirm
+        super(0);
+
+        TestDataFormats.registerAllExtensions(extRegistry);
+        authProvider = AuthProviderFactoryFactory.newClientAuthProviderFactory(
+                new ClientConfiguration(), extRegistry);
+    }
+
+    /**
+     * A single auth request against an always-succeeding provider must be
+     * answered with an {@code AuthResponse} carrying {@code EOK}.
+     */
+    @Test(timeout=60000)
+    public void testAuthSingleMessage() throws Exception {
+        CompatClient42 client = startBookieAndConnect(
+                TestAuth.AlwaysSucceedBookieAuthProviderFactory.class);
+
+        client.sendRequest(newPayloadAuthRequest());
+
+        Response response = client.takeResponse();
+        assertTrue("Should be auth response", response instanceof AuthResponse);
+        // JUnit's contract is (message, expected, actual)
+        assertEquals("Should have succeeded", BookieProtocol.EOK, response.getErrorCode());
+    }
+
+    /**
+     * Against the succeed-after-3 provider, the first two responses must be
+     * payload messages and the third a success response.
+     */
+    @Test(timeout=60000)
+    public void testAuthMultiMessage() throws Exception {
+        CompatClient42 client = startBookieAndConnect(
+                TestAuth.SucceedAfter3BookieAuthProviderFactory.class);
+
+        runHandshake(client, "Should succeed after 3",
+                     TestDataFormats.AuthMessageType.SUCCESS_RESPONSE);
+    }
+
+    /**
+     * Against the fail-after-3 provider, the third response must be a failure
+     * response, and a subsequent read request must be rejected with
+     * {@code EUA}.
+     */
+    @Test(timeout=60000)
+    public void testAuthFail() throws Exception {
+        CompatClient42 client = startBookieAndConnect(
+                TestAuth.FailAfter3BookieAuthProviderFactory.class);
+
+        runHandshake(client, "Should fail after 3",
+                     TestDataFormats.AuthMessageType.FAILURE_RESPONSE);
+
+        client.sendRequest(new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
+                                           1L, 1L, (short)0));
+        Response response = client.takeResponse();
+        assertEquals("Should have failed",
+                     BookieProtocol.EUA, response.getErrorCode());
+    }
+
+    /** Starts a bookie using the given bookie auth provider factory class and connects a client to it. */
+    private CompatClient42 startBookieAndConnect(Class<?> bookieAuthFactoryClass) throws Exception {
+        ServerConfiguration bookieConf = newServerConfiguration();
+        bookieConf.setBookieAuthProviderFactoryClass(bookieAuthFactoryClass.getName());
+        BookieServer bookie = startAndStoreBookie(bookieConf);
+        return newCompatClient(bookie.getLocalAddress());
+    }
+
+    /** Builds an auth request for the test plugin whose message type extension is PAYLOAD_MESSAGE. */
+    private Request newPayloadAuthRequest() {
+        AuthMessage.Builder builder = AuthMessage.newBuilder()
+            .setAuthPluginName(TestAuth.TEST_AUTH_PROVIDER_PLUGIN_NAME);
+        builder.setExtension(TestDataFormats.messageType,
+                             TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE);
+        return new AuthRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, builder.build());
+    }
+
+    /**
+     * Sends the same payload auth request {@link #HANDSHAKE_ROUNDS} times and
+     * checks each response: every response must be an EOK {@code AuthResponse};
+     * all but the last must be PAYLOAD_MESSAGE and the last must be
+     * {@code finalType}.
+     */
+    private void runHandshake(CompatClient42 client, String finalMessage,
+                              TestDataFormats.AuthMessageType finalType) throws Exception {
+        Request request = newPayloadAuthRequest();
+        for (int round = 1; round <= HANDSHAKE_ROUNDS; round++) {
+            client.sendRequest(request);
+            Response response = client.takeResponse();
+            assertTrue("Should be auth response", response instanceof AuthResponse);
+            assertEquals("Should have succeeded",
+                         BookieProtocol.EOK, response.getErrorCode());
+            TestDataFormats.AuthMessageType type = ((AuthResponse) response).getAuthMessage()
+                .getExtension(TestDataFormats.messageType);
+            if (round == HANDSHAKE_ROUNDS) {
+                assertEquals(finalMessage, finalType, type);
+            } else {
+                assertEquals("Should be payload",
+                             TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE, type);
+            }
+        }
+    }
+
+    // copy from TestAuth
+    /**
+     * Starts a bookie with the given configuration and records both the
+     * configuration and the server in the base class's {@code bsConfs} and
+     * {@code bs} collections.
+     */
+    BookieServer startAndStoreBookie(ServerConfiguration conf) throws Exception {
+        bsConfs.add(conf);
+        BookieServer s = startBookie(conf);
+        bs.add(s);
+        return s;
+    }
+
+    /** Creates a client connected to {@code addr} using this test's executor, channel factory and auth provider. */
+    CompatClient42 newCompatClient(BookieSocketAddress addr) throws Exception {
+        return new CompatClient42(executor, channelFactory, addr, authProvider, extRegistry);
+    }
+
+    /**
+     * Minimal client that extends PerChannelBookieClient only to reuse it as
+     * the pipeline factory; it opens its own channel, writes requests to it
+     * directly and queues every {@code Response} it receives.
+     */
+    class CompatClient42 extends PerChannelBookieClient {
+        /** Responses received so far, in arrival order; bounded at 10 entries. */
+        final ArrayBlockingQueue<Response> responses = new ArrayBlockingQueue<Response>(10);
+        final Channel channel;
+        /** Released once {@link #channelConnected} has fired. */
+        final CountDownLatch connected = new CountDownLatch(1);
+
+        /**
+         * Connects to {@code addr} and blocks until the connect attempt completes.
+         *
+         * @throws java.io.IOException if the connection attempt fails
+         * @throws Exception if the superclass constructor fails or the wait is interrupted
+         */
+        CompatClient42(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
+                       BookieSocketAddress addr,
+                       ClientAuthProvider.Factory authProviderFactory,
+                       ExtensionRegistry extRegistry) throws Exception {
+            super(executor, channelFactory, addr, authProviderFactory, extRegistry);
+
+            ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
+            bootstrap.setPipelineFactory(this);
+            bootstrap.setOption("tcpNoDelay", false);
+            bootstrap.setOption("keepAlive", true);
+            ChannelFuture f = bootstrap.connect(addr.getSocketAddress()).await();
+            if (!f.isSuccess()) {
+                // Fail fast: without this, sendRequest() would block on the
+                // "connected" latch until the test timeout expires.
+                throw new java.io.IOException("Failed to connect to bookie " + addr, f.getCause());
+            }
+            channel = f.getChannel();
+        }
+
+        /** Queues {@code Response} messages; anything else is logged and passed upstream. */
+        @Override
+        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+            Object message = e.getMessage();
+            if (!(message instanceof Response)) {
+                LOG.error("Unknown message {}, passing upstream", message);
+                ctx.sendUpstream(e);
+                return;
+            }
+            // add() throws IllegalStateException if more than 10 responses are pending
+            responses.add((Response) message);
+        }
+
+        /** Signals that the channel is connected so that requests may be written. */
+        @Override
+        public void channelConnected(ChannelHandlerContext ctx,
+                                     ChannelStateEvent e)
+                throws Exception {
+            connected.countDown();
+        }
+
+        /** Blocks until a response is available and returns it. */
+        Response takeResponse() throws Exception {
+            return responses.take();
+        }
+
+        /** Returns the next queued response, or {@code null} if none is queued. */
+        Response pollResponse() throws Exception {
+            return responses.poll();
+        }
+
+        /** Waits for the channel to be connected, then writes {@code request} to it. */
+        void sendRequest(Request request) throws Exception {
+            connected.await();
+            channel.write(request);
+        }
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestDataFormats.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestDataFormats.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestDataFormats.java
new file mode 100644
index 0000000..c3f675f
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestDataFormats.java
@@ -0,0 +1,126 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: src/test/proto/TestDataFormats.proto
+
+package org.apache.bookkeeper.proto;
+
+public final class TestDataFormats {
+ private TestDataFormats() {}
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistry registry) {
+ registry.add(org.apache.bookkeeper.proto.TestDataFormats.messageType);
+ }
+ public enum AuthMessageType
+ implements com.google.protobuf.ProtocolMessageEnum {
+ SUCCESS_RESPONSE(0, 1),
+ FAILURE_RESPONSE(1, 2),
+ PAYLOAD_MESSAGE(2, 3),
+ ;
+
+ public static final int SUCCESS_RESPONSE_VALUE = 1;
+ public static final int FAILURE_RESPONSE_VALUE = 2;
+ public static final int PAYLOAD_MESSAGE_VALUE = 3;
+
+
+ public final int getNumber() { return value; }
+
+ public static AuthMessageType valueOf(int value) {
+ switch (value) {
+ case 1: return SUCCESS_RESPONSE;
+ case 2: return FAILURE_RESPONSE;
+ case 3: return PAYLOAD_MESSAGE;
+ default: return null;
+ }
+ }
+
+ public static com.google.protobuf.Internal.EnumLiteMap<AuthMessageType>
+ internalGetValueMap() {
+ return internalValueMap;
+ }
+ private static com.google.protobuf.Internal.EnumLiteMap<AuthMessageType>
+ internalValueMap =
+ new com.google.protobuf.Internal.EnumLiteMap<AuthMessageType>() {
+ public AuthMessageType findValueByNumber(int number) {
+ return AuthMessageType.valueOf(number);
+ }
+ };
+
+ public final com.google.protobuf.Descriptors.EnumValueDescriptor
+ getValueDescriptor() {
+ return getDescriptor().getValues().get(index);
+ }
+ public final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptorForType() {
+ return getDescriptor();
+ }
+ public static final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptor() {
+ return org.apache.bookkeeper.proto.TestDataFormats.getDescriptor().getEnumTypes().get(0);
+ }
+
+ private static final AuthMessageType[] VALUES = {
+ SUCCESS_RESPONSE, FAILURE_RESPONSE, PAYLOAD_MESSAGE,
+ };
+
+ public static AuthMessageType valueOf(
+ com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+ if (desc.getType() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "EnumValueDescriptor is not for this type.");
+ }
+ return VALUES[desc.getIndex()];
+ }
+
+ private final int index;
+ private final int value;
+
+ private AuthMessageType(int index, int value) {
+ this.index = index;
+ this.value = value;
+ }
+
+ // @@protoc_insertion_point(enum_scope:AuthMessageType)
+ }
+
+ public static final int MESSAGETYPE_FIELD_NUMBER = 1000;
+ public static final
+ com.google.protobuf.GeneratedMessage.GeneratedExtension<
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage,
+ org.apache.bookkeeper.proto.TestDataFormats.AuthMessageType> messageType = com.google.protobuf.GeneratedMessage
+ .newFileScopedGeneratedExtension(
+ org.apache.bookkeeper.proto.TestDataFormats.AuthMessageType.class,
+ null);
+
+ public static com.google.protobuf.Descriptors.FileDescriptor
+ getDescriptor() {
+ return descriptor;
+ }
+ private static com.google.protobuf.Descriptors.FileDescriptor
+ descriptor;
+ static {
+ java.lang.String[] descriptorData = {
+ "\n$src/test/proto/TestDataFormats.proto\032\'" +
+ "src/main/proto/BookkeeperProtocol.proto*" +
+ "R\n\017AuthMessageType\022\024\n\020SUCCESS_RESPONSE\020\001" +
+ "\022\024\n\020FAILURE_RESPONSE\020\002\022\023\n\017PAYLOAD_MESSAG" +
+ "E\020\003:4\n\013messageType\022\014.AuthMessage\030\350\007 \002(\0162" +
+ "\020.AuthMessageTypeB\037\n\033org.apache.bookkeep" +
+ "er.protoH\001"
+ };
+ com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+ new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+ public com.google.protobuf.ExtensionRegistry assignDescriptors(
+ com.google.protobuf.Descriptors.FileDescriptor root) {
+ descriptor = root;
+ messageType.internalInit(descriptor.getExtensions().get(0));
+ return null;
+ }
+ };
+ com.google.protobuf.Descriptors.FileDescriptor
+ .internalBuildGeneratedFileFrom(descriptorData,
+ new com.google.protobuf.Descriptors.FileDescriptor[] {
+ org.apache.bookkeeper.proto.BookkeeperProtocol.getDescriptor(),
+ }, assigner);
+ }
+
+ // @@protoc_insertion_point(outer_class_scope)
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
index ac6bd8d..b43f5cd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
@@ -20,9 +20,12 @@
*/
package org.apache.bookkeeper.proto;
+import org.apache.bookkeeper.auth.ClientAuthProvider;
+import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
@@ -38,6 +41,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.protobuf.ExtensionRegistry;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
@@ -54,10 +59,16 @@ import static org.junit.Assert.*;
public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
private final static Logger LOG = LoggerFactory.getLogger(TestPerChannelBookieClient.class);
- public TestPerChannelBookieClient() {
+ ExtensionRegistry extRegistry = ExtensionRegistry.newInstance();
+ ClientAuthProvider.Factory authProvider;
+
+ public TestPerChannelBookieClient() throws Exception {
super(1);
+ authProvider = AuthProviderFactoryFactory.newClientAuthProviderFactory(
+ new ClientConfiguration(), extRegistry);
}
+
/**
* Test that a race does not exist between connection completion
* and client closure. If a race does exist, this test will simply
@@ -74,7 +85,8 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
BookieSocketAddress addr = getBookie(0);
for (int i = 0; i < 1000; i++) {
- PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr);
+ PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr,
+ authProvider, extRegistry);
client.connectIfNeededAndDoOp(new GenericCallback<PerChannelBookieClient>() {
@Override
public void operationComplete(int rc, PerChannelBookieClient client) {
@@ -118,7 +130,8 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
BookieSocketAddress addr = getBookie(0);
for (int i = 0; i < 100; i++) {
- PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr);
+ PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr,
+ authProvider, extRegistry);
for (int j = i; j < 10; j++) {
client.connectIfNeededAndDoOp(nullop);
}
@@ -150,7 +163,8 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
OrderedSafeExecutor executor = getOrderedSafeExecutor();
BookieSocketAddress addr = getBookie(0);
- final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr);
+ final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory,
+ addr, authProvider, extRegistry);
final AtomicBoolean shouldFail = new AtomicBoolean(false);
final AtomicBoolean inconsistent = new AtomicBoolean(false);
final AtomicBoolean running = new AtomicBoolean(true);
@@ -247,7 +261,8 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
final OrderedSafeExecutor executor = getOrderedSafeExecutor();
BookieSocketAddress addr = getBookie(0);
- final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr);
+ final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory,
+ addr, authProvider, extRegistry);
final CountDownLatch completion = new CountDownLatch(1);
final ReadEntryCallback cb = new ReadEntryCallback() {
@Override
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 278dc8c..28de0b2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -195,6 +195,10 @@ public abstract class BookKeeperClusterTestCase {
f, new File[] { f });
}
+ protected ClientConfiguration newClientConfiguration() {
+ return new ClientConfiguration(baseConf);
+ }
+
protected ServerConfiguration newServerConfiguration(int port, String zkServers, File journalDir, File[] ledgerDirs) {
ServerConfiguration conf = new ServerConfiguration(baseConf);
conf.setBookiePort(port);
@@ -289,9 +293,11 @@ public abstract class BookKeeperClusterTestCase {
public void run() {
try {
bookie.suspendProcessing();
+ LOG.info("bookie {} is asleep", bookie.getLocalAddress());
l.countDown();
Thread.sleep(seconds*1000);
bookie.resumeProcessing();
+ LOG.info("bookie {} is awake", bookie.getLocalAddress());
} catch (Exception e) {
LOG.error("Error suspending bookie", e);
}
@@ -441,6 +447,10 @@ public abstract class BookKeeperClusterTestCase {
BookieServer server = new BookieServer(conf);
server.start();
+ if (bkc == null) {
+ bkc = new BookKeeperTestClient(baseClientConf);
+ }
+
int port = conf.getBookiePort();
String host = InetAddress.getLocalHost().getHostAddress();
if (conf.getUseHostNameAsBookieID()) {
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/test/proto/TestDataFormats.proto
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/proto/TestDataFormats.proto b/bookkeeper-server/src/test/proto/TestDataFormats.proto
new file mode 100644
index 0000000..0c616d7
--- /dev/null
+++ b/bookkeeper-server/src/test/proto/TestDataFormats.proto
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.bookkeeper.proto";
+option optimize_for = SPEED;
+
+import "src/main/proto/BookkeeperProtocol.proto";
+
+enum AuthMessageType {
+ SUCCESS_RESPONSE = 1; // expected by the tests on the final response of a successful handshake
+ FAILURE_RESPONSE = 2; // expected by the tests on the final response of a failed handshake
+ PAYLOAD_MESSAGE = 3; // set by the tests on outgoing auth messages and expected on intermediate responses
+}
+
+/**
+ * Test-only extension (field 1000) that tags an AuthMessage with its AuthMessageType.
+ */
+extend AuthMessage {
+ required AuthMessageType messageType = 1000;
+}
[2/3] bookkeeper git commit: BOOKKEEPER-901: Authentication framework
Posted by si...@apache.org.
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
index 57d3503..7fbb2cd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
@@ -176,12 +176,14 @@ public final class BookkeeperProtocol {
ADD_ENTRY(1, 2),
RANGE_READ_ENTRY(2, 3),
RANGE_ADD_ENTRY(3, 4),
+ AUTH(4, 5),
;
public static final int READ_ENTRY_VALUE = 1;
public static final int ADD_ENTRY_VALUE = 2;
public static final int RANGE_READ_ENTRY_VALUE = 3;
public static final int RANGE_ADD_ENTRY_VALUE = 4;
+ public static final int AUTH_VALUE = 5;
public final int getNumber() { return value; }
@@ -192,6 +194,7 @@ public final class BookkeeperProtocol {
case 2: return ADD_ENTRY;
case 3: return RANGE_READ_ENTRY;
case 4: return RANGE_ADD_ENTRY;
+ case 5: return AUTH;
default: return null;
}
}
@@ -222,7 +225,7 @@ public final class BookkeeperProtocol {
}
private static final OperationType[] VALUES = {
- READ_ENTRY, ADD_ENTRY, RANGE_READ_ENTRY, RANGE_ADD_ENTRY,
+ READ_ENTRY, ADD_ENTRY, RANGE_READ_ENTRY, RANGE_ADD_ENTRY, AUTH,
};
public static OperationType valueOf(
@@ -756,6 +759,11 @@ public final class BookkeeperProtocol {
boolean hasAddRequest();
org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest getAddRequest();
org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequestOrBuilder getAddRequestOrBuilder();
+
+ // optional .AuthMessage authRequest = 102;
+ boolean hasAuthRequest();
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthRequest();
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthRequestOrBuilder();
}
public static final class Request extends
com.google.protobuf.GeneratedMessage
@@ -825,10 +833,24 @@ public final class BookkeeperProtocol {
return addRequest_;
}
+ // optional .AuthMessage authRequest = 102;
+ public static final int AUTHREQUEST_FIELD_NUMBER = 102;
+ private org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage authRequest_;
+ public boolean hasAuthRequest() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthRequest() {
+ return authRequest_;
+ }
+ public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthRequestOrBuilder() {
+ return authRequest_;
+ }
+
private void initFields() {
header_ = org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.getDefaultInstance();
readRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest.getDefaultInstance();
addRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest.getDefaultInstance();
+ authRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -855,6 +877,12 @@ public final class BookkeeperProtocol {
return false;
}
}
+ if (hasAuthRequest()) {
+ if (!getAuthRequest().isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ }
memoizedIsInitialized = 1;
return true;
}
@@ -871,6 +899,9 @@ public final class BookkeeperProtocol {
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeMessage(101, addRequest_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeMessage(102, authRequest_);
+ }
getUnknownFields().writeTo(output);
}
@@ -892,6 +923,10 @@ public final class BookkeeperProtocol {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(101, addRequest_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeMessageSize(102, authRequest_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -1011,6 +1046,7 @@ public final class BookkeeperProtocol {
getHeaderFieldBuilder();
getReadRequestFieldBuilder();
getAddRequestFieldBuilder();
+ getAuthRequestFieldBuilder();
}
}
private static Builder create() {
@@ -1037,6 +1073,12 @@ public final class BookkeeperProtocol {
addRequestBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000004);
+ if (authRequestBuilder_ == null) {
+ authRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+ } else {
+ authRequestBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
@@ -1099,6 +1141,14 @@ public final class BookkeeperProtocol {
} else {
result.addRequest_ = addRequestBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ if (authRequestBuilder_ == null) {
+ result.authRequest_ = authRequest_;
+ } else {
+ result.authRequest_ = authRequestBuilder_.build();
+ }
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -1124,6 +1174,9 @@ public final class BookkeeperProtocol {
if (other.hasAddRequest()) {
mergeAddRequest(other.getAddRequest());
}
+ if (other.hasAuthRequest()) {
+ mergeAuthRequest(other.getAuthRequest());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -1149,6 +1202,12 @@ public final class BookkeeperProtocol {
return false;
}
}
+ if (hasAuthRequest()) {
+ if (!getAuthRequest().isInitialized()) {
+
+ return false;
+ }
+ }
return true;
}
@@ -1202,6 +1261,15 @@ public final class BookkeeperProtocol {
setAddRequest(subBuilder.buildPartial());
break;
}
+ case 818: {
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder subBuilder = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.newBuilder();
+ if (hasAuthRequest()) {
+ subBuilder.mergeFrom(getAuthRequest());
+ }
+ input.readMessage(subBuilder, extensionRegistry);
+ setAuthRequest(subBuilder.buildPartial());
+ break;
+ }
}
}
}
@@ -1478,6 +1546,96 @@ public final class BookkeeperProtocol {
return addRequestBuilder_;
}
+ // optional .AuthMessage authRequest = 102;
+ private org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage authRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+ private com.google.protobuf.SingleFieldBuilder<
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder> authRequestBuilder_;
+ public boolean hasAuthRequest() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthRequest() {
+ if (authRequestBuilder_ == null) {
+ return authRequest_;
+ } else {
+ return authRequestBuilder_.getMessage();
+ }
+ }
+ public Builder setAuthRequest(org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage value) {
+ if (authRequestBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ authRequest_ = value;
+ onChanged();
+ } else {
+ authRequestBuilder_.setMessage(value);
+ }
+ bitField0_ |= 0x00000008;
+ return this;
+ }
+ public Builder setAuthRequest(
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder builderForValue) {
+ if (authRequestBuilder_ == null) {
+ authRequest_ = builderForValue.build();
+ onChanged();
+ } else {
+ authRequestBuilder_.setMessage(builderForValue.build());
+ }
+ bitField0_ |= 0x00000008;
+ return this;
+ }
+ public Builder mergeAuthRequest(org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage value) {
+ if (authRequestBuilder_ == null) {
+ if (((bitField0_ & 0x00000008) == 0x00000008) &&
+ authRequest_ != org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance()) {
+ authRequest_ =
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.newBuilder(authRequest_).mergeFrom(value).buildPartial();
+ } else {
+ authRequest_ = value;
+ }
+ onChanged();
+ } else {
+ authRequestBuilder_.mergeFrom(value);
+ }
+ bitField0_ |= 0x00000008;
+ return this;
+ }
+ public Builder clearAuthRequest() {
+ if (authRequestBuilder_ == null) {
+ authRequest_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+ onChanged();
+ } else {
+ authRequestBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000008);
+ return this;
+ }
+ public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder getAuthRequestBuilder() {
+ bitField0_ |= 0x00000008;
+ onChanged();
+ return getAuthRequestFieldBuilder().getBuilder();
+ }
+ public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthRequestOrBuilder() {
+ if (authRequestBuilder_ != null) {
+ return authRequestBuilder_.getMessageOrBuilder();
+ } else {
+ return authRequest_;
+ }
+ }
+ private com.google.protobuf.SingleFieldBuilder<
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder>
+ getAuthRequestFieldBuilder() {
+ if (authRequestBuilder_ == null) {
+ authRequestBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder>(
+ authRequest_,
+ getParentForChildren(),
+ isClean());
+ authRequest_ = null;
+ }
+ return authRequestBuilder_;
+ }
+
// @@protoc_insertion_point(builder_scope:Request)
}
@@ -2792,6 +2950,11 @@ public final class BookkeeperProtocol {
boolean hasAddResponse();
org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse getAddResponse();
org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponseOrBuilder getAddResponseOrBuilder();
+
+ // optional .AuthMessage authResponse = 102;
+ boolean hasAuthResponse();
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthResponse();
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthResponseOrBuilder();
}
public static final class Response extends
com.google.protobuf.GeneratedMessage
@@ -2871,11 +3034,25 @@ public final class BookkeeperProtocol {
return addResponse_;
}
+ // optional .AuthMessage authResponse = 102;
+ public static final int AUTHRESPONSE_FIELD_NUMBER = 102;
+ private org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage authResponse_;
+ public boolean hasAuthResponse() { // True when the authResponse presence bit (0x10) is set in bitField0_.
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthResponse() { // Returns the stored authResponse message; callers use hasAuthResponse() to tell whether it was set.
+ return authResponse_;
+ }
+ public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthResponseOrBuilder() { // Message side has no field builder, so the stored message itself is returned as the OrBuilder view.
+ return authResponse_;
+ }
+
private void initFields() {
header_ = org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader.getDefaultInstance();
status_ = org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode.EOK;
readResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse.getDefaultInstance();
addResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse.getDefaultInstance();
+ authResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance(); // new field 102 starts out as the AuthMessage default instance, like the other message-typed fields above
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -2906,6 +3083,12 @@ public final class BookkeeperProtocol {
return false;
}
}
+ if (hasAuthResponse()) {
+ if (!getAuthResponse().isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ }
memoizedIsInitialized = 1;
return true;
}
@@ -2925,6 +3108,9 @@ public final class BookkeeperProtocol {
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeMessage(101, addResponse_);
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeMessage(102, authResponse_);
+ }
getUnknownFields().writeTo(output);
}
@@ -2950,6 +3136,10 @@ public final class BookkeeperProtocol {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(101, addResponse_);
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeMessageSize(102, authResponse_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -3069,6 +3259,7 @@ public final class BookkeeperProtocol {
getHeaderFieldBuilder();
getReadResponseFieldBuilder();
getAddResponseFieldBuilder();
+ getAuthResponseFieldBuilder();
}
}
private static Builder create() {
@@ -3097,6 +3288,12 @@ public final class BookkeeperProtocol {
addResponseBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000008);
+ if (authResponseBuilder_ == null) {
+ authResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+ } else {
+ authResponseBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000010);
return this;
}
@@ -3163,6 +3360,14 @@ public final class BookkeeperProtocol {
} else {
result.addResponse_ = addResponseBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ if (authResponseBuilder_ == null) {
+ result.authResponse_ = authResponse_;
+ } else {
+ result.authResponse_ = authResponseBuilder_.build();
+ }
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -3191,6 +3396,9 @@ public final class BookkeeperProtocol {
if (other.hasAddResponse()) {
mergeAddResponse(other.getAddResponse());
}
+ if (other.hasAuthResponse()) {
+ mergeAuthResponse(other.getAuthResponse());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -3220,6 +3428,12 @@ public final class BookkeeperProtocol {
return false;
}
}
+ if (hasAuthResponse()) {
+ if (!getAuthResponse().isInitialized()) {
+
+ return false;
+ }
+ }
return true;
}
@@ -3284,6 +3498,15 @@ public final class BookkeeperProtocol {
setAddResponse(subBuilder.buildPartial());
break;
}
+ case 818: {
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder subBuilder = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.newBuilder();
+ if (hasAuthResponse()) {
+ subBuilder.mergeFrom(getAuthResponse());
+ }
+ input.readMessage(subBuilder, extensionRegistry);
+ setAuthResponse(subBuilder.buildPartial());
+ break;
+ }
}
}
}
@@ -3584,6 +3807,96 @@ public final class BookkeeperProtocol {
return addResponseBuilder_;
}
+ // optional .AuthMessage authResponse = 102;
+ private org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage authResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+ private com.google.protobuf.SingleFieldBuilder<
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder> authResponseBuilder_;
+ public boolean hasAuthResponse() { // True when the builder's authResponse presence bit (0x10) is set.
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getAuthResponse() { // Returns the current authResponse value from whichever holder is active.
+ if (authResponseBuilder_ == null) { // no nested field builder: the plain reference holds the value
+ return authResponse_;
+ } else { // nested field builder owns the value
+ return authResponseBuilder_.getMessage();
+ }
+ }
+ public Builder setAuthResponse(org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage value) { // Replaces authResponse with value, marks the field as set (bit 0x10) and returns this builder.
+ if (authResponseBuilder_ == null) {
+ if (value == null) { // explicit null check exists only on this path; the other path hands value straight to setMessage
+ throw new NullPointerException();
+ }
+ authResponse_ = value;
+ onChanged();
+ } else { // nested field builder exists: let it take the new message
+ authResponseBuilder_.setMessage(value);
+ }
+ bitField0_ |= 0x00000010;
+ return this;
+ }
+ public Builder setAuthResponse( // Overload taking an AuthMessage.Builder: builds it and stores the result as authResponse.
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder builderForValue) {
+ if (authResponseBuilder_ == null) {
+ authResponse_ = builderForValue.build(); // build() throws if the message is not initialized (e.g. required authPluginName missing)
+ onChanged();
+ } else { // nested field builder exists: give it the built message
+ authResponseBuilder_.setMessage(builderForValue.build());
+ }
+ bitField0_ |= 0x00000010;
+ return this;
+ }
+ public Builder mergeAuthResponse(org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage value) { // Merges value into the authResponse field, marks the field as set (bit 0x10) and returns this builder.
+ if (authResponseBuilder_ == null) { // no nested field builder yet: operate on the plain message reference
+ if (((bitField0_ & 0x00000010) == 0x00000010) &&
+ authResponse_ != org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance()) { // already set to a non-default message: merge value on top of it
+ authResponse_ =
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.newBuilder(authResponse_).mergeFrom(value).buildPartial();
+ } else { // unset, or still the default instance: adopt value as-is
+ authResponse_ = value;
+ }
+ onChanged();
+ } else { // a nested field builder exists: delegate the merge to it
+ authResponseBuilder_.mergeFrom(value);
+ }
+ bitField0_ |= 0x00000010;
+ return this;
+ }
+ public Builder clearAuthResponse() { // Resets authResponse to its default and clears the "is set" bit (0x10); returns this builder.
+ if (authResponseBuilder_ == null) { // no nested field builder: reset the plain reference to the default instance
+ authResponse_ = org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+ onChanged();
+ } else { // nested field builder exists: clear it instead
+ authResponseBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000010);
+ return this;
+ }
+ public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder getAuthResponseBuilder() { // Returns a mutable builder for the authResponse field.
+ bitField0_ |= 0x00000010; // handing out a builder marks the field as set
+ onChanged();
+ return getAuthResponseFieldBuilder().getBuilder(); // creates the nested field builder on first use
+ }
+ public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder getAuthResponseOrBuilder() { // Returns a read view of authResponse without forcing creation of the nested field builder.
+ if (authResponseBuilder_ != null) { // nested field builder exists: it owns the current value
+ return authResponseBuilder_.getMessageOrBuilder();
+ } else { // otherwise the plain message reference is the current value
+ return authResponse_;
+ }
+ }
+ private com.google.protobuf.SingleFieldBuilder< // Lazily creates and returns the nested SingleFieldBuilder for the authResponse field.
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder>
+ getAuthResponseFieldBuilder() {
+ if (authResponseBuilder_ == null) { // first use: seed the field builder with the current message value
+ authResponseBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder, org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder>(
+ authResponse_,
+ getParentForChildren(),
+ isClean());
+ authResponse_ = null; // from here on the value lives in authResponseBuilder_; the plain reference is dropped
+ }
+ return authResponseBuilder_;
+ }
+
// @@protoc_insertion_point(builder_scope:Response)
}
@@ -4625,6 +4938,404 @@ public final class BookkeeperProtocol {
// @@protoc_insertion_point(class_scope:AddResponse)
}
+ public interface AuthMessageOrBuilder extends // Read-only accessor view implemented by both AuthMessage and AuthMessage.Builder.
+ com.google.protobuf.GeneratedMessage.
+ ExtendableMessageOrBuilder<AuthMessage> {
+
+ // required string authPluginName = 1;
+ boolean hasAuthPluginName(); // whether authPluginName has been set
+ String getAuthPluginName(); // the authPluginName value as a String
+ }
+ public static final class AuthMessage extends // Generated message with one required string field (authPluginName) plus extension support.
+ com.google.protobuf.GeneratedMessage.ExtendableMessage<
+ AuthMessage> implements AuthMessageOrBuilder {
+ // Use AuthMessage.newBuilder() to construct.
+ private AuthMessage(Builder builder) {
+ super(builder);
+ }
+ private AuthMessage(boolean noInit) {} // used by the static initializer at the bottom of this class to create defaultInstance
+
+ private static final AuthMessage defaultInstance;
+ public static AuthMessage getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public AuthMessage getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_AuthMessage_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_AuthMessage_fieldAccessorTable;
+ }
+
+ private int bitField0_; // presence bits; 0x01 = authPluginName is set
+ // required string authPluginName = 1;
+ public static final int AUTHPLUGINNAME_FIELD_NUMBER = 1;
+ private java.lang.Object authPluginName_; // holds either a String or a ByteString; the two getters below convert on demand
+ public boolean hasAuthPluginName() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public String getAuthPluginName() { // Returns the name as a String, decoding from ByteString if that is what is currently stored.
+ java.lang.Object ref = authPluginName_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ String s = bs.toStringUtf8();
+ if (com.google.protobuf.Internal.isValidUtf8(bs)) { // cache the decoded String only when the bytes are valid UTF-8; otherwise the ByteString stays stored
+ authPluginName_ = s;
+ }
+ return s;
+ }
+ }
+ private com.google.protobuf.ByteString getAuthPluginNameBytes() { // Returns the name as a ByteString, encoding from String if needed.
+ java.lang.Object ref = authPluginName_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+ authPluginName_ = b; // cache the encoded form so later calls skip the conversion
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ private void initFields() { // sets field defaults; called on defaultInstance from the static initializer
+ authPluginName_ = "";
+ }
+ private byte memoizedIsInitialized = -1; // -1 = not computed yet, 0 = not initialized, 1 = initialized
+ public final boolean isInitialized() { // True when authPluginName is set and all extensions are initialized; the result is memoized.
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasAuthPluginName()) { // required field missing
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!extensionsAreInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output) // Serializes field 1 (if set), then extensions, then unknown fields.
+ throws java.io.IOException {
+ getSerializedSize(); // return value unused; on first call this fills memoizedSerializedSize
+ com.google.protobuf.GeneratedMessage
+ .ExtendableMessage<org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage>.ExtensionWriter extensionWriter =
+ newExtensionWriter();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeBytes(1, getAuthPluginNameBytes());
+ }
+ extensionWriter.writeUntil(536870912, output); // 536870912 == 2^29; NOTE(review): presumably the exclusive upper bound of the extension range declared in the .proto - confirm
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1; // -1 = not computed yet
+ public int getSerializedSize() { // Computes (once) and returns the serialized size: field 1 if set + extensions + unknown fields.
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(1, getAuthPluginNameBytes());
+ }
+ size += extensionsSerializedSize();
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom( // All parseFrom overloads merge the input into a fresh Builder and finish with buildParsed().
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseDelimitedFrom(java.io.InputStream input) // Returns null when mergeDelimitedFrom reports false instead of throwing.
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage prototype) { // new builder pre-populated by merging prototype
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ public static final class Builder extends // Mutable builder for AuthMessage; keeps its own presence bits and authPluginName_ value.
+ com.google.protobuf.GeneratedMessage.ExtendableBuilder<
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, Builder> implements org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessageOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_AuthMessage_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.bookkeeper.proto.BookkeeperProtocol.internal_static_AuthMessage_fieldAccessorTable;
+ }
+
+ // Construct using org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() { // intentionally does nothing: the if-body below is empty
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() { // Resets authPluginName to "" and clears its presence bit, after the superclass clear.
+ super.clear();
+ authPluginName_ = "";
+ bitField0_ = (bitField0_ & ~0x00000001);
+ return this;
+ }
+
+ public Builder clone() { // New builder populated from a partial build of this one.
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDescriptor();
+ }
+
+ public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage getDefaultInstanceForType() {
+ return org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance();
+ }
+
+ public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage build() { // Builds the message; throws the uninitialized-message exception if isInitialized() is false.
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ private org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage buildParsed() // Same check as build(), but reported as InvalidProtocolBufferException for the parse paths.
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(
+ result).asInvalidProtocolBufferException();
+ }
+ return result;
+ }
+
+ public org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage buildPartial() { // Copies builder state into a new message without checking required fields.
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage result = new org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) { // carry the presence bit over only if it is set on the builder
+ to_bitField0_ |= 0x00000001;
+ }
+ result.authPluginName_ = authPluginName_; // value is copied regardless of the presence bit
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) { // Dispatches to the typed overload for AuthMessage, otherwise to the superclass merge.
+ if (other instanceof org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage) {
+ return mergeFrom((org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage other) { // Copies authPluginName (if set on other), extensions and unknown fields into this builder.
+ if (other == org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.getDefaultInstance()) return this; // merging the default instance is a no-op
+ if (other.hasAuthPluginName()) {
+ setAuthPluginName(other.getAuthPluginName());
+ }
+ this.mergeExtensionFields(other);
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() { // Same conditions as AuthMessage.isInitialized(), evaluated on builder state and not memoized.
+ if (!hasAuthPluginName()) {
+
+ return false;
+ }
+ if (!extensionsAreInitialized()) {
+
+ return false;
+ }
+ return true;
+ }
+
+ public Builder mergeFrom( // Reads tags from input until tag 0 or until parseUnknownField returns false, merging into this builder.
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder(
+ this.getUnknownFields());
+ while (true) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0: // readTag() returned 0: stop reading and commit the unknown fields collected so far
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ default: { // any tag other than 0 or 10: handed to parseUnknownField together with the extension registry
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ }
+ break;
+ }
+ case 10: { // 10 == (1 << 3) | 2, i.e. field number 1 (authPluginName) with wire type 2
+ bitField0_ |= 0x00000001;
+ authPluginName_ = input.readBytes(); // stored as ByteString; conversion to String happens in getAuthPluginName()
+ break;
+ }
+ }
+ }
+ }
+
+ private int bitField0_; // presence bits; 0x01 = authPluginName is set
+
+ // required string authPluginName = 1;
+ private java.lang.Object authPluginName_ = ""; // holds either a String or a ByteString
+ public boolean hasAuthPluginName() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public String getAuthPluginName() { // Returns the name as a String; unlike the message-side getter, the decoded value is cached without a UTF-8 validity check.
+ java.lang.Object ref = authPluginName_;
+ if (!(ref instanceof String)) {
+ String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+ authPluginName_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ public Builder setAuthPluginName(String value) { // Sets the name and its presence bit; rejects null with NullPointerException.
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ authPluginName_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearAuthPluginName() { // Clears the presence bit and restores the default instance's value.
+ bitField0_ = (bitField0_ & ~0x00000001);
+ authPluginName_ = getDefaultInstance().getAuthPluginName();
+ onChanged();
+ return this;
+ }
+ void setAuthPluginName(com.google.protobuf.ByteString value) { // package-private ByteString overload; no null check and no return value
+ bitField0_ |= 0x00000001;
+ authPluginName_ = value;
+ onChanged();
+ }
+
+ // @@protoc_insertion_point(builder_scope:AuthMessage)
+ }
+
+ static { // creates the shared default instance via the no-init constructor, then applies field defaults
+ defaultInstance = new AuthMessage(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:AuthMessage)
+ }
+
private static com.google.protobuf.Descriptors.Descriptor
internal_static_BKPacketHeader_descriptor;
private static
@@ -4660,6 +5371,11 @@ public final class BookkeeperProtocol {
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_AddResponse_fieldAccessorTable;
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_AuthMessage_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_AuthMessage_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
@@ -4672,33 +5388,36 @@ public final class BookkeeperProtocol {
"\n\'src/main/proto/BookkeeperProtocol.prot" +
"o\"e\n\016BKPacketHeader\022!\n\007version\030\001 \002(\0162\020.P" +
"rotocolVersion\022!\n\toperation\030\002 \002(\0162\016.Oper" +
- "ationType\022\r\n\005txnId\030\003 \002(\004\"n\n\007Request\022\037\n\006h" +
- "eader\030\001 \002(\0132\017.BKPacketHeader\022!\n\013readRequ" +
- "est\030d \001(\0132\014.ReadRequest\022\037\n\naddRequest\030e " +
- "\001(\0132\013.AddRequest\"~\n\013ReadRequest\022\037\n\004flag\030" +
- "d \001(\0162\021.ReadRequest.Flag\022\020\n\010ledgerId\030\001 \002" +
- "(\003\022\017\n\007entryId\030\002 \002(\003\022\021\n\tmasterKey\030\003 \001(\014\"\030" +
- "\n\004Flag\022\020\n\014FENCE_LEDGER\020\001\"\212\001\n\nAddRequest\022",
- "\036\n\004flag\030d \001(\0162\020.AddRequest.Flag\022\020\n\010ledge" +
- "rId\030\001 \002(\003\022\017\n\007entryId\030\002 \002(\003\022\021\n\tmasterKey\030" +
- "\003 \002(\014\022\014\n\004body\030\004 \002(\014\"\030\n\004Flag\022\020\n\014RECOVERY_" +
- "ADD\020\001\"\220\001\n\010Response\022\037\n\006header\030\001 \002(\0132\017.BKP" +
- "acketHeader\022\033\n\006status\030\002 \002(\0162\013.StatusCode" +
- "\022#\n\014readResponse\030d \001(\0132\r.ReadResponse\022!\n" +
- "\013addResponse\030e \001(\0132\014.AddResponse\"\\\n\014Read" +
- "Response\022\033\n\006status\030\001 \002(\0162\013.StatusCode\022\020\n" +
- "\010ledgerId\030\002 \002(\003\022\017\n\007entryId\030\003 \002(\003\022\014\n\004body" +
- "\030\004 \001(\014\"M\n\013AddResponse\022\033\n\006status\030\001 \002(\0162\013.",
- "StatusCode\022\020\n\010ledgerId\030\002 \002(\003\022\017\n\007entryId\030" +
- "\003 \002(\003*F\n\017ProtocolVersion\022\017\n\013VERSION_ONE\020" +
- "\001\022\017\n\013VERSION_TWO\020\002\022\021\n\rVERSION_THREE\020\003*\206\001" +
- "\n\nStatusCode\022\007\n\003EOK\020\000\022\016\n\tENOLEDGER\020\222\003\022\r\n" +
- "\010ENOENTRY\020\223\003\022\014\n\007EBADREQ\020\224\003\022\010\n\003EIO\020\365\003\022\010\n\003" +
- "EUA\020\366\003\022\020\n\013EBADVERSION\020\367\003\022\014\n\007EFENCED\020\370\003\022\016" +
- "\n\tEREADONLY\020\371\003*Y\n\rOperationType\022\016\n\nREAD_" +
- "ENTRY\020\001\022\r\n\tADD_ENTRY\020\002\022\024\n\020RANGE_READ_ENT" +
- "RY\020\003\022\023\n\017RANGE_ADD_ENTRY\020\004B\037\n\033org.apache." +
- "bookkeeper.protoH\001"
+ "ationType\022\r\n\005txnId\030\003 \002(\004\"\221\001\n\007Request\022\037\n\006" +
+ "header\030\001 \002(\0132\017.BKPacketHeader\022!\n\013readReq" +
+ "uest\030d \001(\0132\014.ReadRequest\022\037\n\naddRequest\030e" +
+ " \001(\0132\013.AddRequest\022!\n\013authRequest\030f \001(\0132\014" +
+ ".AuthMessage\"~\n\013ReadRequest\022\037\n\004flag\030d \001(" +
+ "\0162\021.ReadRequest.Flag\022\020\n\010ledgerId\030\001 \002(\003\022\017" +
+ "\n\007entryId\030\002 \002(\003\022\021\n\tmasterKey\030\003 \001(\014\"\030\n\004Fl",
+ "ag\022\020\n\014FENCE_LEDGER\020\001\"\212\001\n\nAddRequest\022\036\n\004f" +
+ "lag\030d \001(\0162\020.AddRequest.Flag\022\020\n\010ledgerId\030" +
+ "\001 \002(\003\022\017\n\007entryId\030\002 \002(\003\022\021\n\tmasterKey\030\003 \002(" +
+ "\014\022\014\n\004body\030\004 \002(\014\"\030\n\004Flag\022\020\n\014RECOVERY_ADD\020" +
+ "\001\"\264\001\n\010Response\022\037\n\006header\030\001 \002(\0132\017.BKPacke" +
+ "tHeader\022\033\n\006status\030\002 \002(\0162\013.StatusCode\022#\n\014" +
+ "readResponse\030d \001(\0132\r.ReadResponse\022!\n\013add" +
+ "Response\030e \001(\0132\014.AddResponse\022\"\n\014authResp" +
+ "onse\030f \001(\0132\014.AuthMessage\"\\\n\014ReadResponse" +
+ "\022\033\n\006status\030\001 \002(\0162\013.StatusCode\022\020\n\010ledgerI",
+ "d\030\002 \002(\003\022\017\n\007entryId\030\003 \002(\003\022\014\n\004body\030\004 \001(\014\"M" +
+ "\n\013AddResponse\022\033\n\006status\030\001 \002(\0162\013.StatusCo" +
+ "de\022\020\n\010ledgerId\030\002 \002(\003\022\017\n\007entryId\030\003 \002(\003\"0\n" +
+ "\013AuthMessage\022\026\n\016authPluginName\030\001 \002(\t*\t\010\350" +
+ "\007\020\200\200\200\200\002*F\n\017ProtocolVersion\022\017\n\013VERSION_ON" +
+ "E\020\001\022\017\n\013VERSION_TWO\020\002\022\021\n\rVERSION_THREE\020\003*" +
+ "\206\001\n\nStatusCode\022\007\n\003EOK\020\000\022\016\n\tENOLEDGER\020\222\003\022" +
+ "\r\n\010ENOENTRY\020\223\003\022\014\n\007EBADREQ\020\224\003\022\010\n\003EIO\020\365\003\022\010" +
+ "\n\003EUA\020\366\003\022\020\n\013EBADVERSION\020\367\003\022\014\n\007EFENCED\020\370\003" +
+ "\022\016\n\tEREADONLY\020\371\003*c\n\rOperationType\022\016\n\nREA",
+ "D_ENTRY\020\001\022\r\n\tADD_ENTRY\020\002\022\024\n\020RANGE_READ_E" +
+ "NTRY\020\003\022\023\n\017RANGE_ADD_ENTRY\020\004\022\010\n\004AUTH\020\005B\037\n" +
+ "\033org.apache.bookkeeper.protoH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -4718,7 +5437,7 @@ public final class BookkeeperProtocol {
internal_static_Request_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Request_descriptor,
- new java.lang.String[] { "Header", "ReadRequest", "AddRequest", },
+ new java.lang.String[] { "Header", "ReadRequest", "AddRequest", "AuthRequest", },
org.apache.bookkeeper.proto.BookkeeperProtocol.Request.class,
org.apache.bookkeeper.proto.BookkeeperProtocol.Request.Builder.class);
internal_static_ReadRequest_descriptor =
@@ -4742,7 +5461,7 @@ public final class BookkeeperProtocol {
internal_static_Response_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Response_descriptor,
- new java.lang.String[] { "Header", "Status", "ReadResponse", "AddResponse", },
+ new java.lang.String[] { "Header", "Status", "ReadResponse", "AddResponse", "AuthResponse", },
org.apache.bookkeeper.proto.BookkeeperProtocol.Response.class,
org.apache.bookkeeper.proto.BookkeeperProtocol.Response.Builder.class);
internal_static_ReadResponse_descriptor =
@@ -4761,6 +5480,14 @@ public final class BookkeeperProtocol {
new java.lang.String[] { "Status", "LedgerId", "EntryId", },
org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse.class,
org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse.Builder.class);
+ internal_static_AuthMessage_descriptor =
+ getDescriptor().getMessageTypes().get(7);
+ internal_static_AuthMessage_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_AuthMessage_descriptor,
+ new java.lang.String[] { "AuthPluginName", },
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.class,
+ org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage.Builder.class);
return null;
}
};
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 2bd4e9b..0f9feea 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeperClientStats;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -79,6 +80,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
+import com.google.protobuf.ExtensionRegistry;
/**
* This class manages all details of connection to a particular bookie. It also
@@ -134,15 +136,27 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
private final ClientConfiguration conf;
private final PerChannelBookieClientPool pcbcPool;
+ private final ClientAuthProvider.Factory authProviderFactory;
+ private final ExtensionRegistry extRegistry;
public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
BookieSocketAddress addr) {
- this(new ClientConfiguration(), executor, channelFactory, addr, null, NullStatsLogger.INSTANCE, null);
+ this(new ClientConfiguration(), executor, channelFactory, addr, null, NullStatsLogger.INSTANCE, null, null, null); // trailing nulls, in order: authProviderFactory, extRegistry, pcbcPool
+ }
+
+ public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory, // Convenience constructor: default ClientConfiguration, caller-supplied auth provider factory and extension registry.
+ BookieSocketAddress addr,
+ ClientAuthProvider.Factory authProviderFactory,
+ ExtensionRegistry extRegistry) {
+ this(new ClientConfiguration(), executor, channelFactory, addr, null, NullStatsLogger.INSTANCE, // null here is the requestTimer argument
+ authProviderFactory, extRegistry, null); // final null is the pcbcPool argument
}
public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor,
ClientSocketChannelFactory channelFactory, BookieSocketAddress addr,
HashedWheelTimer requestTimer, StatsLogger parentStatsLogger,
+ ClientAuthProvider.Factory authProviderFactory,
+ ExtensionRegistry extRegistry,
PerChannelBookieClientPool pcbcPool) {
this.conf = conf;
this.addr = addr;
@@ -153,6 +167,9 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
this.addEntryTimeout = conf.getAddEntryTimeout();
this.readEntryTimeout = conf.getReadEntryTimeout();
+ this.authProviderFactory = authProviderFactory;
+ this.extRegistry = extRegistry;
+
StringBuilder nameBuilder = new StringBuilder();
nameBuilder.append(addr.getHostname().replace('.', '_').replace('-', '_'))
.append("_").append(addr.getPort());
@@ -563,8 +580,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
bAddress = c.getRemoteAddress().toString();
}
- LOG.debug("Could not write request for reading entry: {} ledger-id: {} bookie: {}",
- new Object[]{ readCompletion.entryId, readCompletion.ledgerId, bAddress });
+ LOG.debug("Could not write request for reading entry: {} ledger-id: {} bookie: {} rc: {}",
+ new Object[]{ readCompletion.entryId, readCompletion.ledgerId, bAddress, rc });
readCompletion.cb.readEntryComplete(rc, readCompletion.ledgerId, readCompletion.entryId,
null, readCompletion.ctx);
@@ -594,8 +611,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
if(c != null) {
bAddress = c.getRemoteAddress().toString();
}
- LOG.debug("Could not write request for adding entry: {} ledger-id: {} bookie: {}",
- new Object[] { addCompletion.entryId, addCompletion.ledgerId, bAddress });
+ LOG.debug("Could not write request for adding entry: {} ledger-id: {} bookie: {} rc: {}",
+ new Object[] { addCompletion.entryId, addCompletion.ledgerId, bAddress, rc });
addCompletion.cb.writeComplete(rc, addCompletion.ledgerId, addCompletion.entryId,
addr, addCompletion.ctx);
@@ -656,8 +673,9 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
pipeline.addLast("lengthbasedframedecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
- pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder());
- pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.ResponseDecoder());
+ pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder(extRegistry));
+ pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.ResponseDecoder(extRegistry));
+ pipeline.addLast("authHandler", new AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator));
pipeline.addLast("mainhandler", this);
return pipeline;
}
@@ -699,6 +717,16 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
return;
}
+ if (t instanceof AuthHandler.AuthenticationException) {
+ LOG.error("Error authenticating connection", t);
+ errorOutOutstandingEntries(BKException.Code.UnauthorizedAccessException);
+ Channel c = ctx.getChannel();
+ if (c != null) {
+ closeChannel(c);
+ }
+ return;
+ }
+
if (t instanceof IOException) {
// these are thrown when a bookie fails, logging them just pollutes
// the logs (the failure is logged from the listeners on the write
@@ -739,7 +767,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
LOG.debug("Unexpected response received from bookie : " + addr + " for type : " + header. getOperation() +
" and txnId : " + header.getTxnId());
}
-
} else {
long orderingKey = completionValue.ledgerId;
executor.submitOrdered(orderingKey, new SafeRunnable() {
@@ -748,10 +775,10 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
OperationType type = header.getOperation();
switch (type) {
case ADD_ENTRY:
- handleAddResponse(response.getAddResponse(), completionValue);
+ handleAddResponse(response, completionValue);
break;
case READ_ENTRY:
- handleReadResponse(response.getReadResponse(), completionValue);
+ handleReadResponse(response, completionValue);
break;
default:
LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring",
@@ -770,13 +797,14 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
}
}
- void handleAddResponse(AddResponse response, CompletionValue completionValue) {
+ void handleAddResponse(Response response, CompletionValue completionValue) {
// The completion value should always be an instance of an AddCompletion object when we reach here.
AddCompletion ac = (AddCompletion)completionValue;
+ AddResponse addResponse = response.getAddResponse();
- long ledgerId = response.getLedgerId();
- long entryId = response.getEntryId();
- StatusCode status = response.getStatus();
+ long ledgerId = addResponse.getLedgerId();
+ long entryId = addResponse.getEntryId();
+ StatusCode status = response.getStatus() == StatusCode.EOK ? addResponse.getStatus() : response.getStatus();
if (LOG.isDebugEnabled()) {
LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
@@ -796,17 +824,19 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx);
}
- void handleReadResponse(ReadResponse response, CompletionValue completionValue) {
+ void handleReadResponse(Response response, CompletionValue completionValue) {
// The completion value should always be an instance of a ReadCompletion object when we reach here.
ReadCompletion rc = (ReadCompletion)completionValue;
+ ReadResponse readResponse = response.getReadResponse();
+
+ long ledgerId = readResponse.getLedgerId();
+ long entryId = readResponse.getEntryId();
+ StatusCode status = response.getStatus() == StatusCode.EOK ? readResponse.getStatus() : response.getStatus();
- long ledgerId = response.getLedgerId();
- long entryId = response.getEntryId();
- StatusCode status = response.getStatus();
ChannelBuffer buffer = ChannelBuffers.buffer(0);
- if (response.hasBody()) {
- buffer = ChannelBuffers.copiedBuffer(response.getBody().asReadOnlyByteBuffer());
+ if (readResponse.hasBody()) {
+ buffer = ChannelBuffers.copiedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
}
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
index 56ba581..7aeadfc 100644
--- a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
@@ -55,6 +55,8 @@ enum OperationType {
// Not supported yet.
RANGE_READ_ENTRY = 3;
RANGE_ADD_ENTRY = 4;
+
+ AUTH = 5;
}
/**
@@ -71,6 +73,7 @@ message Request {
// Requests
optional ReadRequest readRequest = 100;
optional AddRequest addRequest = 101;
+ optional AuthMessage authRequest = 102;
}
message ReadRequest {
@@ -105,7 +108,7 @@ message Response {
// Response
optional ReadResponse readResponse = 100;
optional AddResponse addResponse = 101;
-
+ optional AuthMessage authResponse = 102;
}
message ReadResponse {
@@ -120,3 +123,12 @@ message AddResponse {
required int64 ledgerId = 2;
required int64 entryId = 3;
}
+
+/**
+ * Extendible message which auth mechanisms
+ * can use to carry their payload.
+ */
+message AuthMessage {
+ required string authPluginName = 1;
+ extensions 1000 to max;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
new file mode 100644
index 0000000..a57bfe9
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
@@ -0,0 +1,654 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.auth;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.proto.TestDataFormats;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import com.google.protobuf.ExtensionRegistry;
+
+public class TestAuth extends BookKeeperClusterTestCase {
+ static final Logger LOG = LoggerFactory.getLogger(TestAuth.class);
+ public static final String TEST_AUTH_PROVIDER_PLUGIN_NAME = "TestAuthProviderPlugin";
+ private static final byte[] PASSWD = "testPasswd".getBytes();
+ private static final byte[] ENTRY = "TestEntry".getBytes();
+
+ public TestAuth() {
+ super(0); // start them later when auth providers are configured
+ }
+
+ // we pass in ledgerId because the method may throw exceptions
+ private void connectAndWriteToBookie(ClientConfiguration conf, AtomicLong ledgerWritten)
+ throws Exception {
+ LOG.info("Connecting to bookie");
+ BookKeeper bkc = new BookKeeper(conf, zkc);
+ LedgerHandle l = bkc.createLedger(1, 1, DigestType.CRC32,
+ PASSWD);
+ ledgerWritten.set(l.getId());
+ l.addEntry(ENTRY);
+ l.close();
+ bkc.close();
+ }
+
+ /**
+ * check if the entry exists. Restart the bookie to allow
+ * access
+ */
+ private int entryCount(long ledgerId, ServerConfiguration bookieConf,
+ ClientConfiguration clientConf) throws Exception {
+ LOG.info("Counting entries in {}", ledgerId);
+ for (ServerConfiguration conf : bsConfs) {
+ conf.setBookieAuthProviderFactoryClass(
+ AlwaysSucceedBookieAuthProviderFactory.class.getName());
+ }
+ clientConf.setClientAuthProviderFactoryClass(
+ SendUntilCompleteClientAuthProviderFactory.class.getName());
+
+ restartBookies();
+
+ BookKeeper bkc = new BookKeeper(clientConf, zkc);
+ LedgerHandle lh = bkc.openLedger(ledgerId, DigestType.CRC32,
+ PASSWD);
+ if (lh.getLastAddConfirmed() < 0) {
+ return 0;
+ }
+ Enumeration<LedgerEntry> e = lh.readEntries(0, lh.getLastAddConfirmed());
+ int count = 0;
+ while (e.hasMoreElements()) {
+ count++;
+ assertTrue("Should match what we wrote",
+ Arrays.equals(e.nextElement().getEntry(), ENTRY));
+ }
+ return count;
+ }
+
+ /**
+ * Test that a connection will authorize with a single message
+ * to the server and a single response.
+ */
+ @Test(timeout=30000)
+ public void testSingleMessageAuth() throws Exception {
+ ServerConfiguration bookieConf = newServerConfiguration();
+ bookieConf.setBookieAuthProviderFactoryClass(
+ AlwaysSucceedBookieAuthProviderFactory.class.getName());
+
+ ClientConfiguration clientConf = newClientConfiguration();
+ clientConf.setClientAuthProviderFactoryClass(
+ SendUntilCompleteClientAuthProviderFactory.class.getName());
+
+ startAndStoreBookie(bookieConf);
+
+ AtomicLong ledgerId = new AtomicLong(-1);
+ connectAndWriteToBookie(clientConf, ledgerId); // should succeed
+
+ assertFalse(ledgerId.get() == -1);
+ assertEquals("Should have entry", 1, entryCount(ledgerId.get(), bookieConf, clientConf));
+ }
+
+ /**
+ * Test that when the bookie provider sends a failure message
+ * the client will not be able to write
+ */
+ @Test(timeout=30000)
+ public void testSingleMessageAuthFailure() throws Exception {
+ ServerConfiguration bookieConf = newServerConfiguration();
+ bookieConf.setBookieAuthProviderFactoryClass(
+ AlwaysFailBookieAuthProviderFactory.class.getName());
+
+ ClientConfiguration clientConf = newClientConfiguration();
+ clientConf.setClientAuthProviderFactoryClass(
+ SendUntilCompleteClientAuthProviderFactory.class.getName());
+
+ startAndStoreBookie(bookieConf);
+
+ AtomicLong ledgerId = new AtomicLong(-1);
+ try {
+ connectAndWriteToBookie(clientConf, ledgerId); // should fail
+ fail("Shouldn't get this far");
+ } catch (BKException.BKUnauthorizedAccessException bke) {
+ // client shouldn't be able to find enough bookies to
+ // write
+ }
+ assertFalse(ledgerId.get() == -1);
+ assertEquals("Shouldn't have entry", 0, entryCount(ledgerId.get(), bookieConf, clientConf));
+ }
+
+ /**
+ * Test that authentication works when the providers
+ * exchange multiple messages
+ */
+ @Test(timeout=30000)
+ public void testMultiMessageAuth() throws Exception {
+ ServerConfiguration bookieConf = newServerConfiguration();
+ bookieConf.setBookieAuthProviderFactoryClass(
+ SucceedAfter3BookieAuthProviderFactory.class.getName());
+
+ ClientConfiguration clientConf = newClientConfiguration();
+ clientConf.setClientAuthProviderFactoryClass(
+ SendUntilCompleteClientAuthProviderFactory.class.getName());
+
+ AtomicLong ledgerId = new AtomicLong(-1);
+ startAndStoreBookie(bookieConf);
+ connectAndWriteToBookie(clientConf, ledgerId); // should succeed
+
+ assertFalse(ledgerId.get() == -1);
+ assertEquals("Should have entry", 1, entryCount(ledgerId.get(), bookieConf, clientConf));
+ }
+
+ /**
+ * Test that when the bookie provider sends a failure message
+ * the client will not be able to write
+ */
+ @Test(timeout=30000)
+ public void testMultiMessageAuthFailure() throws Exception {
+ ServerConfiguration bookieConf = newServerConfiguration();
+ bookieConf.setBookieAuthProviderFactoryClass(
+ FailAfter3BookieAuthProviderFactory.class.getName());
+
+ ClientConfiguration clientConf = newClientConfiguration();
+ clientConf.setClientAuthProviderFactoryClass(
+ SendUntilCompleteClientAuthProviderFactory.class.getName());
+
+ startAndStoreBookie(bookieConf);
+
+ AtomicLong ledgerId = new AtomicLong(-1);
+ try {
+ connectAndWriteToBookie(clientConf, ledgerId); // should fail
+ fail("Shouldn't get this far");
+ } catch (BKException.BKUnauthorizedAccessException bke) {
+ // bookie should have sent a negative response before
+ // breaking the connection
+ }
+ assertFalse(ledgerId.get() == -1);
+ assertEquals("Shouldn't have entry", 0, entryCount(ledgerId.get(), bookieConf, clientConf));
+ }
+
+ /**
+ * Test that when the bookie and the client have a different
+ * plugin configured, no messages will get through.
+ */
+ @Test(timeout=30000)
+ public void testDifferentPluginFailure() throws Exception {
+ ServerConfiguration bookieConf = newServerConfiguration();
+ bookieConf.setBookieAuthProviderFactoryClass(
+ DifferentPluginBookieAuthProviderFactory.class.getName());
+
+ ClientConfiguration clientConf = newClientConfiguration();
+ clientConf.setClientAuthProviderFactoryClass(
+ SendUntilCompleteClientAuthProviderFactory.class.getName());
+
+ startAndStoreBookie(bookieConf);
+ AtomicLong ledgerId = new AtomicLong(-1);
+ try {
+ connectAndWriteToBookie(clientConf, ledgerId); // should fail
+ fail("Shouldn't get this far");
+ } catch (BKException.BKUnauthorizedAccessException bke) {
+ // bookie should have sent a negative response before
+ // breaking the connection
+ }
+ assertFalse(ledgerId.get() == -1);
+ assertEquals("Shouldn't have entry", 0, entryCount(ledgerId.get(), bookieConf, clientConf));
+ }
+
+ /**
+ * Test that when the plugin class does exist, but
+ * doesn't implement the interface, we fail predictably
+ */
+ @Test(timeout=30000)
+ public void testExistantButNotValidPlugin() throws Exception {
+ ServerConfiguration bookieConf = newServerConfiguration();
+ bookieConf.setBookieAuthProviderFactoryClass(
+ "java.lang.String");
+
+ ClientConfiguration clientConf = newClientConfiguration();
+ clientConf.setClientAuthProviderFactoryClass(
+ "java.lang.String");
+ try {
+ startAndStoreBookie(bookieConf);
+ fail("Shouldn't get this far");
+ } catch (RuntimeException e) {
+ // received correct exception
+ assertTrue("Wrong exception thrown",
+ e.getMessage().contains("not "
+ + BookieAuthProvider.Factory.class.getName()));
+ }
+
+ try {
+ BookKeeper bkc = new BookKeeper(clientConf, zkc);
+ fail("Shouldn't get this far");
+ } catch (RuntimeException e) {
+ // received correct exception
+ assertTrue("Wrong exception thrown",
+ e.getMessage().contains("not "
+ + ClientAuthProvider.Factory.class.getName()));
+ }
+ }
+
+ /**
+ * Test that when the plugin class does not exist,
+ * the bookie will not start and the client will
+ * break.
+ */
+ @Test(timeout=30000)
+ public void testNonExistantPlugin() throws Exception {
+ ServerConfiguration bookieConf = newServerConfiguration();
+ bookieConf.setBookieAuthProviderFactoryClass(
+ "NonExistantClassNameForTestingAuthPlugins");
+
+ ClientConfiguration clientConf = newClientConfiguration();
+ clientConf.setClientAuthProviderFactoryClass(
+ "NonExistantClassNameForTestingAuthPlugins");
+ try {
+ startAndStoreBookie(bookieConf);
+ fail("Shouldn't get this far");
+ } catch (RuntimeException e) {
+ // received correct exception
+ assertEquals("Wrong exception thrown",
+ e.getCause().getClass(), ClassNotFoundException.class);
+ }
+
+ try {
+ BookKeeper bkc = new BookKeeper(clientConf, zkc);
+ fail("Shouldn't get this far");
+ } catch (RuntimeException e) {
+ // received correct exception
+ assertEquals("Wrong exception thrown",
+ e.getCause().getClass(), ClassNotFoundException.class);
+ }
+ }
+
+ /**
+ * Test that when the plugin on the bookie crashes, the client doesn't
+ * hang, but it still cannot write.
+ */
+ @Test(timeout=30000)
+ public void testCrashDuringAuth() throws Exception {
+ ServerConfiguration bookieConf = newServerConfiguration();
+ bookieConf.setBookieAuthProviderFactoryClass(
+ CrashAfter3BookieAuthProviderFactory.class.getName());
+
+ ClientConfiguration clientConf = newClientConfiguration();
+ clientConf.setClientAuthProviderFactoryClass(
+ SendUntilCompleteClientAuthProviderFactory.class.getName());
+
+ startAndStoreBookie(bookieConf);
+
+ AtomicLong ledgerId = new AtomicLong(-1);
+ try {
+ connectAndWriteToBookie(clientConf, ledgerId);
+ fail("Shouldn't get this far");
+ } catch (BKException.BKNotEnoughBookiesException bke) {
+ // bookie won't respond, request will time out, and then
+ // we won't be able to find a replacement
+ }
+ assertFalse(ledgerId.get() == -1);
+ assertEquals("Shouldn't have entry", 0, entryCount(ledgerId.get(), bookieConf, clientConf));
+ }
+
+ /**
+ * Test that when a bookie simply stops replying during auth, the client doesn't
+ * hang, but it still cannot write.
+ */
+ @Test(timeout=30000)
+ public void testCrashType2DuringAuth() throws Exception {
+ ServerConfiguration bookieConf = newServerConfiguration();
+ bookieConf.setBookieAuthProviderFactoryClass(
+ CrashType2After3BookieAuthProviderFactory.class.getName());
+
+ ClientConfiguration clientConf = newClientConfiguration();
+ clientConf.setClientAuthProviderFactoryClass(
+ SendUntilCompleteClientAuthProviderFactory.class.getName());
+ crashType2bookieInstance = startAndStoreBookie(bookieConf);
+
+ AtomicLong ledgerId = new AtomicLong(-1);
+ try {
+ connectAndWriteToBookie(clientConf, ledgerId);
+ fail("Shouldn't get this far");
+ } catch (BKException.BKNotEnoughBookiesException bke) {
+ // bookie won't respond, request will time out, and then
+ // we won't be able to find a replacement
+ }
+ assertFalse(ledgerId.get() == -1);
+ assertEquals("Shouldn't have entry", 0, entryCount(ledgerId.get(), bookieConf, clientConf));
+ }
+
+ BookieServer startAndStoreBookie(ServerConfiguration conf) throws Exception {
+ bsConfs.add(conf);
+ BookieServer s = startBookie(conf);
+ bs.add(s);
+ return s;
+ }
+
+ public static class AlwaysSucceedBookieAuthProviderFactory
+ implements BookieAuthProvider.Factory {
+ @Override
+ public String getPluginName() {
+ return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+ }
+
+ @Override
+ public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+ TestDataFormats.registerAllExtensions(registry);
+ }
+
+ @Override
+ public BookieAuthProvider newProvider(InetSocketAddress addr,
+ final GenericCallback<Void> completeCb) {
+ return new BookieAuthProvider() {
+ public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+
+ AuthMessage.Builder builder
+ = AuthMessage.newBuilder()
+ .setAuthPluginName(getPluginName());
+ builder.setExtension(TestDataFormats.messageType,
+ TestDataFormats.AuthMessageType.SUCCESS_RESPONSE);
+
+ cb.operationComplete(BKException.Code.OK, builder.build());
+ completeCb.operationComplete(BKException.Code.OK, null);
+ }
+ };
+ }
+ }
+
+ public static class AlwaysFailBookieAuthProviderFactory
+ implements BookieAuthProvider.Factory {
+ @Override
+ public String getPluginName() {
+ return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+ }
+
+ @Override
+ public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+ TestDataFormats.registerAllExtensions(registry);
+ }
+
+ @Override
+ public BookieAuthProvider newProvider(InetSocketAddress addr,
+ final GenericCallback<Void> completeCb) {
+ return new BookieAuthProvider() {
+ public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+ AuthMessage.Builder builder
+ = AuthMessage.newBuilder()
+ .setAuthPluginName(getPluginName());
+ builder.setExtension(TestDataFormats.messageType,
+ TestDataFormats.AuthMessageType.FAILURE_RESPONSE);
+
+ cb.operationComplete(BKException.Code.OK, builder.build());
+ completeCb.operationComplete(
+ BKException.Code.UnauthorizedAccessException, null);
+ }
+ };
+ }
+ }
+
+ private static class SendUntilCompleteClientAuthProviderFactory
+ implements ClientAuthProvider.Factory {
+
+ @Override
+ public String getPluginName() {
+ return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+ }
+
+ @Override
+ public void init(ClientConfiguration conf, ExtensionRegistry registry) {
+ TestDataFormats.registerAllExtensions(registry);
+ }
+
+ @Override
+ public ClientAuthProvider newProvider(InetSocketAddress addr,
+ final GenericCallback<Void> completeCb) {
+ AuthMessage.Builder builder
+ = AuthMessage.newBuilder()
+ .setAuthPluginName(getPluginName());
+ builder.setExtension(TestDataFormats.messageType,
+ TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE);
+ final AuthMessage message = builder.build();
+
+ return new ClientAuthProvider() {
+ public void init(GenericCallback<AuthMessage> cb) {
+ cb.operationComplete(BKException.Code.OK, message);
+ }
+
+ public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+ if (m.hasExtension(TestDataFormats.messageType)) {
+ TestDataFormats.AuthMessageType type
+ = m.getExtension(TestDataFormats.messageType);
+ if (type == TestDataFormats.AuthMessageType.SUCCESS_RESPONSE) {
+ completeCb.operationComplete(BKException.Code.OK, null);
+ } else if (type == TestDataFormats.AuthMessageType.FAILURE_RESPONSE) {
+ completeCb.operationComplete(BKException.Code.UnauthorizedAccessException, null);
+ } else {
+ cb.operationComplete(BKException.Code.OK, message);
+ }
+ } else {
+ completeCb.operationComplete(BKException.Code.UnauthorizedAccessException, null);
+ }
+ }
+ };
+ }
+ }
+
+ public static class SucceedAfter3BookieAuthProviderFactory
+ implements BookieAuthProvider.Factory {
+ AtomicInteger numMessages = new AtomicInteger(0);
+
+ @Override
+ public String getPluginName() {
+ return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+ }
+
+ @Override
+ public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+ TestDataFormats.registerAllExtensions(registry);
+ }
+
+ @Override
+ public BookieAuthProvider newProvider(InetSocketAddress addr,
+ final GenericCallback<Void> completeCb) {
+ return new BookieAuthProvider() {
+ public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+ AuthMessage.Builder builder
+ = AuthMessage.newBuilder()
+ .setAuthPluginName(getPluginName());
+ if (numMessages.incrementAndGet() == 3) {
+ builder.setExtension(TestDataFormats.messageType,
+ TestDataFormats.AuthMessageType.SUCCESS_RESPONSE);
+
+ cb.operationComplete(BKException.Code.OK, builder.build());
+ completeCb.operationComplete(BKException.Code.OK, null);
+ } else {
+ builder.setExtension(TestDataFormats.messageType,
+ TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE);
+
+ cb.operationComplete(BKException.Code.OK, builder.build());
+ }
+ }
+ };
+ }
+ }
+
+ public static class FailAfter3BookieAuthProviderFactory
+ implements BookieAuthProvider.Factory {
+ AtomicInteger numMessages = new AtomicInteger(0);
+
+ @Override
+ public String getPluginName() {
+ return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+ }
+
+ @Override
+ public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+ TestDataFormats.registerAllExtensions(registry);
+ }
+
+ @Override
+ public BookieAuthProvider newProvider(InetSocketAddress addr,
+ final GenericCallback<Void> completeCb) {
+ return new BookieAuthProvider() {
+ public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+ AuthMessage.Builder builder
+ = AuthMessage.newBuilder()
+ .setAuthPluginName(getPluginName());
+ if (numMessages.incrementAndGet() == 3) {
+ builder.setExtension(TestDataFormats.messageType,
+ TestDataFormats.AuthMessageType.FAILURE_RESPONSE);
+
+ cb.operationComplete(BKException.Code.OK, builder.build());
+ completeCb.operationComplete(BKException.Code.UnauthorizedAccessException,
+ null);
+ } else {
+ builder.setExtension(TestDataFormats.messageType,
+ TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE);
+
+ cb.operationComplete(BKException.Code.OK, builder.build());
+ }
+ }
+ };
+ }
+ }
+
+ public static class CrashAfter3BookieAuthProviderFactory
+ implements BookieAuthProvider.Factory {
+ AtomicInteger numMessages = new AtomicInteger(0);
+
+ @Override
+ public String getPluginName() {
+ return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+ }
+
+ @Override
+ public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+ TestDataFormats.registerAllExtensions(registry);
+ }
+
+ @Override
+ public BookieAuthProvider newProvider(InetSocketAddress addr,
+ final GenericCallback<Void> completeCb) {
+ return new BookieAuthProvider() {
+ public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+ AuthMessage.Builder builder
+ = AuthMessage.newBuilder()
+ .setAuthPluginName(getPluginName());
+ if (numMessages.incrementAndGet() == 3) {
+ throw new RuntimeException("Do bad things to the bookie");
+ } else {
+ builder.setExtension(TestDataFormats.messageType,
+ TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE);
+
+ cb.operationComplete(BKException.Code.OK, builder.build());
+ }
+ }
+ };
+ }
+ }
+
+ private static BookieServer crashType2bookieInstance = null;
+ public static class CrashType2After3BookieAuthProviderFactory
+ implements BookieAuthProvider.Factory {
+ AtomicInteger numMessages = new AtomicInteger(0);
+
+ @Override
+ public String getPluginName() {
+ return TEST_AUTH_PROVIDER_PLUGIN_NAME;
+ }
+
+ @Override
+ public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+ TestDataFormats.registerAllExtensions(registry);
+ }
+
+ @Override
+ public BookieAuthProvider newProvider(InetSocketAddress addr,
+ final GenericCallback<Void> completeCb) {
+ return new BookieAuthProvider() {
+ public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+ AuthMessage.Builder builder
+ = AuthMessage.newBuilder()
+ .setAuthPluginName(getPluginName());
+ if (numMessages.incrementAndGet() != 3) {
+ builder.setExtension(TestDataFormats.messageType,
+ TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE);
+ cb.operationComplete(BKException.Code.OK, builder.build());
+ return;
+ }
+
+ crashType2bookieInstance.suspendProcessing();
+ }
+ };
+ }
+ }
+
+ public static class DifferentPluginBookieAuthProviderFactory
+ implements BookieAuthProvider.Factory {
+ @Override
+ public String getPluginName() {
+ return "DifferentAuthProviderPlugin";
+ }
+
+ @Override
+ public void init(ServerConfiguration conf, ExtensionRegistry registry) {
+ TestDataFormats.registerAllExtensions(registry);
+ }
+
+ @Override
+ public BookieAuthProvider newProvider(InetSocketAddress addr,
+ final GenericCallback<Void> completeCb) {
+ return new BookieAuthProvider() {
+ public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {
+
+ AuthMessage.Builder builder
+ = AuthMessage.newBuilder()
+ .setAuthPluginName(getPluginName());
+ builder.setExtension(TestDataFormats.messageType,
+ TestDataFormats.AuthMessageType.FAILURE_RESPONSE);
+
+ cb.operationComplete(BKException.Code.OK, builder.build());
+ completeCb.operationComplete(BKException.Code.OK, null);
+ }
+ };
+ }
+ }
+
+}
[3/3] bookkeeper git commit: BOOKKEEPER-901: Authentication framework
Posted by si...@apache.org.
BOOKKEEPER-901: Authentication framework
Author: Ivan Kelly <iv...@yahoo-inc.com>
Reviewers: Sijie Guo<si...@apache.org>
Closes #23 from merlimat/authentication-framework and squashes the following commits:
aa01548 [Ivan Kelly] BOOKKEEPER-901: Add an authentication framework
f930fbd [Ivan Kelly] BOOKKEEPER-794 BookkeeperProtocol.Response.status is completely ignored
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/b1c12c0f
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/b1c12c0f
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/b1c12c0f
Branch: refs/heads/master
Commit: b1c12c0f41b7c27b2452fef311f12077d771f431
Parents: e32c388
Author: Ivan Kelly <iv...@yahoo-inc.com>
Authored: Mon Apr 4 23:45:51 2016 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Mon Apr 4 23:45:51 2016 -0700
----------------------------------------------------------------------
bookkeeper-server/pom.xml | 9 +
.../auth/AuthProviderFactoryFactory.java | 111 +++
.../bookkeeper/auth/BookieAuthProvider.java | 83 ++
.../bookkeeper/auth/ClientAuthProvider.java | 89 +++
.../apache/bookkeeper/client/PendingAddOp.java | 4 +-
.../bookkeeper/conf/ClientConfiguration.java | 53 +-
.../bookkeeper/conf/ServerConfiguration.java | 24 +
.../apache/bookkeeper/proto/AuthHandler.java | 356 +++++++++
.../apache/bookkeeper/proto/BookieClient.java | 32 +-
.../bookkeeper/proto/BookieNettyServer.java | 21 +-
.../bookkeeper/proto/BookieProtoEncoding.java | 125 ++-
.../apache/bookkeeper/proto/BookieProtocol.java | 36 +
.../proto/BookieRequestProcessor.java | 1 +
.../bookkeeper/proto/BookkeeperProtocol.java | 787 ++++++++++++++++++-
.../proto/PerChannelBookieClient.java | 70 +-
.../src/main/proto/BookkeeperProtocol.proto | 14 +-
.../org/apache/bookkeeper/auth/TestAuth.java | 654 +++++++++++++++
.../proto/TestBackwardCompatCMS42.java | 239 ++++++
.../bookkeeper/proto/TestDataFormats.java | 126 +++
.../proto/TestPerChannelBookieClient.java | 25 +-
.../test/BookKeeperClusterTestCase.java | 10 +
.../src/test/proto/TestDataFormats.proto | 34 +
22 files changed, 2803 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/pom.xml
----------------------------------------------------------------------
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index a1a74e0..eb67fd8 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -262,6 +262,7 @@
<!-- exclude generated file //-->
<exclude>**/DataFormats.java</exclude>
<exclude>**/BookkeeperProtocol.java</exclude>
+ <exclude>**/TestDataFormats.java</exclude>
</excludes>
</configuration>
</plugin>
@@ -324,6 +325,14 @@
<arg value="--java_out=src/main/java" />
<arg value="src/main/proto/DataFormats.proto" />
</exec>
+ <exec executable="protoc" failonerror="true">
+ <arg value="--java_out=src/main/java" />
+ <arg value="src/main/proto/BookkeeperProtocol.proto" />
+ </exec>
+ <exec executable="protoc" failonerror="true">
+ <arg value="--java_out=src/test/java" />
+ <arg value="src/test/proto/TestDataFormats.proto" />
+ </exec>
</target>
</configuration>
<goals>
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java
new file mode 100644
index 0000000..d05c475
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java
@@ -0,0 +1,111 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.auth;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ExtensionRegistry;
+
+
+
+public class AuthProviderFactoryFactory {
+ static Logger LOG = LoggerFactory.getLogger(AuthProviderFactoryFactory.class);
+
+ public static BookieAuthProvider.Factory newBookieAuthProviderFactory(ServerConfiguration conf,
+ ExtensionRegistry registry) throws IOException {
+ String factoryClassName = conf.getBookieAuthProviderFactoryClass();
+
+ if (factoryClassName == null || factoryClassName.length() == 0) {
+ return new NullBookieAuthProviderFactory();
+ }
+
+ BookieAuthProvider.Factory factory = ReflectionUtils.newInstance(factoryClassName,
+ BookieAuthProvider.Factory.class);
+ factory.init(conf, registry);
+ return factory;
+ }
+
+ public static ClientAuthProvider.Factory newClientAuthProviderFactory(ClientConfiguration conf,
+ ExtensionRegistry registry) throws IOException {
+ String factoryClassName = conf.getClientAuthProviderFactoryClass();
+
+ if (factoryClassName == null || factoryClassName.length() == 0) {
+ return new NullClientAuthProviderFactory();
+ }
+
+ ClientAuthProvider.Factory factory = ReflectionUtils.newInstance(factoryClassName,
+ ClientAuthProvider.Factory.class);
+ factory.init(conf, registry);
+ return factory;
+ }
+
+ private final static String nullPluginName = "NULLPlugin";
+
+ private static class NullBookieAuthProviderFactory implements BookieAuthProvider.Factory {
+ @Override
+ public String getPluginName() {
+ return nullPluginName;
+ }
+
+ @Override
+ public void init(ServerConfiguration conf, ExtensionRegistry registry) {}
+
+ @Override
+ public BookieAuthProvider newProvider(InetSocketAddress addr,
+ GenericCallback<Void> completeCb) {
+ completeCb.operationComplete(BKException.Code.OK, null);
+ return new BookieAuthProvider() {
+ public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {}
+ };
+ }
+ }
+
+ private static class NullClientAuthProviderFactory implements ClientAuthProvider.Factory {
+ @Override
+ public String getPluginName() {
+ return nullPluginName;
+ }
+
+ @Override
+ public void init(ClientConfiguration conf, ExtensionRegistry registry) {}
+
+ @Override
+ public ClientAuthProvider newProvider(InetSocketAddress addr,
+ GenericCallback<Void> completeCb) {
+ completeCb.operationComplete(BKException.Code.OK, null);
+ return new ClientAuthProvider() {
+ public void init(GenericCallback<AuthMessage> cb) {}
+ public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {}
+ };
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java
new file mode 100644
index 0000000..4fb7d07
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.auth;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+
+import com.google.protobuf.ExtensionRegistry;
+
+/**
+ * Bookie authentication provider interface.
+ * This must be implemented by any party wishing to implement
+ * an authentication mechanism for bookkeeper connections.
+ */
+public interface BookieAuthProvider {
+ interface Factory {
+ /**
+ * Initialize the factory with the server configuration
+ * and protobuf message registry. Implementors must
+ * add any extension messages which contain the auth
+ * payload, so that the server can decode auth messages
+ * it receives from the client.
+ */
+ void init(ServerConfiguration conf,
+ ExtensionRegistry registry) throws IOException;
+
+ /**
+ * Create a new instance of a bookie auth provider.
+ * Each connection should get its own instance, as they
+ * can hold connection specific state.
+ * The completeCb is used to notify the server that
+ * the authentication handshake is complete.
+ * completeCb should be called only once.
+ * If the authentication was successful, BKException.Code.OK
+ * should be passed as the return code. Otherwise, another
+ * error code should be passed.
+ * If authentication fails, the server will close the
+ * connection.
+ * @param addr the address of the client being authenticated
+ * @param completeCb callback to be notified when authentication
+ * is complete.
+ */
+ BookieAuthProvider newProvider(InetSocketAddress addr,
+ GenericCallback<Void> completeCb);
+
+ /**
+ * Get Auth provider plugin name.
+ * Used as a sanity check to ensure that the bookie and the client
+ * are using the same auth provider.
+ */
+ String getPluginName();
+ }
+
+ /**
+ * Process a request from the client. cb will receive the next
+ * message to be sent to the client. If there are no more messages
+ * to send to the client, cb should not be called, and completeCb
+ * must be called instead.
+ */
+ void process(AuthMessage m, GenericCallback<AuthMessage> cb);
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java
new file mode 100644
index 0000000..fba2264
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java
@@ -0,0 +1,89 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.auth;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+
+import com.google.protobuf.ExtensionRegistry;
+
+/**
+ * Client authentication provider interface.
+ * This must be implemented by any party wishing to implement
+ * an authentication mechanism for bookkeeper connections.
+ */
+public interface ClientAuthProvider {
+ interface Factory {
+ /**
+ * Initialize the factory with the client configuration
+ * and protobuf message registry. Implementors must
+ * add any extension messages which contain the auth
+ * payload, so that the client can decode auth messages
+ * it receives from the server.
+ */
+ void init(ClientConfiguration conf,
+ ExtensionRegistry registry) throws IOException;
+
+ /**
+ * Create a new instance of a client auth provider.
+ * Each connection should get its own instance, as they
+ * can hold connection specific state.
+ * The completeCb is used to notify the client that
+ * the authentication handshake is complete.
+ * completeCb should be called only once.
+ * If the authentication was successful, BKException.Code.OK
+ * should be passed as the return code. Otherwise, another
+ * error code should be passed.
+ * @param addr the address of the socket being authenticated
+ * @param completeCb callback to be notified when authentication
+ * is complete.
+ */
+ ClientAuthProvider newProvider(InetSocketAddress addr,
+ GenericCallback<Void> completeCb);
+
+ /**
+ * Get Auth provider plugin name.
+ * Used as a sanity check to ensure that the bookie and the client
+ * are using the same auth provider.
+ */
+ String getPluginName();
+ }
+
+ /**
+ * Initiate the authentication. cb will receive the initial
+ * authentication message which should be sent to the server.
+ * cb may not be called if authentication is not required. In
+ * this case, completeCb should be called.
+ */
+ void init(GenericCallback<AuthMessage> cb);
+
+ /**
+ * Process a response from the server. cb will receive the next
+ * message to be sent to the server. If there are no more messages
+ * to send to the server, cb should not be called, and completeCb
+ * must be called instead.
+ */
+ void process(AuthMessage m, GenericCallback<AuthMessage> cb);
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index bc487f6..1946069 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -208,8 +208,8 @@ class PendingAddOp implements WriteCallback, TimerTask {
lh.handleUnrecoverableErrorDuringAdd(rc);
return;
default:
- LOG.warn("Write did not succeed: L{} E{} on {}",
- new Object[] { ledgerId, entryId, addr });
+ LOG.warn("Write did not succeed: L{} E{} on {}, rc = {}",
+ new Object[] { ledgerId, entryId, addr, rc });
lh.handleBookieFailure(addr, bookieIndex);
return;
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index d0750d3..b8d738b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -81,6 +81,9 @@ public class ClientConfiguration extends AbstractConfiguration {
protected final static String ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats";
protected final static String TASK_EXECUTION_WARN_TIME_MICROS = "taskExecutionWarnTimeMicros";
+ // Client auth provider factory class name
+ protected final static String CLIENT_AUTH_PROVIDER_FACTORY_CLASS = "clientAuthProviderFactoryClass";
+
/**
* Construct a default client-side configuration
*/
@@ -700,7 +703,7 @@ public class ClientConfiguration extends AbstractConfiguration {
/**
* Check if bookie health check is enabled.
- *
+ *
* @return
*/
public boolean isBookieHealthCheckEnabled() {
@@ -709,15 +712,15 @@ public class ClientConfiguration extends AbstractConfiguration {
/**
* Enables the bookie health check.
- *
+ *
* <p>
* If the number of read/write errors for a bookie exceeds {@link #getBookieErrorThresholdPerInterval()} per
* interval, that bookie is quarantined for {@link #getBookieQuarantineTimeSeconds()} seconds. During this
* quarantined period, the client will try not to use this bookie when creating new ensembles.
* </p>
- *
+ *
* By default, the bookie health check is <b>disabled</b>.
- *
+ *
* @return client configuration
*/
public ClientConfiguration enableBookieHealthCheck() {
@@ -727,7 +730,7 @@ public class ClientConfiguration extends AbstractConfiguration {
/**
* Get the bookie health check interval in seconds.
- *
+ *
* @return
*/
public int getBookieHealthCheckIntervalSeconds() {
@@ -736,11 +739,11 @@ public class ClientConfiguration extends AbstractConfiguration {
/**
* Set the bookie health check interval. Default is 60 seconds.
- *
+ *
* <p>
* Note: Please {@link #enableBookieHealthCheck()} to use this configuration.
* </p>
- *
+ *
* @param interval
* @param unit
* @return client configuration
@@ -752,7 +755,7 @@ public class ClientConfiguration extends AbstractConfiguration {
/**
* Get the error threshold for a bookie to be quarantined.
- *
+ *
* @return
*/
public long getBookieErrorThresholdPerInterval() {
@@ -762,11 +765,11 @@ public class ClientConfiguration extends AbstractConfiguration {
/**
* Set the error threshold per interval ({@link #getBookieHealthCheckIntervalSeconds()}) for a bookie before it is
* quarantined. Default is 100 errors per minute.
- *
+ *
* <p>
* Note: Please {@link #enableBookieHealthCheck()} to use this configuration.
* </p>
- *
+ *
* @param threshold
* @param unit
* @return client configuration
@@ -778,7 +781,7 @@ public class ClientConfiguration extends AbstractConfiguration {
/**
* Get the time for which a bookie will be quarantined.
- *
+ *
* @return
*/
public int getBookieQuarantineTimeSeconds() {
@@ -787,11 +790,11 @@ public class ClientConfiguration extends AbstractConfiguration {
/**
* Set the time for which a bookie will be quarantined. Default is 30 minutes.
- *
+ *
* <p>
* Note: Please {@link #enableBookieHealthCheck()} to use this configuration.
* </p>
- *
+ *
* @param quarantineTime
* @param unit
* @return client configuration
@@ -800,4 +803,28 @@ public class ClientConfiguration extends AbstractConfiguration {
setProperty(BOOKIE_QUARANTINE_TIME_SECONDS, unit.toSeconds(quarantineTime));
return this;
}
+
+ /**
+ * Set the client authentication provider factory class name.
+ * If this is not set, no authentication will be used
+ *
+ * @param factoryClass
+ * the client authentication provider factory class name
+ * @return client configuration
+ */
+ public ClientConfiguration setClientAuthProviderFactoryClass(
+ String factoryClass) {
+ setProperty(CLIENT_AUTH_PROVIDER_FACTORY_CLASS, factoryClass);
+ return this;
+ }
+
+ /**
+ * Get the client authentication provider factory class name. If this returns null, no authentication will take
+ * place.
+ *
+ * @return the client authentication provider factory class name or null.
+ */
+ public String getClientAuthProviderFactoryClass() {
+ return getString(CLIENT_AUTH_PROVIDER_FACTORY_CLASS, null);
+ }
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 76e5037..d770650 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -122,6 +122,9 @@ public class ServerConfiguration extends AbstractConfiguration {
protected final static String LEDGER_STORAGE_CLASS = "ledgerStorageClass";
+ // Bookie auth provider factory class name
+ protected final static String BOOKIE_AUTH_PROVIDER_FACTORY_CLASS = "bookieAuthProviderFactoryClass";
+
/**
* Construct a default configuration object
*/
@@ -1566,4 +1569,25 @@ public class ServerConfiguration extends AbstractConfiguration {
}
}
+ /**
+ * Set the bookie authentication provider factory class name.
+ *
+ * If this is not set, no authentication will be used.
+ *
+ * @param factoryClass
+ * the bookie authentication provider factory class name
+ */
+ public void setBookieAuthProviderFactoryClass(String factoryClass) {
+ setProperty(BOOKIE_AUTH_PROVIDER_FACTORY_CLASS, factoryClass);
+ }
+
+ /**
+ * Get the bookie authentication provider factory class name.
+ * If this returns null, no authentication will take place.
+ *
+ * @return the bookie authentication provider factory class name or null.
+ */
+ public String getBookieAuthProviderFactoryClass() {
+ return getString(BOOKIE_AUTH_PROVIDER_FACTORY_CLASS, null);
+ }
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
new file mode 100644
index 0000000..522bc0b
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -0,0 +1,356 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.DefaultExceptionEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+
+import org.apache.bookkeeper.auth.BookieAuthProvider;
+import org.apache.bookkeeper.auth.ClientAuthProvider;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AuthHandler {
+ static final Logger LOG = LoggerFactory.getLogger(AuthHandler.class);
+
+ static class ServerSideHandler extends SimpleChannelHandler {
+ volatile boolean authenticated = false;
+ final BookieAuthProvider.Factory authProviderFactory;
+ BookieAuthProvider authProvider;
+
+ ServerSideHandler(BookieAuthProvider.Factory authProviderFactory) {
+ this.authProviderFactory = authProviderFactory;
+ authProvider = null;
+ }
+
+ // Creates the per-connection auth provider; only InetSocketAddress peers get one.
+ @Override
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ LOG.info("Channel open {}", ctx.getChannel());
+ SocketAddress remote = ctx.getChannel().getRemoteAddress();
+ if (remote instanceof InetSocketAddress) {
+ authProvider = authProviderFactory.newProvider((InetSocketAddress)remote, new AuthHandshakeCompleteCallback());
+ } else {
+ // instanceof is also false for a null address, so guard getClass() against an NPE
+ LOG.error("Unknown socket type {} for {}", remote == null ? null : remote.getClass(), remote);
+ }
+ super.channelOpen(ctx, e);
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx,
+ MessageEvent e)
+ throws Exception {
+ if (authProvider == null) {
+ // close the channel; authProvider should only be
+ // null if the other end of the line is not an
+ // InetSocketAddress. Anything else is strange, and we
+ // don't want to deal with it
+ ctx.getChannel().close();
+ return;
+ }
+
+ Object event = e.getMessage();
+ if (authenticated) {
+ super.messageReceived(ctx, e);
+ } else if (event instanceof BookieProtocol.AuthRequest) { // pre-PB-client
+ BookieProtocol.AuthRequest req = (BookieProtocol.AuthRequest)event;
+ assert (req.getOpCode() == BookieProtocol.AUTH);
+ if (checkAuthPlugin(req.getAuthMessage(), ctx.getChannel())) {
+ authProvider.process(req.getAuthMessage(),
+ new AuthResponseCallbackLegacy(req, ctx.getChannel()));
+ } else {
+ ctx.getChannel().close();
+ }
+ } else if (event instanceof BookieProtocol.Request) {
+ BookieProtocol.Request req = (BookieProtocol.Request)event;
+ if (req.getOpCode() == BookieProtocol.ADDENTRY) {
+ ctx.getChannel().write(
+ new BookieProtocol.AddResponse(
+ req.getProtocolVersion(), BookieProtocol.EUA,
+ req.getLedgerId(), req.getEntryId()));
+ } else if (req.getOpCode() == BookieProtocol.READENTRY) {
+ ctx.getChannel().write(
+ new BookieProtocol.ReadResponse(
+ req.getProtocolVersion(), BookieProtocol.EUA,
+ req.getLedgerId(), req.getEntryId()));
+ } else {
+ ctx.getChannel().close();
+ }
+ } else if (event instanceof BookkeeperProtocol.Request) { // post-PB-client
+ BookkeeperProtocol.Request req = (BookkeeperProtocol.Request)event;
+ if (req.getHeader().getOperation() == BookkeeperProtocol.OperationType.AUTH
+ && req.hasAuthRequest()
+ && checkAuthPlugin(req.getAuthRequest(), ctx.getChannel())) {
+ authProvider.process(req.getAuthRequest(),
+ new AuthResponseCallback(req, ctx.getChannel()));
+ } else {
+ BookkeeperProtocol.Response.Builder builder
+ = BookkeeperProtocol.Response.newBuilder()
+ .setHeader(req.getHeader())
+ .setStatus(BookkeeperProtocol.StatusCode.EUA);
+
+ ctx.getChannel().write(builder.build());
+ }
+ } else {
+ // close the channel, junk coming over it
+ ctx.getChannel().close();
+ }
+ }
+
+ // Accepts am only if it names the same auth plugin as the local factory; src is logged on mismatch.
+ private boolean checkAuthPlugin(AuthMessage am, final Channel src) {
+ if (!am.hasAuthPluginName() || !am.getAuthPluginName().equals(authProviderFactory.getPluginName())) {
+ LOG.error("Received message from incompatible auth plugin. Local = {},"
+ + " Remote = {}, Channel = {}",
+ new Object[] { authProviderFactory.getPluginName(), am.getAuthPluginName(), src });
+ return false;
+ }
+ return true;
+ }
+
+ static class AuthResponseCallbackLegacy implements GenericCallback<AuthMessage> {
+ final BookieProtocol.AuthRequest req;
+ final Channel channel;
+
+ AuthResponseCallbackLegacy(BookieProtocol.AuthRequest req, Channel channel) {
+ this.req = req;
+ this.channel = channel;
+ }
+
+ public void operationComplete(int rc, AuthMessage newam) {
+ if (rc != BKException.Code.OK) {
+ LOG.error("Error processing auth message, closing connection");
+ channel.close();
+ return;
+ }
+ channel.write(new BookieProtocol.AuthResponse(req.getProtocolVersion(),
+ newam));
+ }
+ }
+
+ static class AuthResponseCallback implements GenericCallback<AuthMessage> {
+ final BookkeeperProtocol.Request req;
+ final Channel channel;
+
+ AuthResponseCallback(BookkeeperProtocol.Request req, Channel channel) {
+ this.req = req;
+ this.channel = channel;
+ }
+
+ public void operationComplete(int rc, AuthMessage newam) {
+ BookkeeperProtocol.Response.Builder builder
+ = BookkeeperProtocol.Response.newBuilder()
+ .setHeader(req.getHeader());
+
+ if (rc != BKException.Code.OK) {
+ LOG.error("Error processing auth message, closing connection");
+
+ builder.setStatus(BookkeeperProtocol.StatusCode.EUA);
+ channel.write(builder.build());
+ channel.close();
+ return;
+ } else {
+ builder.setStatus(BookkeeperProtocol.StatusCode.EOK)
+ .setAuthResponse(newam);
+ channel.write(builder.build());
+ }
+ }
+ }
+
+ class AuthHandshakeCompleteCallback implements GenericCallback<Void> {
+ @Override
+ public void operationComplete(int rc, Void v) {
+ if (rc == BKException.Code.OK) {
+ authenticated = true;
+ } else {
+ LOG.debug("Authentication failed on server side");
+ }
+ }
+ }
+ }
+
+ static class ClientSideHandler extends SimpleChannelHandler {
+ volatile boolean authenticated = false;
+ final ClientAuthProvider.Factory authProviderFactory;
+ ClientAuthProvider authProvider;
+ AtomicLong transactionIdGenerator;
+ Queue<MessageEvent> waitingForAuth = new ConcurrentLinkedQueue<MessageEvent>();
+
+ ClientSideHandler(ClientAuthProvider.Factory authProviderFactory,
+ AtomicLong transactionIdGenerator) {
+ this.authProviderFactory = authProviderFactory;
+ this.transactionIdGenerator = transactionIdGenerator;
+ authProvider = null;
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx,
+ ChannelStateEvent e)
+ throws Exception {
+ SocketAddress remote = ctx.getChannel().getRemoteAddress();
+ if (remote instanceof InetSocketAddress) {
+ authProvider = authProviderFactory.newProvider((InetSocketAddress)remote,
+ new AuthHandshakeCompleteCallback(ctx));
+ authProvider.init(new AuthRequestCallback(ctx));
+ } else {
+ LOG.error("Unknown socket type {} for {}", remote.getClass(), remote);
+ }
+ super.channelConnected(ctx, e);
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx,
+ MessageEvent e)
+ throws Exception {
+ assert (authProvider != null);
+
+ Object event = e.getMessage();
+
+ if (authenticated) {
+ super.messageReceived(ctx, e);
+ } else if (event instanceof BookkeeperProtocol.Response) {
+ BookkeeperProtocol.Response resp = (BookkeeperProtocol.Response)event;
+ if (resp.getHeader().getOperation() == BookkeeperProtocol.OperationType.AUTH) {
+ if (resp.getStatus() != BookkeeperProtocol.StatusCode.EOK) {
+ authenticationError(ctx, resp.getStatus().getNumber());
+ } else {
+ assert (resp.hasAuthResponse());
+ BookkeeperProtocol.AuthMessage am = resp.getAuthResponse();
+ authProvider.process(am, new AuthRequestCallback(ctx));
+ }
+ } else {
+ // else just drop the message,
+ // we're not authenticated so nothing should be coming through
+ }
+ }
+ }
+
+ @Override
+ public void writeRequested(ChannelHandlerContext ctx,
+ MessageEvent e)
+ throws Exception {
+ synchronized (waitingForAuth) {
+ if (authenticated) {
+ super.writeRequested(ctx, e);
+ } else if (e.getMessage() instanceof BookkeeperProtocol.Request) {
+ // let auth messages through, queue the rest
+ BookkeeperProtocol.Request req = (BookkeeperProtocol.Request)e.getMessage();
+ if (req.getHeader().getOperation()
+ == BookkeeperProtocol.OperationType.AUTH) {
+ super.writeRequested(ctx, e);
+ } else {
+ waitingForAuth.add(e);
+ }
+ } // else just drop
+ }
+ }
+
+ long newTxnId() {
+ return transactionIdGenerator.incrementAndGet();
+ }
+
+ void authenticationError(ChannelHandlerContext ctx, int errorCode) {
+ LOG.error("Error processing auth message, erroring connection {}", errorCode);
+ ctx.sendUpstream(new DefaultExceptionEvent(ctx.getChannel(),
+ new AuthenticationException(
+ "Auth failed with error " + errorCode)));
+ }
+
+ class AuthRequestCallback implements GenericCallback<AuthMessage> {
+ Channel channel;
+ ChannelHandlerContext ctx;
+
+ AuthRequestCallback(ChannelHandlerContext ctx) {
+ this.channel = ctx.getChannel();
+ this.ctx = ctx;
+ }
+
+ public void operationComplete(int rc, AuthMessage newam) {
+ if (rc != BKException.Code.OK) {
+ authenticationError(ctx, rc);
+ return;
+ }
+
+ BookkeeperProtocol.BKPacketHeader header
+ = BookkeeperProtocol.BKPacketHeader.newBuilder()
+ .setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
+ .setOperation(BookkeeperProtocol.OperationType.AUTH)
+ .setTxnId(newTxnId()).build();
+ BookkeeperProtocol.Request.Builder builder
+ = BookkeeperProtocol.Request.newBuilder()
+ .setHeader(header)
+ .setAuthRequest(newam);
+
+ channel.write(builder.build());
+ }
+ }
+
+ class AuthHandshakeCompleteCallback implements GenericCallback<Void> {
+ ChannelHandlerContext ctx;
+ AuthHandshakeCompleteCallback(ChannelHandlerContext ctx) {
+ this.ctx = ctx;
+ }
+ // rc == OK: mark authenticated and flush the writes queued in waitingForAuth; else raise an auth error.
+ @Override
+ public void operationComplete(int rc, Void v) {
+ if (rc == BKException.Code.OK) {
+ synchronized (waitingForAuth) {
+ authenticated = true;
+ MessageEvent e = waitingForAuth.poll();
+ while (e != null) {
+ ctx.sendDownstream(e);
+ e = waitingForAuth.poll();
+ }
+ }
+ } else {
+ authenticationError(ctx, rc);
+ LOG.debug("Authentication failed on client side");
+ }
+ }
+ }
+ }
+
+ static class AuthenticationException extends IOException {
+ AuthenticationException(String reason) {
+ super(reason);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 8a79547..d0052d6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -28,8 +28,11 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
+import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -52,6 +55,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ExtensionRegistry;
/**
* Implements the client-side part of the BookKeeper protocol.
@@ -60,11 +64,18 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class BookieClient implements PerChannelBookieClientFactory {
static final Logger LOG = LoggerFactory.getLogger(BookieClient.class);
- final OrderedSafeExecutor executor;
- final ClientSocketChannelFactory channelFactory;
+ // This is global state that should be shared across all BookieClients
+ AtomicLong totalBytesOutstanding = new AtomicLong();
+
+ OrderedSafeExecutor executor;
+ ClientSocketChannelFactory channelFactory;
final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool> channels =
new ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool>();
final HashedWheelTimer requestTimer;
+
+ final private ClientAuthProvider.Factory authProviderFactory;
+ final private ExtensionRegistry registry;
+
private final ClientConfiguration conf;
private volatile boolean closed;
private final ReentrantReadWriteLock closeLock;
@@ -73,17 +84,22 @@ public class BookieClient implements PerChannelBookieClientFactory {
private final long bookieErrorThresholdPerInterval;
- public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor) {
+ public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory,
+ OrderedSafeExecutor executor) throws IOException {
this(conf, channelFactory, executor, NullStatsLogger.INSTANCE);
}
- public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor,
- StatsLogger statsLogger) {
+ public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory,
+ OrderedSafeExecutor executor, StatsLogger statsLogger) throws IOException {
this.conf = conf;
this.channelFactory = channelFactory;
this.executor = executor;
this.closed = false;
this.closeLock = new ReentrantReadWriteLock();
+
+ this.registry = ExtensionRegistry.newInstance();
+ this.authProviderFactory = AuthProviderFactoryFactory.newClientAuthProviderFactory(conf, registry);
+
this.statsLogger = statsLogger;
this.numConnectionsPerBookie = conf.getNumChannelsPerBookie();
this.requestTimer = new HashedWheelTimer(
@@ -120,8 +136,8 @@ public class BookieClient implements PerChannelBookieClientFactory {
@Override
public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool) {
- return new PerChannelBookieClient(conf, executor, channelFactory, address,
- requestTimer, statsLogger, pcbcPool);
+ return new PerChannelBookieClient(conf, executor, channelFactory, address, requestTimer, statsLogger,
+ authProviderFactory, registry, pcbcPool);
}
private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr, Object key) {
@@ -133,7 +149,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
return null;
}
PerChannelBookieClientPool newClientPool =
- new DefaultPerChannelBookieClientPool(this, addr, numConnectionsPerBookie);
+ new DefaultPerChannelBookieClientPool(this, addr, numConnectionsPerBookie);
PerChannelBookieClientPool oldClientPool = channels.putIfAbsent(addr, newClientPool);
if (null == oldClientPool) {
clientPool = newClientPool;
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index b623998..bb1b207 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -28,6 +28,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.auth.BookieAuthProvider;
+import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.zookeeper.KeeperException;
import org.jboss.netty.bootstrap.ServerBootstrap;
@@ -48,6 +50,7 @@ import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.protobuf.ExtensionRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -66,11 +69,21 @@ class BookieNettyServer {
Object suspensionLock = new Object();
boolean suspended = false;
+ final BookieAuthProvider.Factory authProviderFactory;
+ final BookieProtoEncoding.ResponseEncoder responseEncoder;
+ final BookieProtoEncoding.RequestDecoder requestDecoder;
+
BookieNettyServer(ServerConfiguration conf, RequestProcessor processor)
throws IOException, KeeperException, InterruptedException, BookieException {
this.conf = conf;
this.requestProcessor = processor;
+ ExtensionRegistry registry = ExtensionRegistry.newInstance();
+ authProviderFactory = AuthProviderFactoryFactory.newBookieAuthProviderFactory(conf, registry);
+
+ responseEncoder = new BookieProtoEncoding.ResponseEncoder(registry);
+ requestDecoder = new BookieProtoEncoding.RequestDecoder(registry);
+
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
String base = "bookie-" + conf.getBookiePort() + "-netty";
serverChannelFactory = new NioServerSocketChannelFactory(
@@ -140,11 +153,15 @@ class BookieNettyServer {
new LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4));
pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
- pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.RequestDecoder());
- pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.ResponseEncoder());
+ pipeline.addLast("bookieProtoDecoder", requestDecoder);
+ pipeline.addLast("bookieProtoEncoder", responseEncoder);
+ pipeline.addLast("bookieAuthHandler",
+ new AuthHandler.ServerSideHandler(authProviderFactory));
+
SimpleChannelHandler requestHandler = isRunning.get() ?
new BookieRequestHandler(conf, requestProcessor, allChannels)
: new RejectRequestHandler();
+
pipeline.addLast("bookieRequestHandler", requestHandler);
return pipeline;
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 6ece56e..683a6fb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -20,29 +20,26 @@
*/
package org.apache.bookkeeper.proto;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.InvalidProtocolBufferException;
+
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
-
import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BookieProtoEncoding {
private final static Logger LOG = LoggerFactory.getLogger(BookieProtoEncoding.class);
- static final EnDecoder REQ_PREV3 = new RequestEnDeCoderPreV3();
- static final EnDecoder REP_PREV3 = new ResponseEnDeCoderPreV3();
- static final EnDecoder REQ_V3 = new RequestEnDecoderV3();
- static final EnDecoder REP_V3 = new ResponseEnDecoderV3();
-
static interface EnDecoder {
/**
@@ -68,6 +65,12 @@ public class BookieProtoEncoding {
}
static class RequestEnDeCoderPreV3 implements EnDecoder {
+ // Registry handed to protobuf when an AUTH payload is parsed in decode().
+ final ExtensionRegistry extensionRegistry;
+
+ /**
+ * @param extensionRegistry registry stored for use when decoding AuthMessage payloads
+ */
+ RequestEnDeCoderPreV3(ExtensionRegistry extensionRegistry) {
+ this.extensionRegistry = extensionRegistry;
+ }
+
@Override
public Object encode(Object msg, ChannelBufferFactory bufferFactory)
throws Exception {
@@ -83,8 +86,7 @@ public class BookieProtoEncoding {
buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt());
buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
return ChannelBuffers.wrappedBuffer(buf, ar.getData());
- } else {
- assert(r instanceof BookieProtocol.ReadRequest);
+ } else if (r instanceof BookieProtocol.ReadRequest) {
int totalHeaderSize = 4 // for request type
+ 8 // for ledgerId
+ 8; // for entryId
@@ -101,6 +103,19 @@ public class BookieProtoEncoding {
}
return buf;
+ } else if (r instanceof BookieProtocol.AuthRequest) {
+ BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthRequest)r).getAuthMessage();
+ int totalHeaderSize = 4; // for request type
+ int totalSize = totalHeaderSize + am.getSerializedSize();
+ ChannelBuffer buf = bufferFactory.getBuffer(totalSize);
+ buf.writeInt(new PacketHeader(r.getProtocolVersion(),
+ r.getOpCode(),
+ r.getFlags()).toInt());
+ ChannelBufferOutputStream bufStream = new ChannelBufferOutputStream(buf);
+ am.writeTo(bufStream);
+ return buf;
+ } else {
+ return msg;
}
}
@@ -141,12 +156,23 @@ public class BookieProtoEncoding {
} else {
return new BookieProtocol.ReadRequest(h.getVersion(), ledgerId, entryId, flags);
}
+ case BookieProtocol.AUTH:
+ BookkeeperProtocol.AuthMessage.Builder builder
+ = BookkeeperProtocol.AuthMessage.newBuilder();
+ builder.mergeFrom(new ChannelBufferInputStream(packet), extensionRegistry);
+ return new BookieProtocol.AuthRequest(h.getVersion(), builder.build());
}
return packet;
}
}
static class ResponseEnDeCoderPreV3 implements EnDecoder {
+ // Registry handed to protobuf when an AUTH payload is parsed in decode().
+ final ExtensionRegistry extensionRegistry;
+
+ /**
+ * @param extensionRegistry registry stored for use when decoding AuthMessage payloads
+ */
+ ResponseEnDeCoderPreV3(ExtensionRegistry extensionRegistry) {
+ this.extensionRegistry = extensionRegistry;
+ }
+
@Override
public Object encode(Object msg, ChannelBufferFactory bufferFactory)
throws Exception {
@@ -157,12 +183,13 @@ public class BookieProtoEncoding {
ChannelBuffer buf = bufferFactory.getBuffer(24);
buf.writeInt(new PacketHeader(r.getProtocolVersion(),
r.getOpCode(), (short)0).toInt());
- buf.writeInt(r.getErrorCode());
- buf.writeLong(r.getLedgerId());
- buf.writeLong(r.getEntryId());
ServerStats.getInstance().incrementPacketsSent();
if (msg instanceof BookieProtocol.ReadResponse) {
+ buf.writeInt(r.getErrorCode());
+ buf.writeLong(r.getLedgerId());
+ buf.writeLong(r.getEntryId());
+
BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse)r;
if (rr.hasData()) {
return ChannelBuffers.wrappedBuffer(buf,
@@ -171,7 +198,15 @@ public class BookieProtoEncoding {
return buf;
}
} else if (msg instanceof BookieProtocol.AddResponse) {
+ buf.writeInt(r.getErrorCode());
+ buf.writeLong(r.getLedgerId());
+ buf.writeLong(r.getEntryId());
+
return buf;
+ } else if (msg instanceof BookieProtocol.AuthResponse) {
+ BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse)r).getAuthMessage();
+ return ChannelBuffers.wrappedBuffer(buf,
+ ChannelBuffers.wrappedBuffer(am.toByteArray()));
} else {
LOG.error("Cannot encode unknown response type {}", msg.getClass().getName());
return msg;
@@ -180,19 +215,23 @@ public class BookieProtoEncoding {
@Override
public Object decode(ChannelBuffer buffer)
throws Exception {
- final int rc;
- final long ledgerId, entryId;
+ int rc;
+ long ledgerId, entryId;
final PacketHeader header;
header = PacketHeader.fromInt(buffer.readInt());
- rc = buffer.readInt();
- ledgerId = buffer.readLong();
- entryId = buffer.readLong();
switch (header.getOpCode()) {
case BookieProtocol.ADDENTRY:
+ rc = buffer.readInt();
+ ledgerId = buffer.readLong();
+ entryId = buffer.readLong();
return new BookieProtocol.AddResponse(header.getVersion(), rc, ledgerId, entryId);
case BookieProtocol.READENTRY:
+ rc = buffer.readInt();
+ ledgerId = buffer.readLong();
+ entryId = buffer.readLong();
+
if (rc == BookieProtocol.EOK) {
return new BookieProtocol.ReadResponse(header.getVersion(), rc,
ledgerId, entryId, buffer.slice());
@@ -200,6 +239,13 @@ public class BookieProtoEncoding {
return new BookieProtocol.ReadResponse(header.getVersion(), rc,
ledgerId, entryId);
}
+ case BookieProtocol.AUTH:
+ ChannelBufferInputStream bufStream = new ChannelBufferInputStream(buffer);
+ BookkeeperProtocol.AuthMessage.Builder builder
+ = BookkeeperProtocol.AuthMessage.newBuilder();
+ builder.mergeFrom(bufStream, extensionRegistry);
+ BookkeeperProtocol.AuthMessage am = builder.build();
+ return new BookieProtocol.AuthResponse(header.getVersion(), am);
default:
return buffer;
}
@@ -207,10 +253,16 @@ public class BookieProtoEncoding {
}
static class RequestEnDecoderV3 implements EnDecoder {
+ // Registry passed to BookkeeperProtocol.Request.parseFrom in decode().
+ final ExtensionRegistry extensionRegistry;
+
+ /**
+ * @param extensionRegistry registry stored for use when parsing requests
+ */
+ RequestEnDecoderV3(ExtensionRegistry extensionRegistry) {
+ this.extensionRegistry = extensionRegistry;
+ }
@Override
public Object decode(ChannelBuffer packet) throws Exception {
- return BookkeeperProtocol.Request.parseFrom(new ChannelBufferInputStream(packet));
+ return BookkeeperProtocol.Request.parseFrom(new ChannelBufferInputStream(packet),
+ extensionRegistry);
}
@Override
@@ -222,10 +274,16 @@ public class BookieProtoEncoding {
}
static class ResponseEnDecoderV3 implements EnDecoder {
+ // Registry passed to BookkeeperProtocol.Response.parseFrom in decode().
+ final ExtensionRegistry extensionRegistry;
+
+ /**
+ * @param extensionRegistry registry stored for use when parsing responses
+ */
+ ResponseEnDecoderV3(ExtensionRegistry extensionRegistry) {
+ this.extensionRegistry = extensionRegistry;
+ }
@Override
public Object decode(ChannelBuffer packet) throws Exception {
- return BookkeeperProtocol.Response.parseFrom(new ChannelBufferInputStream(packet));
+ return BookkeeperProtocol.Response.parseFrom(new ChannelBufferInputStream(packet),
+ extensionRegistry);
}
@Override
@@ -238,6 +296,14 @@ public class BookieProtoEncoding {
public static class RequestEncoder extends OneToOneEncoder {
+ // Request codecs for the pre-v3 and v3 formats; each encoder instance owns its own pair.
+ final EnDecoder REQ_PREV3;
+ final EnDecoder REQ_V3;
+
+ /**
+ * @param extensionRegistry registry handed to both request codecs
+ */
+ RequestEncoder(ExtensionRegistry extensionRegistry) {
+ REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry);
+ REQ_V3 = new RequestEnDecoderV3(extensionRegistry);
+ }
+
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
throws Exception {
@@ -256,6 +322,13 @@ public class BookieProtoEncoding {
}
public static class RequestDecoder extends OneToOneDecoder {
+ // Request codecs for the pre-v3 and v3 formats; each decoder instance owns its own pair.
+ final EnDecoder REQ_PREV3;
+ final EnDecoder REQ_V3;
+
+ /**
+ * @param extensionRegistry registry handed to both request codecs
+ */
+ RequestDecoder(ExtensionRegistry extensionRegistry) {
+ REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry);
+ REQ_V3 = new RequestEnDecoderV3(extensionRegistry);
+ }
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
@@ -283,6 +356,13 @@ public class BookieProtoEncoding {
}
public static class ResponseEncoder extends OneToOneEncoder {
+ // Response codecs for the pre-v3 and v3 formats; each encoder instance owns its own pair.
+ final EnDecoder REP_PREV3;
+ final EnDecoder REP_V3;
+
+ /**
+ * @param extensionRegistry registry handed to both response codecs
+ */
+ ResponseEncoder(ExtensionRegistry extensionRegistry) {
+ REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
+ REP_V3 = new ResponseEnDecoderV3(extensionRegistry);
+ }
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
@@ -302,6 +382,13 @@ public class BookieProtoEncoding {
}
public static class ResponseDecoder extends OneToOneDecoder {
+ // Response codecs for the pre-v3 and v3 formats; each decoder instance owns its own pair.
+ final EnDecoder REP_PREV3;
+ final EnDecoder REP_V3;
+
+ /**
+ * @param extensionRegistry registry handed to both response codecs
+ */
+ ResponseDecoder(ExtensionRegistry extensionRegistry) {
+ REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
+ REP_V3 = new ResponseEnDecoderV3(extensionRegistry);
+ }
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 4dd26d6..2ce5ed8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -24,6 +24,8 @@ package org.apache.bookkeeper.proto;
import org.jboss.netty.buffer.ChannelBuffer;
import java.nio.ByteBuffer;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+
/**
* The packets of the Bookie protocol all have a 4-byte integer indicating the
* type of request or response at the very beginning of the packet followed by a
@@ -133,6 +135,13 @@ public interface BookieProtocol {
public static final byte READENTRY = 2;
/**
+ * Auth message. This code is for passing auth messages between the auth
+ * providers on the client and bookie. The message payload is determined
+ * by the auth providers themselves.
+ */
+ public static final byte AUTH = 3;
+
+ /**
* The error code that indicates success
*/
public static final int EOK = 0;
@@ -273,6 +282,19 @@ public interface BookieProtocol {
}
}
+ /**
+ * Request with the AUTH op code that carries an AuthMessage.
+ */
+ static class AuthRequest extends Request {
+ final AuthMessage authMessage;
+
+ /**
+ * @param protocolVersion protocol version forwarded to the Request constructor
+ * @param authMessage message later returned by getAuthMessage()
+ */
+ AuthRequest(byte protocolVersion, AuthMessage authMessage) {
+ // Uses the AUTH op code and FLAG_NONE. The -1, -1 and null arguments are
+ // presumably placeholder ledgerId, entryId and master key -- TODO confirm
+ // against the Request constructor.
+ super(protocolVersion, AUTH, -1, -1, FLAG_NONE, null);
+ this.authMessage = authMessage;
+ }
+
+ /** @return the AuthMessage given at construction time */
+ AuthMessage getAuthMessage() {
+ return authMessage;
+ }
+ }
+
static class Response {
final byte protocolVersion;
final byte opCode;
@@ -343,4 +365,18 @@ public interface BookieProtocol {
super(protocolVersion, ADDENTRY, errorCode, ledgerId, entryId);
}
}
+
+ /**
+ * Response with the AUTH op code that carries an AuthMessage.
+ */
+ static class AuthResponse extends Response {
+ final AuthMessage authMessage;
+
+ /**
+ * @param protocolVersion protocol version forwarded to the Response constructor
+ * @param authMessage message later returned by getAuthMessage()
+ */
+ AuthResponse(byte protocolVersion, AuthMessage authMessage) {
+ // Always built with error code EOK, and -1 in the ledgerId and entryId positions.
+ super(protocolVersion, AUTH, EOK, -1, -1);
+ this.authMessage = authMessage;
+ }
+
+ /** @return the AuthMessage given at construction time */
+ AuthMessage getAuthMessage() {
+ return authMessage;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 9fec15f..1608328 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -124,6 +124,7 @@ public class BookieRequestProcessor implements RequestProcessor {
processReadRequestV3(r, c);
break;
default:
+ LOG.info("Unknown operation type {}", header.getOperation());
BookkeeperProtocol.Response.Builder response =
BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);