You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ab...@apache.org on 2021/08/10 18:29:45 UTC

[kudu] 03/03: [java] KUDU-1921 Add ability to require authn/encryption to Java client

This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit d65d75fb6473e1ce1db108758289456a71101690
Author: Attila Bukor <ab...@apache.org>
AuthorDate: Tue Jul 27 15:01:31 2021 +0200

    [java] KUDU-1921 Add ability to require authn/encryption to Java client
    
    Change-Id: Ic951b2090a4933eca70dc53b6f93cdcff5a74929
    Reviewed-on: http://gerrit.cloudera.org:8080/17732
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 .../org/apache/kudu/client/AsyncKuduClient.java    | 50 +++++++++++++++++-
 .../java/org/apache/kudu/client/Connection.java    | 17 +++++-
 .../org/apache/kudu/client/ConnectionCache.java    | 19 ++++++-
 .../java/org/apache/kudu/client/KuduClient.java    | 32 ++++++++++++
 .../java/org/apache/kudu/client/Negotiator.java    | 29 ++++++++--
 .../org/apache/kudu/client/TestNegotiator.java     |  3 +-
 .../java/org/apache/kudu/client/TestSecurity.java  | 61 ++++++++++++++++++++++
 7 files changed, 201 insertions(+), 10 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index cfa03cd..390bdd3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -425,7 +425,9 @@ public class AsyncKuduClient implements AutoCloseable {
     this.requestTracker = new RequestTracker(clientId);
 
     this.securityContext = new SecurityContext();
-    this.connectionCache = new ConnectionCache(securityContext, bootstrap, b.saslProtocolName);
+    this.connectionCache = new ConnectionCache(securityContext, bootstrap, b.saslProtocolName,
+        b.requireAuthentication, !b.encryptionPolicy.equals(EncryptionPolicy.OPTIONAL),
+        b.encryptionPolicy.equals(EncryptionPolicy.REQUIRED));
     this.tokenReacquirer = new AuthnTokenReacquirer(this);
     this.authzTokenCache = new AuthzTokenCache(this);
   }
@@ -2726,6 +2728,18 @@ public class AsyncKuduClient implements AutoCloseable {
     }
   }
 
+  enum EncryptionPolicy {
+    // Optional, it uses encrypted connection if the server supports it,
+    // but it can connect to insecure servers too.
+    OPTIONAL,
+    // Only connects to remote servers that support encryption, fails
+    // otherwise. It can connect to insecure servers only locally.
+    REQUIRED_REMOTE,
+    // Only connects to servers that support encryption, including over
+    // the loopback interface; fails otherwise.
+    REQUIRED,
+  }
+
   /**
    * Builder class to use in order to connect to Kudu.
    * All the parameters beyond those in the constructors are optional.
@@ -2746,6 +2760,8 @@ public class AsyncKuduClient implements AutoCloseable {
     private int workerCount = DEFAULT_WORKER_COUNT;
     private boolean statisticsDisabled = false;
     private String saslProtocolName = "kudu";
+    private boolean requireAuthentication = false;
+    private EncryptionPolicy encryptionPolicy = EncryptionPolicy.OPTIONAL;
 
     /**
      * Creates a new builder for a client that will connect to the specified masters.
@@ -2870,6 +2886,38 @@ public class AsyncKuduClient implements AutoCloseable {
     }
 
     /**
+     * Require authentication for the connection to a remote server.
+     *
+     * If it's set to true, the client will require mutual authentication between
+     * the server and the client. If the server doesn't support authentication,
+     * or it's disabled, the client will fail to connect.
+     */
+    public AsyncKuduClientBuilder requireAuthentication(boolean requireAuthentication) {
+      this.requireAuthentication = requireAuthentication;
+      return this;
+    }
+
+    /**
+     * Require encryption for the connection to a remote server.
+     *
+     * If it's set to REQUIRED_REMOTE or REQUIRED, the client will
+     * require encrypting the traffic between the server and the client.
+     * If the server doesn't support encryption, or if it's disabled, the
+     * client will fail to connect.
+     *
+     * Loopback connections are encrypted only if 'encryption_policy' is
+     * set to REQUIRED, or if it's required by the server.
+     *
+     * The default value is OPTIONAL, which allows connecting to servers without
+     * encryption as well, but it will still attempt to use it if the server
+     * supports it.
+     */
+    public AsyncKuduClientBuilder encryptionPolicy(EncryptionPolicy encryptionPolicy) {
+      this.encryptionPolicy = encryptionPolicy;
+      return this;
+    }
+
+    /**
      * Creates the client bootstrap for Netty. The user can specify the executor, but
      * if they don't, we'll use a simple thread pool.
      */
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
index 74445ef..0cc7900 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
@@ -110,6 +110,12 @@ class Connection extends SimpleChannelInboundHandler<Object> {
 
   private final String saslProtocolName;
 
+  private final boolean requireAuthentication;
+
+  private final boolean requireEncryption;
+
+  private final boolean encryptLoopback;
+
   /** The underlying Netty's socket channel. */
   private SocketChannel channel;
 
@@ -187,7 +193,10 @@ class Connection extends SimpleChannelInboundHandler<Object> {
              SecurityContext securityContext,
              Bootstrap bootstrap,
              CredentialsPolicy credentialsPolicy,
-             String saslProtocolName) {
+             String saslProtocolName,
+             boolean requireAuthentication,
+             boolean requireEncryption,
+             boolean encryptLoopback) {
     this.serverInfo = serverInfo;
     this.securityContext = securityContext;
     this.saslProtocolName = saslProtocolName;
@@ -195,6 +204,9 @@ class Connection extends SimpleChannelInboundHandler<Object> {
     this.credentialsPolicy = credentialsPolicy;
     this.bootstrap = bootstrap.clone();
     this.bootstrap.handler(new ConnectionChannelInitializer());
+    this.requireAuthentication = requireAuthentication;
+    this.requireEncryption = requireEncryption;
+    this.encryptLoopback = encryptLoopback;
   }
 
   /** {@inheritDoc} */
@@ -213,7 +225,8 @@ class Connection extends SimpleChannelInboundHandler<Object> {
     }
     ctx.writeAndFlush(Unpooled.wrappedBuffer(CONNECTION_HEADER), ctx.voidPromise());
     Negotiator negotiator = new Negotiator(serverInfo.getAndCanonicalizeHostname(), securityContext,
-        (credentialsPolicy == CredentialsPolicy.PRIMARY_CREDENTIALS), saslProtocolName);
+        (credentialsPolicy == CredentialsPolicy.PRIMARY_CREDENTIALS), saslProtocolName,
+        requireAuthentication, requireEncryption, encryptLoopback);
     ctx.pipeline().addBefore(ctx.name(), "negotiation", negotiator);
     negotiator.sendHello(ctx);
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index 705f962..706bc98 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -58,6 +58,12 @@ class ConnectionCache {
 
   private final String saslProtocolName;
 
+  private boolean requireAuthentication;
+
+  private boolean requireEncryption;
+
+  private boolean encryptLoopback;
+
   /**
    * Container mapping server IP/port into the established connection from the client to the
    * server. It may be up to two connections per server: one established with secondary
@@ -70,10 +76,16 @@ class ConnectionCache {
   /** Create a new empty ConnectionCache given the specified parameters. */
   ConnectionCache(SecurityContext securityContext,
                   Bootstrap bootstrap,
-                  String saslProtocolName) {
+                  String saslProtocolName,
+                  boolean requireAuthentication,
+                  boolean requireEncryption,
+                  boolean encryptLoopback) {
     this.securityContext = securityContext;
     this.bootstrap = bootstrap;
     this.saslProtocolName = saslProtocolName;
+    this.requireAuthentication = requireAuthentication;
+    this.requireEncryption = requireEncryption;
+    this.encryptLoopback = encryptLoopback;
   }
 
   /**
@@ -127,7 +139,10 @@ class ConnectionCache {
                                 securityContext,
                                 bootstrap,
                                 credentialsPolicy,
-                                saslProtocolName);
+                                saslProtocolName,
+                                requireAuthentication,
+                                requireEncryption,
+                                encryptLoopback);
         connections.add(result);
         // There can be at most 2 connections to the same destination: one with primary and another
         // with secondary credentials.
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
index ad518a6..3cc1d31 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -623,6 +623,38 @@ public class KuduClient implements AutoCloseable {
     }
 
     /**
+     * Require authentication for the connection to a remote server.
+     *
+     * If it's set to true, the client will require mutual authentication between
+     * the server and the client. If the server doesn't support authentication,
+     * or it's disabled, the client will fail to connect.
+     */
+    public KuduClientBuilder requireAuthentication(boolean requireAuthentication) {
+      clientBuilder.requireAuthentication(requireAuthentication);
+      return this;
+    }
+
+    /**
+     * Require encryption for the connection to a remote server.
+     *
+     * If it's set to REQUIRED_REMOTE or REQUIRED, the client will
+     * require encrypting the traffic between the server and the client.
+     * If the server doesn't support encryption, or if it's disabled, the
+     * client will fail to connect.
+     *
+     * Loopback connections are encrypted only if 'encryption_policy' is
+     * set to REQUIRED, or if it's required by the server.
+     *
+     * The default value is OPTIONAL, which allows connecting to servers without
+     * encryption as well, but it will still attempt to use it if the server
+     * supports it.
+     */
+    public KuduClientBuilder encryptionPolicy(AsyncKuduClient.EncryptionPolicy encryptionPolicy) {
+      clientBuilder.encryptionPolicy(encryptionPolicy);
+      return this;
+    }
+
+    /**
      * Creates a new client that connects to the masters.
      * Doesn't block and won't throw an exception if the masters don't exist.
      * @return a new asynchronous Kudu client
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index 234984b..a9c511d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -222,7 +222,13 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> {
 
   private Certificate peerCert;
 
-  private String saslProtocolName;
+  private final String saslProtocolName;
+
+  private final boolean requireAuthentication;
+
+  private final boolean requireEncryption;
+
+  private final boolean encryptLoopback;
 
   @InterfaceAudience.LimitedPrivate("Test")
   boolean overrideLoopbackForTests;
@@ -230,10 +236,16 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> {
   public Negotiator(String remoteHostname,
                     SecurityContext securityContext,
                     boolean ignoreAuthnToken,
-                    String saslProtocolName) {
+                    String saslProtocolName,
+                    boolean requireAuthentication,
+                    boolean requireEncryption,
+                    boolean encryptLoopback) {
     this.remoteHostname = remoteHostname;
     this.securityContext = securityContext;
     this.saslProtocolName = saslProtocolName;
+    this.requireAuthentication = requireAuthentication;
+    this.requireEncryption = requireEncryption;
+    this.encryptLoopback = encryptLoopback;
     SignedTokenPB token = securityContext.getAuthenticationToken();
     if (token != null) {
       if (ignoreAuthnToken) {
@@ -264,7 +276,7 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> {
     for (RpcHeader.RpcFeatureFlag flag : SUPPORTED_RPC_FEATURES) {
       builder.addSupportedFeatures(flag);
     }
-    if (isLoopbackConnection(ctx.channel())) {
+    if (isLoopbackConnection(ctx.channel()) && !encryptLoopback) {
       builder.addSupportedFeatures(RpcFeatureFlag.TLS_AUTHENTICATION_ONLY);
     }
 
@@ -371,6 +383,10 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> {
     serverFeatures = getFeatureFlags(response);
     // If the server supports TLS, we will always speak TLS to it.
     final boolean negotiatedTls = serverFeatures.contains(RpcFeatureFlag.TLS);
+    if (!negotiatedTls && requireEncryption) {
+      throw new NonRecoverableException(Status.NotAuthorized(
+          "server does not support required TLS encryption"));
+    }
 
     // Check the negotiated authentication type sent by the server.
     chosenAuthnType = chooseAuthenticationType(response);
@@ -473,6 +489,11 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> {
 
     if (chosenMech != null) {
       LOG.debug("SASL mechanism {} chosen for peer {}", chosenMech.name(), remoteHostname);
+      if (chosenMech.equals(SaslMechanism.PLAIN) && requireAuthentication) {
+        String message = "client requires authentication, " +
+            "but server does not have Kerberos enabled";
+        throw new NonRecoverableException(Status.NotAuthorized(message));
+      }
       return;
     }
 
@@ -658,7 +679,7 @@ public class Negotiator extends SimpleChannelInboundHandler<CallResponse> {
 
     // Don't wrap the TLS socket if we are using TLS for authentication only.
     boolean isAuthOnly = serverFeatures.contains(RpcFeatureFlag.TLS_AUTHENTICATION_ONLY) &&
-        isLoopbackConnection(ctx.channel());
+        isLoopbackConnection(ctx.channel()) && !encryptLoopback;
     if (!isAuthOnly) {
       ctx.pipeline().addFirst("tls", handler);
     }
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
index 0da8368..5734795 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
@@ -117,7 +117,8 @@ public class TestNegotiator {
   }
 
   private void startNegotiation(boolean fakeLoopback) {
-    Negotiator negotiator = new Negotiator("127.0.0.1", secContext, false, "kudu");
+    Negotiator negotiator = new Negotiator("127.0.0.1", secContext, false, "kudu",
+        false, false, false);
     negotiator.overrideLoopbackForTests = fakeLoopback;
     embedder = new EmbeddedChannel(negotiator);
     negotiator.sendHello(embedder.pipeline().firstContext());
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
index 7d88d34..cbe84d4 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
@@ -524,4 +524,65 @@ public class TestSecurity {
         getBasicCreateTableOptions()));
   }
 
+  @Test(timeout = 60000)
+  public void testKuduRequireAuthenticationInsecureCluster() throws Exception {
+    try {
+      KuduClient client = new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
+          .requireAuthentication(true)
+          .build();
+      client.createTable("TestSecurity-authentication-required-1",
+          getBasicSchema(), getBasicCreateTableOptions());
+      Assert.fail("client shouldn't be able to connect to the cluster.");
+    } catch (NonRecoverableException e) {
+      Assert.assertThat(e.getMessage(), CoreMatchers.containsString(
+          "client requires authentication, but server does not have Kerberos enabled"
+      ));
+    }
+  }
+
+  @Test(timeout = 60000)
+  @KuduTestHarness.MasterServerConfig(flags = {"--rpc_encryption=disabled",
+      "--rpc_authentication=disabled"})
+  @KuduTestHarness.TabletServerConfig(flags = {"--rpc_encryption=disabled",
+      "--rpc_authentication=disabled"})
+  public void testKuduRequireEncryptionInsecureCluster() throws Exception {
+    try {
+      KuduClient client = new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
+          .encryptionPolicy(AsyncKuduClient.EncryptionPolicy.REQUIRED_REMOTE)
+          .build();
+      client.createTable("TestSecurity-encryption-required-1",
+          getBasicSchema(), getBasicCreateTableOptions());
+      Assert.fail("client shouldn't be able to connect to the cluster.");
+    } catch (NonRecoverableException e) {
+      Assert.assertThat(e.getMessage(), CoreMatchers.containsString(
+          "server does not support required TLS encryption"
+      ));
+    }
+  }
+
+  @Test
+  @KuduTestHarness.EnableKerberos
+  public void testKuduRequireAuthenticationAndEncryptionSecureCluster() throws KuduException {
+    KuduClient client = new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
+        .requireAuthentication(true)
+        .encryptionPolicy(AsyncKuduClient.EncryptionPolicy.REQUIRED)
+        .build();
+    KuduTable table = client.createTable("TestSecurity-authentication-required-1",
+        getBasicSchema(), getBasicCreateTableOptions());
+    Assert.assertNotNull(table);
+  }
+
+  @Test
+  @KuduTestHarness.MasterServerConfig(flags = {"--rpc_encryption=disabled",
+      "--rpc_authentication=disabled"})
+  @KuduTestHarness.TabletServerConfig(flags = {"--rpc_encryption=disabled",
+      "--rpc_authentication=disabled"})
+  public void testKuduOptionalEncryption() throws KuduException {
+    KuduClient client = new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
+        .encryptionPolicy(AsyncKuduClient.EncryptionPolicy.OPTIONAL)
+        .build();
+    KuduTable table = client.createTable("testSecurity-encryption-optional-1",
+        getBasicSchema(), getBasicCreateTableOptions());
+    Assert.assertNotNull(table);
+  }
 }