You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/11/11 18:43:17 UTC

[bookkeeper] branch branch-4.7 updated: Fixed Auth with v2 protocol

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.7 by this push:
     new 8b1c2a3  Fixed Auth with v2 protocol
8b1c2a3 is described below

commit 8b1c2a38d05e5e32749df40a8def0f20b237b256
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Nov 11 10:42:32 2018 -0800

    Fixed Auth with v2 protocol
    
    ### Motivation
    
    BK auth framework is currently broken when using v2 protocol.
    
    ### Changes
    
     * Fixed auth when using V2 protocol
     * Made sure a client with authentication enabled can talk to a bookie without authentication. This is required in any case when enabling/disabling authentication on a live cluster.
     * Run all auth tests against both v2 and v3 protocol.
    
    This should be included in 4.7.2 to give a path to upgrade.
    
    cc/ rdhabalia
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1805 from merlimat/fix-v2-auth
    
    (cherry picked from commit dc2aaaa070d9a8d393409b69c80ad668b70f6d2b)
    Signed-off-by: Sijie Guo <si...@apache.org>
---
 .../org/apache/bookkeeper/proto/AuthHandler.java   | 58 ++++++++++++++++++----
 .../bookkeeper/proto/BookieRequestProcessor.java   | 11 ++++
 .../bookkeeper/proto/PerChannelBookieClient.java   |  2 +-
 .../java/org/apache/bookkeeper/auth/TestAuth.java  | 40 ++++++++++++++-
 4 files changed, 99 insertions(+), 12 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index 1b1f60f..a7ac452 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -226,16 +226,19 @@ class AuthHandler {
         final Queue<Object> waitingForAuth = new ConcurrentLinkedQueue<>();
         final ClientConnectionPeer connectionPeer;
 
+        private final boolean isUsingV2Protocol;
+
         public ClientAuthProvider getAuthProvider() {
             return authProvider;
         }
 
         ClientSideHandler(ClientAuthProvider.Factory authProviderFactory, AtomicLong transactionIdGenerator,
-                ClientConnectionPeer connectionPeer) {
+                ClientConnectionPeer connectionPeer, boolean isUsingV2Protocol) {
             this.authProviderFactory = authProviderFactory;
             this.transactionIdGenerator = transactionIdGenerator;
             this.connectionPeer = connectionPeer;
             authProvider = null;
+            this.isUsingV2Protocol = isUsingV2Protocol;
         }
 
         @Override
@@ -279,7 +282,7 @@ class AuthHandler {
                             if (AUTHENTICATION_DISABLED_PLUGIN_NAME.equals(am.getAuthPluginName())){
                                 SocketAddress remote = ctx.channel().remoteAddress();
                                 LOG.info("Authentication is not enabled."
-                                    + "Considering this client {0} authenticated", remote);
+                                    + "Considering this client {} authenticated", remote);
                                 AuthHandshakeCompleteCallback cb = new AuthHandshakeCompleteCallback(ctx);
                                 cb.operationComplete(BKException.Code.OK, null);
                                 return;
@@ -296,6 +299,33 @@ class AuthHandler {
                         break;
                     }
                 }
+            } else if (msg instanceof BookieProtocol.Response) {
+                BookieProtocol.Response resp = (BookieProtocol.Response) msg;
+                switch (resp.opCode) {
+                case BookieProtocol.AUTH:
+                    if (resp.errorCode != BookieProtocol.EOK) {
+                        authenticationError(ctx, resp.errorCode);
+                    } else {
+                        BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse) resp).authMessage;
+                        if (AUTHENTICATION_DISABLED_PLUGIN_NAME.equals(am.getAuthPluginName())) {
+                            SocketAddress remote = ctx.channel().remoteAddress();
+                            LOG.info("Authentication is not enabled."
+                                    + "Considering this client {} authenticated", remote);
+                            AuthHandshakeCompleteCallback cb = new AuthHandshakeCompleteCallback(ctx);
+                            cb.operationComplete(BKException.Code.OK, null);
+                            return;
+                        }
+                        byte[] payload = am.getPayload().toByteArray();
+                        authProvider.process(AuthToken.wrap(payload), new AuthRequestCallback(ctx,
+                                authProviderFactory.getPluginName()));
+                    }
+                    break;
+                default:
+                    LOG.warn("dropping received message {} from bookie {}", msg, ctx.channel());
+                    // else just drop the message, we're not authenticated so nothing should be coming
+                    // through
+                    break;
+                }
             }
         }
 
@@ -319,7 +349,7 @@ class AuthHandler {
                 } else if (msg instanceof BookieProtocol.Request) {
                     // let auth messages through, queue the rest
                     BookieProtocol.Request req = (BookieProtocol.Request) msg;
-                    if (BookkeeperProtocol.OperationType.AUTH.getNumber() == req.getOpCode()) {
+                    if (BookieProtocol.AUTH == req.getOpCode()) {
                         super.write(ctx, msg, promise);
                         super.flush(ctx);
                     } else {
@@ -356,16 +386,24 @@ class AuthHandler {
                     authenticationError(ctx, rc);
                     return;
                 }
+
                 AuthMessage message = AuthMessage.newBuilder().setAuthPluginName(pluginName)
                         .setPayload(ByteString.copyFrom(newam.getData())).build();
 
-                BookkeeperProtocol.BKPacketHeader header = BookkeeperProtocol.BKPacketHeader.newBuilder()
-                        .setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
-                        .setOperation(BookkeeperProtocol.OperationType.AUTH).setTxnId(newTxnId()).build();
-                BookkeeperProtocol.Request.Builder builder = BookkeeperProtocol.Request.newBuilder().setHeader(header)
-                        .setAuthRequest(message);
-
-                channel.writeAndFlush(builder.build());
+                if (isUsingV2Protocol) {
+                    channel.writeAndFlush(
+                            new BookieProtocol.AuthRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, message),
+                            channel.voidPromise());
+                } else {
+                    // V3 protocol
+                    BookkeeperProtocol.BKPacketHeader header = BookkeeperProtocol.BKPacketHeader.newBuilder()
+                            .setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
+                            .setOperation(BookkeeperProtocol.OperationType.AUTH).setTxnId(newTxnId()).build();
+                    BookkeeperProtocol.Request.Builder builder = BookkeeperProtocol.Request.newBuilder()
+                            .setHeader(header)
+                            .setAuthRequest(message);
+                    channel.writeAndFlush(builder.build());
+                }
             }
         }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 299031c..4097103 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -306,6 +306,17 @@ public class BookieRequestProcessor implements RequestProcessor {
                     checkArgument(r instanceof BookieProtocol.ReadRequest);
                     processReadRequest((BookieProtocol.ReadRequest) r, c);
                     break;
+                case BookieProtocol.AUTH:
+                    LOG.info("Ignoring auth operation from client {}", c.remoteAddress());
+                    BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage
+                            .newBuilder()
+                            .setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
+                            .setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
+                            .build();
+
+                    c.writeAndFlush(new BookieProtocol.AuthResponse(
+                            BookieProtocol.CURRENT_PROTOCOL_VERSION, message));
+                    break;
                 default:
                     LOG.error("Unknown op type {}, sending error", r.getOpCode());
                     c.writeAndFlush(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 6b356f4..1a033bd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -446,7 +446,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                     "bookieProtoDecoder",
                     new BookieProtoEncoding.ResponseDecoder(extRegistry, useV2WireProtocol));
                 pipeline.addLast("authHandler", new AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator,
-                            connectionPeer));
+                            connectionPeer, useV2WireProtocol));
                 pipeline.addLast("mainhandler", PerChannelBookieClient.this);
             }
         });
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
index 5d86036..877f8d8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Enumeration;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -45,12 +46,16 @@ import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.proto.ClientConnectionPeer;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Test authentication.
  */
+@RunWith(Parameterized.class)
 public class TestAuth extends BookKeeperClusterTestCase {
     static final Logger LOG = LoggerFactory.getLogger(TestAuth.class);
     public static final String TEST_AUTH_PROVIDER_PLUGIN_NAME = "TestAuthProviderPlugin";
@@ -61,8 +66,29 @@ public class TestAuth extends BookKeeperClusterTestCase {
     private static final byte[] FAILURE_RESPONSE = {2};
     private static final byte[] PAYLOAD_MESSAGE = {3};
 
-    public TestAuth() {
+    enum ProtocolVersion {
+        ProtocolV2, ProtocolV3
+    }
+
+    @Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] {
+                { ProtocolVersion.ProtocolV2 },
+                { ProtocolVersion.ProtocolV3 },
+        });
+    }
+
+    private final ProtocolVersion protocolVersion;
+
+    public TestAuth(ProtocolVersion protocolVersion) {
         super(0); // start them later when auth providers are configured
+        this.protocolVersion = protocolVersion;
+    }
+
+    protected ClientConfiguration newClientConfiguration() {
+        ClientConfiguration conf = super.newClientConfiguration();
+        conf.setUseV2WireProtocol(protocolVersion == ProtocolVersion.ProtocolV2);
+        return conf;
     }
 
     // we pass in ledgerId because the method may throw exceptions
@@ -135,6 +161,13 @@ public class TestAuth extends BookKeeperClusterTestCase {
 
     @Test
     public void testCloseMethodCalledOnAuthProvider() throws Exception {
+        LogCloseCallsBookieAuthProviderFactory.closeCountersOnFactory.set(0);
+        LogCloseCallsBookieAuthProviderFactory.closeCountersOnConnections.set(0);
+        LogCloseCallsBookieAuthProviderFactory.initCountersOnFactory.set(0);
+        LogCloseCallsBookieAuthProviderFactory.initCountersOnConnections.set(0);
+        LogCloseCallsClientAuthProviderFactory.initCountersOnFactory.set(0);
+        LogCloseCallsClientAuthProviderFactory.closeCountersOnFactory.set(0);
+
         ServerConfiguration bookieConf = newServerConfiguration();
         bookieConf.setBookieAuthProviderFactoryClass(
                 LogCloseCallsBookieAuthProviderFactory.class.getName());
@@ -271,6 +304,11 @@ public class TestAuth extends BookKeeperClusterTestCase {
         } catch (BKException.BKUnauthorizedAccessException bke) {
             // bookie should have sent a negative response before
             // breaking the conneciton
+            assertEquals(ProtocolVersion.ProtocolV3, protocolVersion);
+        } catch (BKException.BKNotEnoughBookiesException nebe) {
+            // With V2 we don't get the authorization error, but rather just
+            // fail to write to bookies.
+            assertEquals(ProtocolVersion.ProtocolV2, protocolVersion);
         }
         assertFalse(ledgerId.get() == -1);
         assertEquals("Shouldn't have entry", 0, entryCount(ledgerId.get(), bookieConf, clientConf));