You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/04/05 08:46:03 UTC
[3/3] bookkeeper git commit: BOOKKEEPER-901: Authentication framework
BOOKKEEPER-901: Authentication framework
Author: Ivan Kelly <iv...@yahoo-inc.com>
Reviewers: Sijie Guo<si...@apache.org>
Closes #23 from merlimat/authentication-framework and squashes the following commits:
aa01548 [Ivan Kelly] BOOKKEEPER-901: Add an authentication framework
f930fbd [Ivan Kelly] BOOKKEEPER-794 BookkeeperProtocol.Response.status is completely ignored
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/b1c12c0f
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/b1c12c0f
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/b1c12c0f
Branch: refs/heads/master
Commit: b1c12c0f41b7c27b2452fef311f12077d771f431
Parents: e32c388
Author: Ivan Kelly <iv...@yahoo-inc.com>
Authored: Mon Apr 4 23:45:51 2016 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Mon Apr 4 23:45:51 2016 -0700
----------------------------------------------------------------------
bookkeeper-server/pom.xml | 9 +
.../auth/AuthProviderFactoryFactory.java | 111 +++
.../bookkeeper/auth/BookieAuthProvider.java | 83 ++
.../bookkeeper/auth/ClientAuthProvider.java | 89 +++
.../apache/bookkeeper/client/PendingAddOp.java | 4 +-
.../bookkeeper/conf/ClientConfiguration.java | 53 +-
.../bookkeeper/conf/ServerConfiguration.java | 24 +
.../apache/bookkeeper/proto/AuthHandler.java | 356 +++++++++
.../apache/bookkeeper/proto/BookieClient.java | 32 +-
.../bookkeeper/proto/BookieNettyServer.java | 21 +-
.../bookkeeper/proto/BookieProtoEncoding.java | 125 ++-
.../apache/bookkeeper/proto/BookieProtocol.java | 36 +
.../proto/BookieRequestProcessor.java | 1 +
.../bookkeeper/proto/BookkeeperProtocol.java | 787 ++++++++++++++++++-
.../proto/PerChannelBookieClient.java | 70 +-
.../src/main/proto/BookkeeperProtocol.proto | 14 +-
.../org/apache/bookkeeper/auth/TestAuth.java | 654 +++++++++++++++
.../proto/TestBackwardCompatCMS42.java | 239 ++++++
.../bookkeeper/proto/TestDataFormats.java | 126 +++
.../proto/TestPerChannelBookieClient.java | 25 +-
.../test/BookKeeperClusterTestCase.java | 10 +
.../src/test/proto/TestDataFormats.proto | 34 +
22 files changed, 2803 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/pom.xml
----------------------------------------------------------------------
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index a1a74e0..eb67fd8 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -262,6 +262,7 @@
<!-- exclude generated file //-->
<exclude>**/DataFormats.java</exclude>
<exclude>**/BookkeeperProtocol.java</exclude>
+ <exclude>**/TestDataFormats.java</exclude>
</excludes>
</configuration>
</plugin>
@@ -324,6 +325,14 @@
<arg value="--java_out=src/main/java" />
<arg value="src/main/proto/DataFormats.proto" />
</exec>
+ <exec executable="protoc" failonerror="true">
+ <arg value="--java_out=src/main/java" />
+ <arg value="src/main/proto/BookkeeperProtocol.proto" />
+ </exec>
+ <exec executable="protoc" failonerror="true">
+ <arg value="--java_out=src/test/java" />
+ <arg value="src/test/proto/TestDataFormats.proto" />
+ </exec>
</target>
</configuration>
<goals>
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java
new file mode 100644
index 0000000..d05c475
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/AuthProviderFactoryFactory.java
@@ -0,0 +1,111 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.auth;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ExtensionRegistry;
+
+
+
+public class AuthProviderFactoryFactory {
+ static Logger LOG = LoggerFactory.getLogger(AuthProviderFactoryFactory.class);
+
+ public static BookieAuthProvider.Factory newBookieAuthProviderFactory(ServerConfiguration conf,
+ ExtensionRegistry registry) throws IOException {
+ String factoryClassName = conf.getBookieAuthProviderFactoryClass();
+
+ if (factoryClassName == null || factoryClassName.length() == 0) {
+ return new NullBookieAuthProviderFactory();
+ }
+
+ BookieAuthProvider.Factory factory = ReflectionUtils.newInstance(factoryClassName,
+ BookieAuthProvider.Factory.class);
+ factory.init(conf, registry);
+ return factory;
+ }
+
+ public static ClientAuthProvider.Factory newClientAuthProviderFactory(ClientConfiguration conf,
+ ExtensionRegistry registry) throws IOException {
+ String factoryClassName = conf.getClientAuthProviderFactoryClass();
+
+ if (factoryClassName == null || factoryClassName.length() == 0) {
+ return new NullClientAuthProviderFactory();
+ }
+
+ ClientAuthProvider.Factory factory = ReflectionUtils.newInstance(factoryClassName,
+ ClientAuthProvider.Factory.class);
+ factory.init(conf, registry);
+ return factory;
+ }
+
+ private final static String nullPluginName = "NULLPlugin";
+
+ private static class NullBookieAuthProviderFactory implements BookieAuthProvider.Factory {
+ @Override
+ public String getPluginName() {
+ return nullPluginName;
+ }
+
+ @Override
+ public void init(ServerConfiguration conf, ExtensionRegistry registry) {}
+
+ @Override
+ public BookieAuthProvider newProvider(InetSocketAddress addr,
+ GenericCallback<Void> completeCb) {
+ completeCb.operationComplete(BKException.Code.OK, null);
+ return new BookieAuthProvider() {
+ public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {}
+ };
+ }
+ }
+
+ private static class NullClientAuthProviderFactory implements ClientAuthProvider.Factory {
+ @Override
+ public String getPluginName() {
+ return nullPluginName;
+ }
+
+ @Override
+ public void init(ClientConfiguration conf, ExtensionRegistry registry) {}
+
+ @Override
+ public ClientAuthProvider newProvider(InetSocketAddress addr,
+ GenericCallback<Void> completeCb) {
+ completeCb.operationComplete(BKException.Code.OK, null);
+ return new ClientAuthProvider() {
+ public void init(GenericCallback<AuthMessage> cb) {}
+ public void process(AuthMessage m, GenericCallback<AuthMessage> cb) {}
+ };
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java
new file mode 100644
index 0000000..4fb7d07
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/BookieAuthProvider.java
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.auth;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+
+import com.google.protobuf.ExtensionRegistry;
+
+/**
+ * Bookie authentication provider interface.
+ * This must be implemented by any party wishing to implement
+ * an authentication mechanism for bookkeeper connections.
+ */
+public interface BookieAuthProvider {
+ interface Factory {
+ /**
+ * Initialize the factory with the server configuration
+ * and protobuf message registry. Implementors must
+ * add any extention messages which contain the auth
+ * payload, so that the server can decode auth messages
+ * it receives from the client.
+ */
+ void init(ServerConfiguration conf,
+ ExtensionRegistry registry) throws IOException;
+
+ /**
+ * Create a new instance of a bookie auth provider.
+ * Each connection should get its own instance, as they
+ * can hold connection specific state.
+ * The completeCb is used to notify the server that
+ * the authentication handshake is complete.
+ * CompleteCb should be called only once.
+ * If the authentication was successful, BKException.Code.OK
+ * should be passed as the return code. Otherwise, another
+ * error code should be passed.
+ * If authentication fails, the server will close the
+ * connection.
+ * @param addr the address of the client being authenticated
+ * @param completeCb callback to be notified when authentication
+ * is complete.
+ */
+ BookieAuthProvider newProvider(InetSocketAddress addr,
+ GenericCallback<Void> completeCb);
+
+ /**
+ * Get Auth provider plugin name.
+ * Used as a sanity check to ensure that the bookie and the client.
+ * are using the same auth provider.
+ */
+ String getPluginName();
+ }
+
+ /**
+ * Process a request from the client. cb will receive the next
+ * message to be sent to the client. If there are no more messages
+ * to send to the client, cb should not be called, and completeCb
+ * must be called instead.
+ */
+ void process(AuthMessage m, GenericCallback<AuthMessage> cb);
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java
new file mode 100644
index 0000000..fba2264
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/auth/ClientAuthProvider.java
@@ -0,0 +1,89 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.auth;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+
+import com.google.protobuf.ExtensionRegistry;
+
+/**
+ * Client authentication provider interface.
+ * This must be implemented by any party wishing to implement
+ * an authentication mechanism for bookkeeper connections.
+ */
+public interface ClientAuthProvider {
+ interface Factory {
+ /**
+ * Initialize the factory with the client configuration
+ * and protobuf message registry. Implementors must
+ * add any extention messages which contain the auth
+ * payload, so that the client can decode auth messages
+ * it receives from the server.
+ */
+ void init(ClientConfiguration conf,
+ ExtensionRegistry registry) throws IOException;
+
+ /**
+ * Create a new instance of a client auth provider.
+ * Each connection should get its own instance, as they
+ * can hold connection specific state.
+ * The completeCb is used to notify the client that
+ * the authentication handshake is complete.
+ * CompleteCb should be called only once.
+ * If the authentication was successful, BKException.Code.OK
+ * should be passed as the return code. Otherwise, another
+ * error code should be passed.
+ * @param addr the address of the socket being authenticated
+ * @param completeCb callback to be notified when authentication
+ * is complete.
+ */
+ ClientAuthProvider newProvider(InetSocketAddress addr,
+ GenericCallback<Void> completeCb);
+
+ /**
+ * Get Auth provider plugin name.
+ * Used as a sanity check to ensure that the bookie and the client.
+ * are using the same auth provider.
+ */
+ String getPluginName();
+ }
+
+ /**
+ * Initiate the authentication. cb will receive the initial
+ * authentication message which should be sent to the server.
+ * cb may not be called if authentication is not requires. In
+ * this case, completeCb should be called.
+ */
+ void init(GenericCallback<AuthMessage> cb);
+
+ /**
+ * Process a response from the server. cb will receive the next
+ * message to be sent to the server. If there are no more messages
+ * to send to the server, cb should not be called, and completeCb
+ * must be called instead.
+ */
+ void process(AuthMessage m, GenericCallback<AuthMessage> cb);
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index bc487f6..1946069 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -208,8 +208,8 @@ class PendingAddOp implements WriteCallback, TimerTask {
lh.handleUnrecoverableErrorDuringAdd(rc);
return;
default:
- LOG.warn("Write did not succeed: L{} E{} on {}",
- new Object[] { ledgerId, entryId, addr });
+ LOG.warn("Write did not succeed: L{} E{} on {}, rc = {}",
+ new Object[] { ledgerId, entryId, addr, rc });
lh.handleBookieFailure(addr, bookieIndex);
return;
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index d0750d3..b8d738b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -81,6 +81,9 @@ public class ClientConfiguration extends AbstractConfiguration {
protected final static String ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats";
protected final static String TASK_EXECUTION_WARN_TIME_MICROS = "taskExecutionWarnTimeMicros";
+ // Client auth provider factory class name
+ protected final static String CLIENT_AUTH_PROVIDER_FACTORY_CLASS = "clientAuthProviderFactoryClass";
+
/**
* Construct a default client-side configuration
*/
@@ -700,7 +703,7 @@ public class ClientConfiguration extends AbstractConfiguration {
/**
* Check if bookie health check is enabled.
- *
+ *
* @return
*/
public boolean isBookieHealthCheckEnabled() {
@@ -709,15 +712,15 @@ public class ClientConfiguration extends AbstractConfiguration {
/**
* Enables the bookie health check.
- *
+ *
* <p>
* If the number of read/write errors for a bookie exceeds {@link #getBookieErrorThresholdPerInterval()} per
* interval, that bookie is quarantined for {@link #getBookieQuarantineTimeSeconds()} seconds. During this
* quarantined period, the client will try not to use this bookie when creating new ensembles.
* </p>
- *
+ *
* By default, the bookie health check is <b>disabled</b>.
- *
+ *
* @return client configuration
*/
public ClientConfiguration enableBookieHealthCheck() {
@@ -727,7 +730,7 @@ public class ClientConfiguration extends AbstractConfiguration {
/**
* Get the bookie health check interval in seconds.
- *
+ *
* @return
*/
public int getBookieHealthCheckIntervalSeconds() {
@@ -736,11 +739,11 @@ public class ClientConfiguration extends AbstractConfiguration {
/**
* Set the bookie health check interval. Default is 60 seconds.
- *
+ *
* <p>
* Note: Please {@link #enableBookieHealthCheck()} to use this configuration.
* </p>
- *
+ *
* @param interval
* @param unit
* @return client configuration
@@ -752,7 +755,7 @@ public class ClientConfiguration extends AbstractConfiguration {
/**
* Get the error threshold for a bookie to be quarantined.
- *
+ *
* @return
*/
public long getBookieErrorThresholdPerInterval() {
@@ -762,11 +765,11 @@ public class ClientConfiguration extends AbstractConfiguration {
/**
* Set the error threshold per interval ({@link #getBookieHealthCheckIntervalSeconds()}) for a bookie before it is
* quarantined. Default is 100 errors per minute.
- *
+ *
* <p>
* Note: Please {@link #enableBookieHealthCheck()} to use this configuration.
* </p>
- *
+ *
* @param threshold
* @param unit
* @return client configuration
@@ -778,7 +781,7 @@ public class ClientConfiguration extends AbstractConfiguration {
/**
* Get the time for which a bookie will be quarantined.
- *
+ *
* @return
*/
public int getBookieQuarantineTimeSeconds() {
@@ -787,11 +790,11 @@ public class ClientConfiguration extends AbstractConfiguration {
/**
* Set the time for which a bookie will be quarantined. Default is 30 minutes.
- *
+ *
* <p>
* Note: Please {@link #enableBookieHealthCheck()} to use this configuration.
* </p>
- *
+ *
* @param quarantineTime
* @param unit
* @return client configuration
@@ -800,4 +803,28 @@ public class ClientConfiguration extends AbstractConfiguration {
setProperty(BOOKIE_QUARANTINE_TIME_SECONDS, unit.toSeconds(quarantineTime));
return this;
}
+
+ /**
+ * Set the client authentication provider factory class name.
+ * If this is not set, no authentication will be used
+ *
+ * @param factoryClass
+ * the client authentication provider factory class name
+ * @return client configuration
+ */
+ public ClientConfiguration setClientAuthProviderFactoryClass(
+ String factoryClass) {
+ setProperty(CLIENT_AUTH_PROVIDER_FACTORY_CLASS, factoryClass);
+ return this;
+ }
+
+ /**
+ * Get the client authentication provider factory class name. If this returns null, no authentication will take
+ * place.
+ *
+ * @return the client authentication provider factory class name or null.
+ */
+ public String getClientAuthProviderFactoryClass() {
+ return getString(CLIENT_AUTH_PROVIDER_FACTORY_CLASS, null);
+ }
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 76e5037..d770650 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -122,6 +122,9 @@ public class ServerConfiguration extends AbstractConfiguration {
protected final static String LEDGER_STORAGE_CLASS = "ledgerStorageClass";
+ // Bookie auth provider factory class name
+ protected final static String BOOKIE_AUTH_PROVIDER_FACTORY_CLASS = "bookieAuthProviderFactoryClass";
+
/**
* Construct a default configuration object
*/
@@ -1566,4 +1569,25 @@ public class ServerConfiguration extends AbstractConfiguration {
}
}
+ /*
+ * Set the bookie authentication provider factory class name.
+ * If this is not set, no authentication will be used
+ *
+ * @param factoryClass
+ * the bookie authentication provider factory class name
+ * @return void
+ */
+ public void setBookieAuthProviderFactoryClass(String factoryClass) {
+ setProperty(BOOKIE_AUTH_PROVIDER_FACTORY_CLASS, factoryClass);
+ }
+
+ /**
+ * Get the bookie authentication provider factory class name.
+ * If this returns null, no authentication will take place.
+ *
+ * @return the bookie authentication provider factory class name or null.
+ */
+ public String getBookieAuthProviderFactoryClass() {
+ return getString(BOOKIE_AUTH_PROVIDER_FACTORY_CLASS, null);
+ }
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
new file mode 100644
index 0000000..522bc0b
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -0,0 +1,356 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.DefaultExceptionEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+
+import org.apache.bookkeeper.auth.BookieAuthProvider;
+import org.apache.bookkeeper.auth.ClientAuthProvider;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AuthHandler {
+ static final Logger LOG = LoggerFactory.getLogger(AuthHandler.class);
+
+ static class ServerSideHandler extends SimpleChannelHandler {
+ volatile boolean authenticated = false;
+ final BookieAuthProvider.Factory authProviderFactory;
+ BookieAuthProvider authProvider;
+
+ ServerSideHandler(BookieAuthProvider.Factory authProviderFactory) {
+ this.authProviderFactory = authProviderFactory;
+ authProvider = null;
+ }
+
+ @Override
+ public void channelOpen(ChannelHandlerContext ctx,
+ ChannelStateEvent e) throws Exception {
+ LOG.info("Channel open {}", ctx.getChannel());
+ SocketAddress remote = ctx.getChannel().getRemoteAddress();
+ if (remote instanceof InetSocketAddress) {
+ authProvider = authProviderFactory.newProvider((InetSocketAddress)remote,
+ new AuthHandshakeCompleteCallback());
+ } else {
+ LOG.error("Unknown socket type {} for {}", remote.getClass(), remote);
+ }
+ super.channelOpen(ctx, e);
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx,
+ MessageEvent e)
+ throws Exception {
+ if (authProvider == null) {
+ // close the channel, authProvider should only be
+ // null if the other end of line is an InetSocketAddress
+ // anything else is strange, and we don't want to deal
+ // with it
+ ctx.getChannel().close();
+ return;
+ }
+
+ Object event = e.getMessage();
+ if (authenticated) {
+ super.messageReceived(ctx, e);
+ } else if (event instanceof BookieProtocol.AuthRequest) { // pre-PB-client
+ BookieProtocol.AuthRequest req = (BookieProtocol.AuthRequest)event;
+ assert (req.getOpCode() == BookieProtocol.AUTH);
+ if (checkAuthPlugin(req.getAuthMessage(), ctx.getChannel())) {
+ authProvider.process(req.getAuthMessage(),
+ new AuthResponseCallbackLegacy(req, ctx.getChannel()));
+ } else {
+ ctx.getChannel().close();
+ }
+ } else if (event instanceof BookieProtocol.Request) {
+ BookieProtocol.Request req = (BookieProtocol.Request)event;
+ if (req.getOpCode() == BookieProtocol.ADDENTRY) {
+ ctx.getChannel().write(
+ new BookieProtocol.AddResponse(
+ req.getProtocolVersion(), BookieProtocol.EUA,
+ req.getLedgerId(), req.getEntryId()));
+ } else if (req.getOpCode() == BookieProtocol.READENTRY) {
+ ctx.getChannel().write(
+ new BookieProtocol.ReadResponse(
+ req.getProtocolVersion(), BookieProtocol.EUA,
+ req.getLedgerId(), req.getEntryId()));
+ } else {
+ ctx.getChannel().close();
+ }
+ } else if (event instanceof BookkeeperProtocol.Request) { // post-PB-client
+ BookkeeperProtocol.Request req = (BookkeeperProtocol.Request)event;
+ if (req.getHeader().getOperation() == BookkeeperProtocol.OperationType.AUTH
+ && req.hasAuthRequest()
+ && checkAuthPlugin(req.getAuthRequest(), ctx.getChannel())) {
+ authProvider.process(req.getAuthRequest(),
+ new AuthResponseCallback(req, ctx.getChannel()));
+ } else {
+ BookkeeperProtocol.Response.Builder builder
+ = BookkeeperProtocol.Response.newBuilder()
+ .setHeader(req.getHeader())
+ .setStatus(BookkeeperProtocol.StatusCode.EUA);
+
+ ctx.getChannel().write(builder.build());
+ }
+ } else {
+ // close the channel, junk coming over it
+ ctx.getChannel().close();
+ }
+ }
+
+ private boolean checkAuthPlugin(AuthMessage am, final Channel src) {
+ if (!am.hasAuthPluginName()
+ || !am.getAuthPluginName().equals(authProviderFactory.getPluginName())) {
+ LOG.error("Received message from incompatible auth plugin. Local = {},"
+ + " Remote = {}, Channel = {}",
+ authProviderFactory.getPluginName(), am.getAuthPluginName());
+ return false;
+ }
+ return true;
+ }
+
+ static class AuthResponseCallbackLegacy implements GenericCallback<AuthMessage> {
+ final BookieProtocol.AuthRequest req;
+ final Channel channel;
+
+ AuthResponseCallbackLegacy(BookieProtocol.AuthRequest req, Channel channel) {
+ this.req = req;
+ this.channel = channel;
+ }
+
+ public void operationComplete(int rc, AuthMessage newam) {
+ if (rc != BKException.Code.OK) {
+ LOG.error("Error processing auth message, closing connection");
+ channel.close();
+ return;
+ }
+ channel.write(new BookieProtocol.AuthResponse(req.getProtocolVersion(),
+ newam));
+ }
+ }
+
+ static class AuthResponseCallback implements GenericCallback<AuthMessage> {
+ final BookkeeperProtocol.Request req;
+ final Channel channel;
+
+ AuthResponseCallback(BookkeeperProtocol.Request req, Channel channel) {
+ this.req = req;
+ this.channel = channel;
+ }
+
+ public void operationComplete(int rc, AuthMessage newam) {
+ BookkeeperProtocol.Response.Builder builder
+ = BookkeeperProtocol.Response.newBuilder()
+ .setHeader(req.getHeader());
+
+ if (rc != BKException.Code.OK) {
+ LOG.error("Error processing auth message, closing connection");
+
+ builder.setStatus(BookkeeperProtocol.StatusCode.EUA);
+ channel.write(builder.build());
+ channel.close();
+ return;
+ } else {
+ builder.setStatus(BookkeeperProtocol.StatusCode.EOK)
+ .setAuthResponse(newam);
+ channel.write(builder.build());
+ }
+ }
+ }
+
+ class AuthHandshakeCompleteCallback implements GenericCallback<Void> {
+ @Override
+ public void operationComplete(int rc, Void v) {
+ if (rc == BKException.Code.OK) {
+ authenticated = true;
+ } else {
+ LOG.debug("Authentication failed on server side");
+ }
+ }
+ }
+ }
+
+ static class ClientSideHandler extends SimpleChannelHandler {
+ volatile boolean authenticated = false;
+ final ClientAuthProvider.Factory authProviderFactory;
+ ClientAuthProvider authProvider;
+ AtomicLong transactionIdGenerator;
+ Queue<MessageEvent> waitingForAuth = new ConcurrentLinkedQueue<MessageEvent>();
+
+ ClientSideHandler(ClientAuthProvider.Factory authProviderFactory,
+ AtomicLong transactionIdGenerator) {
+ this.authProviderFactory = authProviderFactory;
+ this.transactionIdGenerator = transactionIdGenerator;
+ authProvider = null;
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx,
+ ChannelStateEvent e)
+ throws Exception {
+ SocketAddress remote = ctx.getChannel().getRemoteAddress();
+ if (remote instanceof InetSocketAddress) {
+ authProvider = authProviderFactory.newProvider((InetSocketAddress)remote,
+ new AuthHandshakeCompleteCallback(ctx));
+ authProvider.init(new AuthRequestCallback(ctx));
+ } else {
+ LOG.error("Unknown socket type {} for {}", remote.getClass(), remote);
+ }
+ super.channelConnected(ctx, e);
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx,
+ MessageEvent e)
+ throws Exception {
+ assert (authProvider != null);
+
+ Object event = e.getMessage();
+
+ if (authenticated) {
+ super.messageReceived(ctx, e);
+ } else if (event instanceof BookkeeperProtocol.Response) {
+ BookkeeperProtocol.Response resp = (BookkeeperProtocol.Response)event;
+ if (resp.getHeader().getOperation() == BookkeeperProtocol.OperationType.AUTH) {
+ if (resp.getStatus() != BookkeeperProtocol.StatusCode.EOK) {
+ authenticationError(ctx, resp.getStatus().getNumber());
+ } else {
+ assert (resp.hasAuthResponse());
+ BookkeeperProtocol.AuthMessage am = resp.getAuthResponse();
+ authProvider.process(am, new AuthRequestCallback(ctx));
+ }
+ } else {
+ // else just drop the message,
+ // we're not authenticated so nothing should be coming through
+ }
+ }
+ }
+
+ @Override
+ public void writeRequested(ChannelHandlerContext ctx,
+ MessageEvent e)
+ throws Exception {
+ synchronized (waitingForAuth) {
+ if (authenticated) {
+ super.writeRequested(ctx, e);
+ } else if (e.getMessage() instanceof BookkeeperProtocol.Request) {
+ // let auth messages through, queue the rest
+ BookkeeperProtocol.Request req = (BookkeeperProtocol.Request)e.getMessage();
+ if (req.getHeader().getOperation()
+ == BookkeeperProtocol.OperationType.AUTH) {
+ super.writeRequested(ctx, e);
+ } else {
+ waitingForAuth.add(e);
+ }
+ } // else just drop
+ }
+ }
+
+ long newTxnId() {
+ return transactionIdGenerator.incrementAndGet();
+ }
+
+ void authenticationError(ChannelHandlerContext ctx, int errorCode) {
+ LOG.error("Error processing auth message, erroring connection {}", errorCode);
+ ctx.sendUpstream(new DefaultExceptionEvent(ctx.getChannel(),
+ new AuthenticationException(
+ "Auth failed with error " + errorCode)));
+ }
+
+ class AuthRequestCallback implements GenericCallback<AuthMessage> {
+ Channel channel;
+ ChannelHandlerContext ctx;
+
+ AuthRequestCallback(ChannelHandlerContext ctx) {
+ this.channel = ctx.getChannel();
+ this.ctx = ctx;
+ }
+
+ public void operationComplete(int rc, AuthMessage newam) {
+ if (rc != BKException.Code.OK) {
+ authenticationError(ctx, rc);
+ return;
+ }
+
+ BookkeeperProtocol.BKPacketHeader header
+ = BookkeeperProtocol.BKPacketHeader.newBuilder()
+ .setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
+ .setOperation(BookkeeperProtocol.OperationType.AUTH)
+ .setTxnId(newTxnId()).build();
+ BookkeeperProtocol.Request.Builder builder
+ = BookkeeperProtocol.Request.newBuilder()
+ .setHeader(header)
+ .setAuthRequest(newam);
+
+ channel.write(builder.build());
+ }
+ }
+
+ class AuthHandshakeCompleteCallback implements GenericCallback<Void> {
+ ChannelHandlerContext ctx;
+ AuthHandshakeCompleteCallback(ChannelHandlerContext ctx) {
+ this.ctx = ctx;
+ }
+
+ @Override
+ public void operationComplete(int rc, Void v) {
+ if (rc == BKException.Code.OK) {
+ synchronized (waitingForAuth) {
+ authenticated = true;
+ MessageEvent e = waitingForAuth.poll();
+ while (e != null) {
+ ctx.sendDownstream(e);
+ e = waitingForAuth.poll();
+ }
+ }
+ } else {
+ authenticationError(ctx, rc);
+ LOG.debug("Authentication failed on server side");
+ }
+ }
+ }
+ }
+
+ static class AuthenticationException extends IOException {
+ AuthenticationException(String reason) {
+ super(reason);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 8a79547..d0052d6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -28,8 +28,11 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
+import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -52,6 +55,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ExtensionRegistry;
/**
* Implements the client-side part of the BookKeeper protocol.
@@ -60,11 +64,18 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class BookieClient implements PerChannelBookieClientFactory {
static final Logger LOG = LoggerFactory.getLogger(BookieClient.class);
- final OrderedSafeExecutor executor;
- final ClientSocketChannelFactory channelFactory;
+ // This is global state that should be across all BookieClients
+ AtomicLong totalBytesOutstanding = new AtomicLong();
+
+ OrderedSafeExecutor executor;
+ ClientSocketChannelFactory channelFactory;
final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool> channels =
new ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool>();
final HashedWheelTimer requestTimer;
+
+ final private ClientAuthProvider.Factory authProviderFactory;
+ final private ExtensionRegistry registry;
+
private final ClientConfiguration conf;
private volatile boolean closed;
private final ReentrantReadWriteLock closeLock;
@@ -73,17 +84,22 @@ public class BookieClient implements PerChannelBookieClientFactory {
private final long bookieErrorThresholdPerInterval;
- public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor) {
+ public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory,
+ OrderedSafeExecutor executor) throws IOException {
this(conf, channelFactory, executor, NullStatsLogger.INSTANCE);
}
- public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor,
- StatsLogger statsLogger) {
+ public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory,
+ OrderedSafeExecutor executor, StatsLogger statsLogger) throws IOException {
this.conf = conf;
this.channelFactory = channelFactory;
this.executor = executor;
this.closed = false;
this.closeLock = new ReentrantReadWriteLock();
+
+ this.registry = ExtensionRegistry.newInstance();
+ this.authProviderFactory = AuthProviderFactoryFactory.newClientAuthProviderFactory(conf, registry);
+
this.statsLogger = statsLogger;
this.numConnectionsPerBookie = conf.getNumChannelsPerBookie();
this.requestTimer = new HashedWheelTimer(
@@ -120,8 +136,8 @@ public class BookieClient implements PerChannelBookieClientFactory {
@Override
public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool) {
- return new PerChannelBookieClient(conf, executor, channelFactory, address,
- requestTimer, statsLogger, pcbcPool);
+ return new PerChannelBookieClient(conf, executor, channelFactory, address, requestTimer, statsLogger,
+ authProviderFactory, registry, pcbcPool);
}
private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr, Object key) {
@@ -133,7 +149,7 @@ public class BookieClient implements PerChannelBookieClientFactory {
return null;
}
PerChannelBookieClientPool newClientPool =
- new DefaultPerChannelBookieClientPool(this, addr, numConnectionsPerBookie);
+ new DefaultPerChannelBookieClientPool(this, addr, numConnectionsPerBookie);
PerChannelBookieClientPool oldClientPool = channels.putIfAbsent(addr, newClientPool);
if (null == oldClientPool) {
clientPool = newClientPool;
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index b623998..bb1b207 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -28,6 +28,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.auth.BookieAuthProvider;
+import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.zookeeper.KeeperException;
import org.jboss.netty.bootstrap.ServerBootstrap;
@@ -48,6 +50,7 @@ import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.protobuf.ExtensionRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -66,11 +69,21 @@ class BookieNettyServer {
Object suspensionLock = new Object();
boolean suspended = false;
+ final BookieAuthProvider.Factory authProviderFactory;
+ final BookieProtoEncoding.ResponseEncoder responseEncoder;
+ final BookieProtoEncoding.RequestDecoder requestDecoder;
+
BookieNettyServer(ServerConfiguration conf, RequestProcessor processor)
throws IOException, KeeperException, InterruptedException, BookieException {
this.conf = conf;
this.requestProcessor = processor;
+ ExtensionRegistry registry = ExtensionRegistry.newInstance();
+ authProviderFactory = AuthProviderFactoryFactory.newBookieAuthProviderFactory(conf, registry);
+
+ responseEncoder = new BookieProtoEncoding.ResponseEncoder(registry);
+ requestDecoder = new BookieProtoEncoding.RequestDecoder(registry);
+
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
String base = "bookie-" + conf.getBookiePort() + "-netty";
serverChannelFactory = new NioServerSocketChannelFactory(
@@ -140,11 +153,15 @@ class BookieNettyServer {
new LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4));
pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
- pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.RequestDecoder());
- pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.ResponseEncoder());
+ pipeline.addLast("bookieProtoDecoder", requestDecoder);
+ pipeline.addLast("bookieProtoEncoder", responseEncoder);
+ pipeline.addLast("bookieAuthHandler",
+ new AuthHandler.ServerSideHandler(authProviderFactory));
+
SimpleChannelHandler requestHandler = isRunning.get() ?
new BookieRequestHandler(conf, requestProcessor, allChannels)
: new RejectRequestHandler();
+
pipeline.addLast("bookieRequestHandler", requestHandler);
return pipeline;
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 6ece56e..683a6fb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -20,29 +20,26 @@
*/
package org.apache.bookkeeper.proto;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.InvalidProtocolBufferException;
+
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
-
import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BookieProtoEncoding {
private final static Logger LOG = LoggerFactory.getLogger(BookieProtoEncoding.class);
- static final EnDecoder REQ_PREV3 = new RequestEnDeCoderPreV3();
- static final EnDecoder REP_PREV3 = new ResponseEnDeCoderPreV3();
- static final EnDecoder REQ_V3 = new RequestEnDecoderV3();
- static final EnDecoder REP_V3 = new ResponseEnDecoderV3();
-
static interface EnDecoder {
/**
@@ -68,6 +65,12 @@ public class BookieProtoEncoding {
}
static class RequestEnDeCoderPreV3 implements EnDecoder {
+ final ExtensionRegistry extensionRegistry;
+
+ RequestEnDeCoderPreV3(ExtensionRegistry extensionRegistry) {
+ this.extensionRegistry = extensionRegistry;
+ }
+
@Override
public Object encode(Object msg, ChannelBufferFactory bufferFactory)
throws Exception {
@@ -83,8 +86,7 @@ public class BookieProtoEncoding {
buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt());
buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
return ChannelBuffers.wrappedBuffer(buf, ar.getData());
- } else {
- assert(r instanceof BookieProtocol.ReadRequest);
+ } else if (r instanceof BookieProtocol.ReadRequest) {
int totalHeaderSize = 4 // for request type
+ 8 // for ledgerId
+ 8; // for entryId
@@ -101,6 +103,19 @@ public class BookieProtoEncoding {
}
return buf;
+ } else if (r instanceof BookieProtocol.AuthRequest) {
+ BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthRequest)r).getAuthMessage();
+ int totalHeaderSize = 4; // for request type
+ int totalSize = totalHeaderSize + am.getSerializedSize();
+ ChannelBuffer buf = bufferFactory.getBuffer(totalSize);
+ buf.writeInt(new PacketHeader(r.getProtocolVersion(),
+ r.getOpCode(),
+ r.getFlags()).toInt());
+ ChannelBufferOutputStream bufStream = new ChannelBufferOutputStream(buf);
+ am.writeTo(bufStream);
+ return buf;
+ } else {
+ return msg;
}
}
@@ -141,12 +156,23 @@ public class BookieProtoEncoding {
} else {
return new BookieProtocol.ReadRequest(h.getVersion(), ledgerId, entryId, flags);
}
+ case BookieProtocol.AUTH:
+ BookkeeperProtocol.AuthMessage.Builder builder
+ = BookkeeperProtocol.AuthMessage.newBuilder();
+ builder.mergeFrom(new ChannelBufferInputStream(packet), extensionRegistry);
+ return new BookieProtocol.AuthRequest(h.getVersion(), builder.build());
}
return packet;
}
}
static class ResponseEnDeCoderPreV3 implements EnDecoder {
+ final ExtensionRegistry extensionRegistry;
+
+ ResponseEnDeCoderPreV3(ExtensionRegistry extensionRegistry) {
+ this.extensionRegistry = extensionRegistry;
+ }
+
@Override
public Object encode(Object msg, ChannelBufferFactory bufferFactory)
throws Exception {
@@ -157,12 +183,13 @@ public class BookieProtoEncoding {
ChannelBuffer buf = bufferFactory.getBuffer(24);
buf.writeInt(new PacketHeader(r.getProtocolVersion(),
r.getOpCode(), (short)0).toInt());
- buf.writeInt(r.getErrorCode());
- buf.writeLong(r.getLedgerId());
- buf.writeLong(r.getEntryId());
ServerStats.getInstance().incrementPacketsSent();
if (msg instanceof BookieProtocol.ReadResponse) {
+ buf.writeInt(r.getErrorCode());
+ buf.writeLong(r.getLedgerId());
+ buf.writeLong(r.getEntryId());
+
BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse)r;
if (rr.hasData()) {
return ChannelBuffers.wrappedBuffer(buf,
@@ -171,7 +198,15 @@ public class BookieProtoEncoding {
return buf;
}
} else if (msg instanceof BookieProtocol.AddResponse) {
+ buf.writeInt(r.getErrorCode());
+ buf.writeLong(r.getLedgerId());
+ buf.writeLong(r.getEntryId());
+
return buf;
+ } else if (msg instanceof BookieProtocol.AuthResponse) {
+ BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse)r).getAuthMessage();
+ return ChannelBuffers.wrappedBuffer(buf,
+ ChannelBuffers.wrappedBuffer(am.toByteArray()));
} else {
LOG.error("Cannot encode unknown response type {}", msg.getClass().getName());
return msg;
@@ -180,19 +215,23 @@ public class BookieProtoEncoding {
@Override
public Object decode(ChannelBuffer buffer)
throws Exception {
- final int rc;
- final long ledgerId, entryId;
+ int rc;
+ long ledgerId, entryId;
final PacketHeader header;
header = PacketHeader.fromInt(buffer.readInt());
- rc = buffer.readInt();
- ledgerId = buffer.readLong();
- entryId = buffer.readLong();
switch (header.getOpCode()) {
case BookieProtocol.ADDENTRY:
+ rc = buffer.readInt();
+ ledgerId = buffer.readLong();
+ entryId = buffer.readLong();
return new BookieProtocol.AddResponse(header.getVersion(), rc, ledgerId, entryId);
case BookieProtocol.READENTRY:
+ rc = buffer.readInt();
+ ledgerId = buffer.readLong();
+ entryId = buffer.readLong();
+
if (rc == BookieProtocol.EOK) {
return new BookieProtocol.ReadResponse(header.getVersion(), rc,
ledgerId, entryId, buffer.slice());
@@ -200,6 +239,13 @@ public class BookieProtoEncoding {
return new BookieProtocol.ReadResponse(header.getVersion(), rc,
ledgerId, entryId);
}
+ case BookieProtocol.AUTH:
+ ChannelBufferInputStream bufStream = new ChannelBufferInputStream(buffer);
+ BookkeeperProtocol.AuthMessage.Builder builder
+ = BookkeeperProtocol.AuthMessage.newBuilder();
+ builder.mergeFrom(bufStream, extensionRegistry);
+ BookkeeperProtocol.AuthMessage am = builder.build();
+ return new BookieProtocol.AuthResponse(header.getVersion(), am);
default:
return buffer;
}
@@ -207,10 +253,16 @@ public class BookieProtoEncoding {
}
static class RequestEnDecoderV3 implements EnDecoder {
+ final ExtensionRegistry extensionRegistry;
+
+ RequestEnDecoderV3(ExtensionRegistry extensionRegistry) {
+ this.extensionRegistry = extensionRegistry;
+ }
@Override
public Object decode(ChannelBuffer packet) throws Exception {
- return BookkeeperProtocol.Request.parseFrom(new ChannelBufferInputStream(packet));
+ return BookkeeperProtocol.Request.parseFrom(new ChannelBufferInputStream(packet),
+ extensionRegistry);
}
@Override
@@ -222,10 +274,16 @@ public class BookieProtoEncoding {
}
static class ResponseEnDecoderV3 implements EnDecoder {
+ final ExtensionRegistry extensionRegistry;
+
+ ResponseEnDecoderV3(ExtensionRegistry extensionRegistry) {
+ this.extensionRegistry = extensionRegistry;
+ }
@Override
public Object decode(ChannelBuffer packet) throws Exception {
- return BookkeeperProtocol.Response.parseFrom(new ChannelBufferInputStream(packet));
+ return BookkeeperProtocol.Response.parseFrom(new ChannelBufferInputStream(packet),
+ extensionRegistry);
}
@Override
@@ -238,6 +296,14 @@ public class BookieProtoEncoding {
public static class RequestEncoder extends OneToOneEncoder {
+ final EnDecoder REQ_PREV3;
+ final EnDecoder REQ_V3;
+
+ RequestEncoder(ExtensionRegistry extensionRegistry) {
+ REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry);
+ REQ_V3 = new RequestEnDecoderV3(extensionRegistry);
+ }
+
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
throws Exception {
@@ -256,6 +322,13 @@ public class BookieProtoEncoding {
}
public static class RequestDecoder extends OneToOneDecoder {
+ final EnDecoder REQ_PREV3;
+ final EnDecoder REQ_V3;
+
+ RequestDecoder(ExtensionRegistry extensionRegistry) {
+ REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry);
+ REQ_V3 = new RequestEnDecoderV3(extensionRegistry);
+ }
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
@@ -283,6 +356,13 @@ public class BookieProtoEncoding {
}
public static class ResponseEncoder extends OneToOneEncoder {
+ final EnDecoder REP_PREV3;
+ final EnDecoder REP_V3;
+
+ ResponseEncoder(ExtensionRegistry extensionRegistry) {
+ REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
+ REP_V3 = new ResponseEnDecoderV3(extensionRegistry);
+ }
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
@@ -302,6 +382,13 @@ public class BookieProtoEncoding {
}
public static class ResponseDecoder extends OneToOneDecoder {
+ final EnDecoder REP_PREV3;
+ final EnDecoder REP_V3;
+
+ ResponseDecoder(ExtensionRegistry extensionRegistry) {
+ REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
+ REP_V3 = new ResponseEnDecoderV3(extensionRegistry);
+ }
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 4dd26d6..2ce5ed8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -24,6 +24,8 @@ package org.apache.bookkeeper.proto;
import org.jboss.netty.buffer.ChannelBuffer;
import java.nio.ByteBuffer;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+
/**
* The packets of the Bookie protocol all have a 4-byte integer indicating the
* type of request or response at the very beginning of the packet followed by a
@@ -133,6 +135,13 @@ public interface BookieProtocol {
public static final byte READENTRY = 2;
/**
+ * Auth message. This code is for passing auth messages between the auth
+ * providers on the client and bookie. The message payload is determined
+ * by the auth providers themselves.
+ */
+ public static final byte AUTH = 3;
+
+ /**
* The error code that indicates success
*/
public static final int EOK = 0;
@@ -273,6 +282,19 @@ public interface BookieProtocol {
}
}
+ static class AuthRequest extends Request {
+ final AuthMessage authMessage;
+
+ AuthRequest(byte protocolVersion, AuthMessage authMessage) {
+ super(protocolVersion, AUTH, -1, -1, FLAG_NONE, null);
+ this.authMessage = authMessage;
+ }
+
+ AuthMessage getAuthMessage() {
+ return authMessage;
+ }
+ }
+
static class Response {
final byte protocolVersion;
final byte opCode;
@@ -343,4 +365,18 @@ public interface BookieProtocol {
super(protocolVersion, ADDENTRY, errorCode, ledgerId, entryId);
}
}
+
+ static class AuthResponse extends Response {
+ final AuthMessage authMessage;
+
+ AuthResponse(byte protocolVersion, AuthMessage authMessage) {
+ super(protocolVersion, AUTH, EOK, -1, -1);
+ this.authMessage = authMessage;
+ }
+
+ AuthMessage getAuthMessage() {
+ return authMessage;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 9fec15f..1608328 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -124,6 +124,7 @@ public class BookieRequestProcessor implements RequestProcessor {
processReadRequestV3(r, c);
break;
default:
+ LOG.info("Unknown operation type {}", header.getOperation());
BookkeeperProtocol.Response.Builder response =
BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);