You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2020/03/17 17:02:14 UTC

[kudu] 01/02: [java] fix Kudu Ranger plugin when Ranger is Kerberized

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 5fce4225a3e315bd0c8ad0e2e7fd9b068edef240
Author: Hao Hao <ha...@cloudera.com>
AuthorDate: Wed Mar 11 00:17:37 2020 -0700

    [java] fix Kudu Ranger plugin when Ranger is Kerberized
    
    The Kudu Ranger plugin currently fails to connect to the Ranger service
    if Ranger is Kerberized. This patch fixes that by logging in the 'kudu'
    user via UserGroupInformation.loginUserFromKeytab, since the Ranger
    client-side library (RangerAdminRESTClient) uses UserGroupInformation to
    determine whether Kerberos is enabled. Note that the Ranger client-side
    library also handles Kerberos re-login/renewal by calling
    UserGroupInformation.checkTGTAndReloginFromKeytab.
    
    This patch also introduces two new properties, 'principal' and 'keytab',
    in SubprocessConfiguration, as they are generic enough for other
    subprocess types to use as well.
    
    Since MiniRanger is not available yet, I did manual tests to ensure
    the Ranger plugin is able to connect to the Ranger service on a secured
    cluster (even after running for longer than the Kerberos ticket and renewal
    lifetime).
    
    Change-Id: Ibe043293ea9cc1c2f43a331603dc1e3b36ff6ae0
    Reviewed-on: http://gerrit.cloudera.org:8080/15414
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Hao Hao <ha...@cloudera.com>
---
 .../kudu/subprocess/KuduSubprocessException.java   | 12 +++++-
 .../kudu/subprocess/SubprocessConfiguration.java   | 44 ++++++++++++++++++++--
 .../apache/kudu/subprocess/SubprocessExecutor.java |  7 ++--
 .../kudu/subprocess/echo/EchoSubprocessMain.java   |  3 +-
 .../subprocess/ranger/RangerProtocolHandler.java   |  5 ++-
 .../subprocess/ranger/RangerSubprocessMain.java    |  7 +++-
 .../ranger/authorization/RangerKuduAuthorizer.java | 25 +++++++++++-
 .../kudu/subprocess/echo/TestEchoSubprocess.java   | 13 +++++--
 .../subprocess/ranger/TestRangerSubprocess.java    |  5 ++-
 9 files changed, 101 insertions(+), 20 deletions(-)

diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/KuduSubprocessException.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/KuduSubprocessException.java
index 34ff107..53e11b6 100644
--- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/KuduSubprocessException.java
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/KuduSubprocessException.java
@@ -27,12 +27,22 @@ public final class KuduSubprocessException extends RuntimeException {
 
   /**
    * Constructs a new runtime exception with the specified detail
+   * message.
+   *
+   * @param  message the detail message
+   */
+  public KuduSubprocessException(String message) {
+    super(message);
+  }
+
+  /**
+   * Constructs a new runtime exception with the specified detail
    * message and cause.
    *
    * @param  message the detail message
    * @param  cause the cause
    */
-  KuduSubprocessException(String message, Throwable cause) {
+  public KuduSubprocessException(String message, Throwable cause) {
     super(message, cause);
   }
 }
diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java
index f7e8991..7234695 100644
--- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java
@@ -36,11 +36,15 @@ public class SubprocessConfiguration {
   private int maxMsgParserThreads;
   private static final int MAX_MSG_PARSER_THREADS_DEFAULT = 3;
   private int maxMsgBytes;
+  private String keytabFile;
+  private static final String KEYTAB_FILE_DEFAULT = "";
+  private String servicePrincipal;
+  private static final String SERVICE_PRINCIPAL_DEFAULT = "";
 
   @VisibleForTesting
   static final int MAX_MESSAGE_BYTES_DEFAULT = 1024 * 1024;
 
-  SubprocessConfiguration(String[] args) {
+  public SubprocessConfiguration(String[] args) {
     parse(args);
   }
 
@@ -69,6 +73,20 @@ public class SubprocessConfiguration {
   }
 
   /**
+   * @return the path to the service keytab file
+   */
+  public String getKeytabFile() {
+    return keytabFile;
+  }
+
+  /**
+   * @return the principal name of the service to load from the keytab file
+   */
+  public String getServicePrincipal() {
+    return servicePrincipal;
+  }
+
+  /**
    * Parses the arguments according to the specified options.
    *
    * @param args the subprocess arguments
@@ -99,18 +117,38 @@ public class SubprocessConfiguration {
     maxMsgOpt.setRequired(false);
     options.addOption(maxMsgOpt);
 
+    final String keytabFileLongOpt = "keytab";
+    Option keytabOpt = new Option(
+        "k", keytabFileLongOpt, /* hasArg= */true,
+        "The path to the service keytab file");
+    keytabOpt.setRequired(false);
+    options.addOption(keytabOpt);
+
+    final String principalLongOpt = "principal";
+    Option principalOpt = new Option(
+        "i", principalLongOpt, /* hasArg= */true,
+        "The service principal name to load from the keytab file");
+    principalOpt.setRequired(false);
+    options.addOption(principalOpt);
+
     CommandLineParser parser = new BasicParser();
     try {
       CommandLine cmd = parser.parse(options, args);
       String queueSize = cmd.getOptionValue(queueSizeLongOpt);
-      String maxParserThreads = cmd.getOptionValue(maxMsgParserThreadsLongOpt);
-      String maxMsgBytes = cmd.getOptionValue(maxMsgBytesLongOpt);
       this.queueSize = queueSize == null ?
           QUEUE_SIZE_DEFAULT : Integer.parseInt(queueSize);
+      String maxParserThreads = cmd.getOptionValue(maxMsgParserThreadsLongOpt);
       this.maxMsgParserThreads = maxParserThreads == null ?
           MAX_MSG_PARSER_THREADS_DEFAULT : Integer.parseInt(maxParserThreads);
+      String maxMsgBytes = cmd.getOptionValue(maxMsgBytesLongOpt);
       this.maxMsgBytes = maxMsgBytes == null ?
           MAX_MESSAGE_BYTES_DEFAULT : Integer.parseInt(maxMsgBytes);
+      String keytab = cmd.getOptionValue(keytabFileLongOpt);
+      this.keytabFile = keytab == null ?
+          KEYTAB_FILE_DEFAULT : keytab;
+      String principal = cmd.getOptionValue(principalLongOpt);
+      this.servicePrincipal = principal == null ?
+          SERVICE_PRINCIPAL_DEFAULT : principal;
     } catch (ParseException e) {
       throw new KuduSubprocessException("Cannot parse the subprocess command line", e);
     }
diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
index ced6890..e9c06e5 100644
--- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
@@ -76,7 +76,7 @@ public class SubprocessExecutor {
   /**
    * Executes the subprocess with the given arguments and protocol processor.
    *
-   * @param args the subprocess arguments
+   * @param conf the subprocess configuration
    * @param protocolHandler the subprocess protocol handler
    * @param timeoutMs the maximum time to wait for subprocess tasks to finish, -1 means
    *                  no time out and the tasks will continue execute until it finishes
@@ -85,9 +85,8 @@ public class SubprocessExecutor {
    * @throws TimeoutException if the wait timed out
    */
   @VisibleForTesting
-  public void run(String[] args, ProtocolHandler protocolHandler, long timeoutMs)
+  public void run(SubprocessConfiguration conf, ProtocolHandler protocolHandler, long timeoutMs)
       throws InterruptedException, ExecutionException, TimeoutException {
-    SubprocessConfiguration conf = new SubprocessConfiguration(args);
     int maxMsgParserThread = conf.getMaxMsgParserThreads();
     int queueSize = conf.getQueueSize();
     int maxMessageBytes = conf.getMaxMessageBytes();
@@ -181,7 +180,7 @@ public class SubprocessExecutor {
       throws ExecutionException, InterruptedException {
     Preconditions.checkArgument(timeoutMs != -1);
     try {
-      run(args, handler, timeoutMs);
+      run(new SubprocessConfiguration(args), handler, timeoutMs);
     } catch (TimeoutException e) {
       // no-op
     }
diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/echo/EchoSubprocessMain.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/echo/EchoSubprocessMain.java
index 08877b7..f884c3d 100644
--- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/echo/EchoSubprocessMain.java
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/echo/EchoSubprocessMain.java
@@ -19,6 +19,7 @@ package org.apache.kudu.subprocess.echo;
 
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.kudu.subprocess.SubprocessConfiguration;
 import org.apache.kudu.subprocess.SubprocessExecutor;
 
 @InterfaceAudience.Private
@@ -27,6 +28,6 @@ class EchoSubprocessMain {
   public static void main(String[] args) throws Exception {
     SubprocessExecutor subprocessExecutor = new SubprocessExecutor();
     EchoProtocolHandler protocolHandler = new EchoProtocolHandler();
-    subprocessExecutor.run(args, protocolHandler, /* timeoutMs= */-1);
+    subprocessExecutor.run(new SubprocessConfiguration(args), protocolHandler, /* timeoutMs= */-1);
   }
 }
diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/RangerProtocolHandler.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/RangerProtocolHandler.java
index aad4c50..e57041e 100644
--- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/RangerProtocolHandler.java
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/RangerProtocolHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.kudu.subprocess.ranger;
 
+import com.google.common.base.Preconditions;
 import org.apache.ranger.plugin.policyengine.RangerAccessResult;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -42,8 +43,8 @@ class RangerProtocolHandler extends ProtocolHandler<RangerRequestListPB,
   @InterfaceAudience.LimitedPrivate("Test")
   static RangerKuduAuthorizer authz = new RangerKuduAuthorizer();
 
-  RangerProtocolHandler() {
-    authz.init();
+  RangerProtocolHandler(String servicePrincipal, String keytab) {
+    authz.init(servicePrincipal, keytab);
   }
 
   @Override
diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/RangerSubprocessMain.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/RangerSubprocessMain.java
index 238c13f..4ce633b 100644
--- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/RangerSubprocessMain.java
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/RangerSubprocessMain.java
@@ -19,6 +19,7 @@ package org.apache.kudu.subprocess.ranger;
 
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.kudu.subprocess.SubprocessConfiguration;
 import org.apache.kudu.subprocess.SubprocessExecutor;
 
 // The Ranger subprocess that wraps the Kudu Ranger plugin. For the
@@ -33,7 +34,9 @@ class RangerSubprocessMain {
 
   public static void main(String[] args) throws Exception {
     SubprocessExecutor subprocessExecutor = new SubprocessExecutor();
-    RangerProtocolHandler protocolProcessor = new RangerProtocolHandler();
-    subprocessExecutor.run(args, protocolProcessor, /* timeoutMs= */-1);
+    SubprocessConfiguration conf = new SubprocessConfiguration(args);
+    RangerProtocolHandler protocolHandler = new RangerProtocolHandler(conf.getServicePrincipal(),
+                                                                      conf.getKeytabFile());
+    subprocessExecutor.run(conf, protocolHandler, /* timeoutMs= */-1);
   }
 }
diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/authorization/RangerKuduAuthorizer.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/authorization/RangerKuduAuthorizer.java
index 1c0040a..1e895a5 100644
--- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/authorization/RangerKuduAuthorizer.java
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/authorization/RangerKuduAuthorizer.java
@@ -17,6 +17,7 @@
 
 package org.apache.kudu.subprocess.ranger.authorization;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -41,6 +42,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.ranger.Ranger.RangerRequestListPB;
 import org.apache.kudu.ranger.Ranger.RangerRequestPB;
+import org.apache.kudu.subprocess.KuduSubprocessException;
 
 public class RangerKuduAuthorizer {
   private static final Logger LOG = LoggerFactory.getLogger(RangerKuduAuthorizer.class);
@@ -65,10 +67,29 @@ public class RangerKuduAuthorizer {
   /**
    * Initializes the Ranger Kudu plugin, which has to be called explicitly
    * before doing any authorizations.
+   *
+   * @param servicePrincipal the principal name for Kudu to load from the keytab file
+   * @param keytab the path to the Kudu keytab file
    */
-  public void init() {
-    LOG.info("Initializing Ranger Kudu plugin");
+  public void init(String servicePrincipal, String keytab) {
+    // Determine if Kerberos is enabled in the Hadoop configuration. Kerberos should
+    // also be enabled in the Kudu master.
+    if (UserGroupInformation.isSecurityEnabled()) {
+      if (servicePrincipal.isEmpty() || keytab.isEmpty()) {
+        throw new KuduSubprocessException("Kudu principal and Keytab file must be " +
+                                          "provided when Kerberos is enabled in Ranger");
+      }
+      // When Kerberos is enabled, login with the Kudu principal and keytab
+      // before initializing the Ranger plugin.
+      try {
+        LOG.debug("Login with Kudu principal: {}, and keytab: {}", servicePrincipal, keytab);
+        UserGroupInformation.loginUserFromKeytab(servicePrincipal, keytab);
+      } catch (IOException e) {
+        throw new KuduSubprocessException("Failed to login with Kudu principal/keytab", e);
+      }
+    }
     plugin.init();
+    LOG.info("Finished Ranger Kudu plugin initialization");
   }
 
   /**
diff --git a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
index fbaaa52..294e643 100644
--- a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
+++ b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
@@ -35,6 +35,7 @@ import org.apache.kudu.subprocess.OutboundResponse;
 import org.apache.kudu.subprocess.Subprocess.EchoResponsePB;
 import org.apache.kudu.subprocess.Subprocess.SubprocessMetricsPB;
 import org.apache.kudu.subprocess.Subprocess.SubprocessResponsePB;
+import org.apache.kudu.subprocess.SubprocessConfiguration;
 import org.apache.kudu.subprocess.SubprocessExecutor;
 import org.apache.kudu.subprocess.SubprocessTestUtil;
 import org.apache.kudu.test.junit.RetryRule;
@@ -226,7 +227,8 @@ public class TestEchoSubprocess extends SubprocessTestUtil {
     SubprocessExecutor executor = setUpExecutorIO(NO_ERR, /*injectIOError*/false);
     requestSenderPipe.write("malformed".getBytes(StandardCharsets.UTF_8));
     Throwable thrown = Assert.assertThrows(ExecutionException.class,
-        () -> executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS));
+        () -> executor.run(new SubprocessConfiguration(NO_ARGS),
+                           new EchoProtocolHandler(), TIMEOUT_MS));
     Assert.assertTrue(thrown.getMessage().contains("Unable to read the protobuf message"));
   }
 
@@ -245,7 +247,8 @@ public class TestEchoSubprocess extends SubprocessTestUtil {
     // times out before CompletableFuture.get() is called on the writer.
     assertIncludingSuppressedThrows(IOException.class,
         "Unable to write to print stream",
-        () -> executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS));
+        () -> executor.run(new SubprocessConfiguration(NO_ARGS),
+                           new EchoProtocolHandler(), TIMEOUT_MS));
   }
 
   /**
@@ -260,7 +263,8 @@ public class TestEchoSubprocess extends SubprocessTestUtil {
     sendRequestToPipe(createEchoSubprocessRequest(MESSAGE));
     assertIncludingSuppressedThrows(ExecutionException.class,
         "Unable to put the message to the queue",
-        () -> executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS));
+        () -> executor.run(new SubprocessConfiguration(NO_ARGS),
+                           new EchoProtocolHandler(), TIMEOUT_MS));
   }
 
   /**
@@ -275,7 +279,8 @@ public class TestEchoSubprocess extends SubprocessTestUtil {
     sendRequestToPipe(createEchoSubprocessRequest("b"));
     executor.blockWriteMs(1000);
     Assert.assertThrows(TimeoutException.class,
-        () -> executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS));
+        () -> executor.run(new SubprocessConfiguration(NO_ARGS),
+                           new EchoProtocolHandler(), TIMEOUT_MS));
 
     // We should see a single message in the outbound queue. The other one is
     // blocked writing.
diff --git a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/ranger/TestRangerSubprocess.java b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/ranger/TestRangerSubprocess.java
index d856386..f018c2a 100644
--- a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/ranger/TestRangerSubprocess.java
+++ b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/ranger/TestRangerSubprocess.java
@@ -39,6 +39,7 @@ import org.apache.kudu.ranger.Ranger.RangerRequestListPB;
 import org.apache.kudu.ranger.Ranger.RangerRequestPB;
 import org.apache.kudu.ranger.Ranger.RangerResponseListPB;
 import org.apache.kudu.subprocess.Subprocess.SubprocessRequestPB;
+import org.apache.kudu.subprocess.SubprocessConfiguration;
 import org.apache.kudu.subprocess.SubprocessExecutor;
 import org.apache.kudu.subprocess.SubprocessTestUtil;
 import org.apache.kudu.subprocess.ranger.authorization.RangerKuduAuthorizer;
@@ -125,7 +126,9 @@ public class TestRangerSubprocess extends SubprocessTestUtil {
     // We expect the executor to time out since it is non cancelable
     // if no exception encountered.
     assertThrows(TimeoutException.class,
-        () -> executor.run(NO_ARGS, new RangerProtocolHandler(), TIMEOUT_MS));
+        () -> executor.run(new SubprocessConfiguration(NO_ARGS),
+                           new RangerProtocolHandler(/* servicePrincipal= */"", /* keytab= */""),
+                           TIMEOUT_MS));
 
     RangerResponseListPB resp = receiveResponse().getResponse().unpack(RangerResponseListPB.class);
     assertTrue(resp.getResponses(/* index= */0).getAllowed());