You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/03/03 20:53:48 UTC

[5/5] kudu git commit: mapreduce: support for running on secure clusters

mapreduce: support for running on secure clusters

This adds the appropriate hooks to grab authentication credentials at
job submission time and add them to the job's Credentials object as a
Hadoop "Token". The tasks then grab the Token and import it into the
client they create before using it.

It's not possible to test this since we don't have support for running
Kerberized Yarn clusters in the MiniCluster environment. I tested
manually on a secure cluster using ImportTsv, ITBLL, and RowCounter
jobs.

Change-Id: Ieed43b9c8646aaee549078a26850e7e7bdecd802
Reviewed-on: http://gerrit.cloudera.org:8080/6237
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
(cherry picked from commit e9dfbe1e5dd38d4f77ab79537e7e5c6d30e56a1d)
Reviewed-on: http://gerrit.cloudera.org:8080/6242
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6139cdd6
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6139cdd6
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6139cdd6

Branch: refs/heads/branch-1.3.x
Commit: 6139cdd69e781b2adb05c1ee3b6642c1287d902c
Parents: dabf958
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Mar 1 22:38:39 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Fri Mar 3 20:47:33 2017 +0000

----------------------------------------------------------------------
 .../tools/IntegrationTestBigLinkedList.java     |   2 +
 .../org/apache/kudu/client/AsyncKuduClient.java |   9 ++
 .../java/org/apache/kudu/client/KuduClient.java |   8 ++
 .../kudu/mapreduce/CommandLineParser.java       |   5 +-
 .../kudu/mapreduce/KuduTableInputFormat.java    |   1 +
 .../kudu/mapreduce/KuduTableMapReduceUtil.java  | 104 +++++++++++++++++++
 .../kudu/mapreduce/KuduTableOutputFormat.java   |   1 +
 7 files changed, 128 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/6139cdd6/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
index 19b2ba1..6171026 100644
--- a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
+++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
@@ -774,7 +774,9 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
       // Lack of YARN-445 means we can't auto-jstack on timeout, so disabling the timeout gives
       // us a chance to do it manually.
       job.getConfiguration().setInt("mapreduce.task.timeout", 0);
+
       KuduTableMapReduceUtil.addDependencyJars(job);
+      KuduTableMapReduceUtil.addCredentialsToJob(client, job);
 
       boolean success = job.waitForCompletion(true);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/6139cdd6/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index acda901..6639284 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -48,6 +48,7 @@ import javax.annotation.concurrent.GuardedBy;
 import javax.security.auth.Subject;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.net.HostAndPort;
@@ -579,6 +580,14 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
+   * @return the list of master addresses, stringified using commas to separate
+   * them
+   */
+  public String getMasterAddressesAsString() {
+    return Joiner.on(",").join(masterAddresses);
+  }
+
+  /**
    * Check if statistics collection is enabled for this client.
    * @return true if it is enabled, else false
    */

http://git-wip-us.apache.org/repos/asf/kudu/blob/6139cdd6/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
index 371d196..b23e5c2 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -298,6 +298,14 @@ public class KuduClient implements AutoCloseable {
     return asyncClient.getDefaultAdminOperationTimeoutMs();
   }
 
+  /**
+   * @return the list of master addresses, stringified using commas to separate
+   * them
+   */
+  public String getMasterAddressesAsString() {
+    return asyncClient.getMasterAddressesAsString();
+  }
+
   // Helper method to handle joining and transforming the Exception we receive.
   static <R> R joinAndHandleException(Deferred<R> deferred) throws KuduException {
     try {

http://git-wip-us.apache.org/repos/asf/kudu/blob/6139cdd6/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java
index 830cb7b..5b701ed 100644
--- a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java
@@ -18,7 +18,6 @@
 package org.apache.kudu.mapreduce;
 
 import org.apache.hadoop.conf.Configuration;
-
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.annotations.InterfaceStability;
 import org.apache.kudu.client.AsyncKuduClient;
@@ -118,11 +117,13 @@ public class CommandLineParser {
    * @return a kudu client
    */
   public KuduClient getClient() {
-    return new KuduClient.KuduClientBuilder(getMasterAddresses())
+    KuduClient c = new KuduClient.KuduClientBuilder(getMasterAddresses())
         .defaultOperationTimeoutMs(getOperationTimeoutMs())
         .defaultAdminOperationTimeoutMs(getAdminOperationTimeoutMs())
         .defaultSocketReadTimeoutMs(getSocketReadTimeoutMs())
         .build();
+    KuduTableMapReduceUtil.importCredentialsFromCurrentSubject(c);
+    return c;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/6139cdd6/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
index 91f4825..8f98170 100644
--- a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
@@ -211,6 +211,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
     this.client = new KuduClient.KuduClientBuilder(masterAddresses)
                                 .defaultOperationTimeoutMs(operationTimeoutMs)
                                 .build();
+    KuduTableMapReduceUtil.importCredentialsFromCurrentSubject(client);
     this.nameServer = conf.get(NAME_SERVER_KEY);
     this.cacheBlocks = conf.getBoolean(SCAN_CACHE_BLOCKS, false);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/6139cdd6/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java
index 6488163..570b464 100644
--- a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java
@@ -19,6 +19,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.URL;
 import java.net.URLDecoder;
+import java.security.AccessController;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashMap;
@@ -29,6 +30,8 @@ import java.util.Set;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
 
+import javax.security.auth.Subject;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.net.util.Base64;
@@ -36,18 +39,26 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.annotations.InterfaceStability;
 import org.apache.kudu.client.AsyncKuduClient;
 import org.apache.kudu.client.ColumnRangePredicate;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduPredicate;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.Operation;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Utility class to setup MR jobs that use Kudu as an input and/or output.
  */
@@ -59,6 +70,14 @@ public class KuduTableMapReduceUtil {
   private static final Log LOG = LogFactory.getLog(KuduTableMapReduceUtil.class);
 
   /**
+   * "Secret key alias" used in Job Credentials to store the Kudu authentication
+   * credentials. This acts as a key in a Hadoop Credentials object.
+   */
+  private static final Text AUTHN_CREDENTIALS_ALIAS = new Text("kudu.authn.credentials");
+
+  private static final Text KUDU_TOKEN_KIND = new Text("kudu-authn-data");
+
+  /**
    * Doesn't need instantiation
    */
   private KuduTableMapReduceUtil() { }
@@ -96,6 +115,19 @@ public class KuduTableMapReduceUtil {
     }
 
     /**
+     * Add credentials to the job so that tasks run as the user that submitted
+     * the job.
+     */
+    protected void addCredentialsToJob(String masterAddresses, long operationTimeoutMs)
+        throws KuduException {
+      try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddresses)
+          .defaultOperationTimeoutMs(operationTimeoutMs)
+          .build()) {
+        KuduTableMapReduceUtil.addCredentialsToJob(client, job);
+      }
+    }
+
+    /**
      * Configures the job using the passed parameters.
      * @throws IOException If addDependencies is enabled and a problem is encountered reading
      * files on the filesystem
@@ -137,6 +169,7 @@ public class KuduTableMapReduceUtil {
       if (addDependencies) {
         addDependencyJars(job);
       }
+      addCredentialsToJob(masterAddresses, operationTimeoutMs);
     }
   }
 
@@ -202,6 +235,8 @@ public class KuduTableMapReduceUtil {
       if (addDependencies) {
         addDependencyJars(job);
       }
+
+      addCredentialsToJob(masterAddresses, operationTimeoutMs);
     }
   }
 
@@ -363,6 +398,75 @@ public class KuduTableMapReduceUtil {
   }
 
   /**
+   * Export the credentials from a {@link KuduClient} and store them in the given MapReduce
+   * {@link Job} so that {@link KuduClient}s created from within tasks of that job can
+   * authenticate to Kudu.
+   *
+   * This must be used before submitting a job when running against a Kudu cluster
+   * configured to require authentication. If using {@link TableInputFormatConfigurator},
+   * {@link TableOutputFormatConfigurator} or another such utility class, this is called
+   * automatically and does not need to be called.
+   *
+   * @param client the client whose credentials to export
+   * @param job the job to configure
+   * @throws KuduException if credentials cannot be exported
+   */
+  public static void addCredentialsToJob(KuduClient client, Job job)
+      throws KuduException {
+    Preconditions.checkNotNull(client);
+    Preconditions.checkNotNull(job);
+
+    byte[] authnCreds = client.exportAuthenticationCredentials();
+    Text service = new Text(client.getMasterAddressesAsString());
+    job.getCredentials().addToken(AUTHN_CREDENTIALS_ALIAS,
+        new Token<TokenIdentifier>(null, authnCreds, KUDU_TOKEN_KIND, service));
+  }
+
+  /**
+   * Import credentials from the current thread's JAAS {@link Subject} into the provided
+   * {@link KuduClient}.
+   *
+   * This must be called for any clients created within a MapReduce job in order to
+   * adopt the credentials added by {@link #addCredentialsToJob(KuduClient, Job)}.
+   * When using {@link KuduTableInputFormat} or {@link KuduTableOutputFormat}, the
+   * implementation automatically handles creating the client and importing necessary
+   * credentials. As such, this is only necessary in jobs that explicitly create a
+   * {@link KuduClient}.
+   *
+   * If no appropriate credentials are found, does nothing.
+   */
+  public static void importCredentialsFromCurrentSubject(KuduClient client) {
+    Subject subj = Subject.getSubject(AccessController.getContext());
+    if (subj == null) {
+      return;
+    }
+    Text service = new Text(client.getMasterAddressesAsString());
+    // Find the Hadoop credentials stored within the JAAS subject.
+    Set<Credentials> credSet = subj.getPrivateCredentials(Credentials.class);
+    if (credSet == null) {
+      return;
+    }
+    for (Credentials creds : credSet) {
+      for (Token<?> tok : creds.getAllTokens()) {
+        if (!tok.getKind().equals(KUDU_TOKEN_KIND)) {
+          continue;
+        }
+        // Only import credentials relevant to the service corresponding to
+        // 'client'. This is necessary if we want to support a job which
+        // reads from one cluster and writes to another.
+        if (!tok.getService().equals(service)) {
+          LOG.debug("Not importing credentials for service " + tok.getService() +
+              " (expecting service " + service + ")");
+          continue;
+        }
+        LOG.debug("Importing credentials for service " + service);
+        client.importAuthenticationCredentials(tok.getPassword());
+        return;
+      }
+    }
+  }
+
+  /**
    * Add the Kudu dependency jars as well as jars for any of the configured
    * job classes to the job configuration, so that JobClient will ship them
    * to the cluster and add them to the DistributedCache.

http://git-wip-us.apache.org/repos/asf/kudu/blob/6139cdd6/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java
index bb64a1e..1a93a36 100644
--- a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java
@@ -119,6 +119,7 @@ public class KuduTableOutputFormat extends OutputFormat<NullWritable,Operation>
     this.client = new KuduClient.KuduClientBuilder(masterAddress)
         .defaultOperationTimeoutMs(operationTimeoutMs)
         .build();
+    KuduTableMapReduceUtil.importCredentialsFromCurrentSubject(client);
     try {
       this.table = client.openTable(tableName);
     } catch (Exception ex) {