You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/11/11 18:43:17 UTC
[bookkeeper] branch branch-4.7 updated: Fixed Auth with v2 protocol
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.7 by this push:
new 8b1c2a3 Fixed Auth with v2 protocol
8b1c2a3 is described below
commit 8b1c2a38d05e5e32749df40a8def0f20b237b256
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Nov 11 10:42:32 2018 -0800
Fixed Auth with v2 protocol
### Motivation
BK auth framework is currently broken when using v2 protocol.
### Changes
* Fixed auth when using V2 protocol
* Made sure a client with authentication enabled can talk to a bookie without authentication. This is required in any case when enabling/disabling authentication on a live cluster.
* Run all auth tests against both v2 and v3 protocol.
This should be included in 4.7.2 to give a path to upgrade.
cc/ rdhabalia
Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
This closes #1805 from merlimat/fix-v2-auth
(cherry picked from commit dc2aaaa070d9a8d393409b69c80ad668b70f6d2b)
Signed-off-by: Sijie Guo <si...@apache.org>
---
.../org/apache/bookkeeper/proto/AuthHandler.java | 58 ++++++++++++++++++----
.../bookkeeper/proto/BookieRequestProcessor.java | 11 ++++
.../bookkeeper/proto/PerChannelBookieClient.java | 2 +-
.../java/org/apache/bookkeeper/auth/TestAuth.java | 40 ++++++++++++++-
4 files changed, 99 insertions(+), 12 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index 1b1f60f..a7ac452 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -226,16 +226,19 @@ class AuthHandler {
final Queue<Object> waitingForAuth = new ConcurrentLinkedQueue<>();
final ClientConnectionPeer connectionPeer;
+ private final boolean isUsingV2Protocol;
+
public ClientAuthProvider getAuthProvider() {
return authProvider;
}
ClientSideHandler(ClientAuthProvider.Factory authProviderFactory, AtomicLong transactionIdGenerator,
- ClientConnectionPeer connectionPeer) {
+ ClientConnectionPeer connectionPeer, boolean isUsingV2Protocol) {
this.authProviderFactory = authProviderFactory;
this.transactionIdGenerator = transactionIdGenerator;
this.connectionPeer = connectionPeer;
authProvider = null;
+ this.isUsingV2Protocol = isUsingV2Protocol;
}
@Override
@@ -279,7 +282,7 @@ class AuthHandler {
if (AUTHENTICATION_DISABLED_PLUGIN_NAME.equals(am.getAuthPluginName())){
SocketAddress remote = ctx.channel().remoteAddress();
LOG.info("Authentication is not enabled."
- + "Considering this client {0} authenticated", remote);
+ + "Considering this client {} authenticated", remote);
AuthHandshakeCompleteCallback cb = new AuthHandshakeCompleteCallback(ctx);
cb.operationComplete(BKException.Code.OK, null);
return;
@@ -296,6 +299,33 @@ class AuthHandler {
break;
}
}
+ } else if (msg instanceof BookieProtocol.Response) {
+ BookieProtocol.Response resp = (BookieProtocol.Response) msg;
+ switch (resp.opCode) {
+ case BookieProtocol.AUTH:
+ if (resp.errorCode != BookieProtocol.EOK) {
+ authenticationError(ctx, resp.errorCode);
+ } else {
+ BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse) resp).authMessage;
+ if (AUTHENTICATION_DISABLED_PLUGIN_NAME.equals(am.getAuthPluginName())) {
+ SocketAddress remote = ctx.channel().remoteAddress();
+ LOG.info("Authentication is not enabled."
+ + "Considering this client {} authenticated", remote);
+ AuthHandshakeCompleteCallback cb = new AuthHandshakeCompleteCallback(ctx);
+ cb.operationComplete(BKException.Code.OK, null);
+ return;
+ }
+ byte[] payload = am.getPayload().toByteArray();
+ authProvider.process(AuthToken.wrap(payload), new AuthRequestCallback(ctx,
+ authProviderFactory.getPluginName()));
+ }
+ break;
+ default:
+ LOG.warn("dropping received message {} from bookie {}", msg, ctx.channel());
+ // else just drop the message, we're not authenticated so nothing should be coming
+ // through
+ break;
+ }
}
}
@@ -319,7 +349,7 @@ class AuthHandler {
} else if (msg instanceof BookieProtocol.Request) {
// let auth messages through, queue the rest
BookieProtocol.Request req = (BookieProtocol.Request) msg;
- if (BookkeeperProtocol.OperationType.AUTH.getNumber() == req.getOpCode()) {
+ if (BookieProtocol.AUTH == req.getOpCode()) {
super.write(ctx, msg, promise);
super.flush(ctx);
} else {
@@ -356,16 +386,24 @@ class AuthHandler {
authenticationError(ctx, rc);
return;
}
+
AuthMessage message = AuthMessage.newBuilder().setAuthPluginName(pluginName)
.setPayload(ByteString.copyFrom(newam.getData())).build();
- BookkeeperProtocol.BKPacketHeader header = BookkeeperProtocol.BKPacketHeader.newBuilder()
- .setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
- .setOperation(BookkeeperProtocol.OperationType.AUTH).setTxnId(newTxnId()).build();
- BookkeeperProtocol.Request.Builder builder = BookkeeperProtocol.Request.newBuilder().setHeader(header)
- .setAuthRequest(message);
-
- channel.writeAndFlush(builder.build());
+ if (isUsingV2Protocol) {
+ channel.writeAndFlush(
+ new BookieProtocol.AuthRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, message),
+ channel.voidPromise());
+ } else {
+ // V3 protocol
+ BookkeeperProtocol.BKPacketHeader header = BookkeeperProtocol.BKPacketHeader.newBuilder()
+ .setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
+ .setOperation(BookkeeperProtocol.OperationType.AUTH).setTxnId(newTxnId()).build();
+ BookkeeperProtocol.Request.Builder builder = BookkeeperProtocol.Request.newBuilder()
+ .setHeader(header)
+ .setAuthRequest(message);
+ channel.writeAndFlush(builder.build());
+ }
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 299031c..4097103 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -306,6 +306,17 @@ public class BookieRequestProcessor implements RequestProcessor {
checkArgument(r instanceof BookieProtocol.ReadRequest);
processReadRequest((BookieProtocol.ReadRequest) r, c);
break;
+ case BookieProtocol.AUTH:
+ LOG.info("Ignoring auth operation from client {}", c.remoteAddress());
+ BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage
+ .newBuilder()
+ .setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
+ .setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
+ .build();
+
+ c.writeAndFlush(new BookieProtocol.AuthResponse(
+ BookieProtocol.CURRENT_PROTOCOL_VERSION, message));
+ break;
default:
LOG.error("Unknown op type {}, sending error", r.getOpCode());
c.writeAndFlush(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 6b356f4..1a033bd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -446,7 +446,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
"bookieProtoDecoder",
new BookieProtoEncoding.ResponseDecoder(extRegistry, useV2WireProtocol));
pipeline.addLast("authHandler", new AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator,
- connectionPeer));
+ connectionPeer, useV2WireProtocol));
pipeline.addLast("mainhandler", PerChannelBookieClient.this);
}
});
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
index 5d86036..877f8d8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Enumeration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -45,12 +46,16 @@ import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.proto.ClientConnectionPeer;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test authentication.
*/
+@RunWith(Parameterized.class)
public class TestAuth extends BookKeeperClusterTestCase {
static final Logger LOG = LoggerFactory.getLogger(TestAuth.class);
public static final String TEST_AUTH_PROVIDER_PLUGIN_NAME = "TestAuthProviderPlugin";
@@ -61,8 +66,29 @@ public class TestAuth extends BookKeeperClusterTestCase {
private static final byte[] FAILURE_RESPONSE = {2};
private static final byte[] PAYLOAD_MESSAGE = {3};
- public TestAuth() {
+ enum ProtocolVersion {
+ ProtocolV2, ProtocolV3
+ }
+
+ @Parameters
+ public static Collection<Object[]> configs() {
+ return Arrays.asList(new Object[][] {
+ { ProtocolVersion.ProtocolV2 },
+ { ProtocolVersion.ProtocolV3 },
+ });
+ }
+
+ private final ProtocolVersion protocolVersion;
+
+ public TestAuth(ProtocolVersion protocolVersion) {
super(0); // start them later when auth providers are configured
+ this.protocolVersion = protocolVersion;
+ }
+
+ protected ClientConfiguration newClientConfiguration() {
+ ClientConfiguration conf = super.newClientConfiguration();
+ conf.setUseV2WireProtocol(protocolVersion == ProtocolVersion.ProtocolV2);
+ return conf;
}
// we pass in ledgerId because the method may throw exceptions
@@ -135,6 +161,13 @@ public class TestAuth extends BookKeeperClusterTestCase {
@Test
public void testCloseMethodCalledOnAuthProvider() throws Exception {
+ LogCloseCallsBookieAuthProviderFactory.closeCountersOnFactory.set(0);
+ LogCloseCallsBookieAuthProviderFactory.closeCountersOnConnections.set(0);
+ LogCloseCallsBookieAuthProviderFactory.initCountersOnFactory.set(0);
+ LogCloseCallsBookieAuthProviderFactory.initCountersOnConnections.set(0);
+ LogCloseCallsClientAuthProviderFactory.initCountersOnFactory.set(0);
+ LogCloseCallsClientAuthProviderFactory.closeCountersOnFactory.set(0);
+
ServerConfiguration bookieConf = newServerConfiguration();
bookieConf.setBookieAuthProviderFactoryClass(
LogCloseCallsBookieAuthProviderFactory.class.getName());
@@ -271,6 +304,11 @@ public class TestAuth extends BookKeeperClusterTestCase {
} catch (BKException.BKUnauthorizedAccessException bke) {
// bookie should have sent a negative response before
// breaking the conneciton
+ assertEquals(ProtocolVersion.ProtocolV3, protocolVersion);
+ } catch (BKException.BKNotEnoughBookiesException nebe) {
+ // With V2 we don't get the authorization error, but rather just
+ // fail to write to bookies.
+ assertEquals(ProtocolVersion.ProtocolV2, protocolVersion);
}
assertFalse(ledgerId.get() == -1);
assertEquals("Shouldn't have entry", 0, entryCount(ledgerId.get(), bookieConf, clientConf));