Posted to commits@kudu.apache.org by da...@apache.org on 2017/05/10 00:22:45 UTC

kudu git commit: KUDU-1999: Spark connector should login with Kerberos credentials on driver

Repository: kudu
Updated Branches:
  refs/heads/master c4c796a25 -> a877566e9


KUDU-1999: Spark connector should login with Kerberos credentials on driver

Tested on a secure cluster using the Spark ITBLL job:

spark2-submit \
    --deploy-mode=cluster \
    --master=yarn \
    --principal=dan \
    --keytab dan.keytab \
    --class org.apache.kudu.spark.tools.IntegrationTestBigLinkedList \
    kudu-spark-tools-1.4.0-SNAPSHOT.jar generate \
    --master-addrs=kudu-spark-secure-1.gce.cloudera.com

Without major changes to our test infrastructure it isn't possible to
cover this code in unit tests, since it relies on a secure YARN cluster
being available.

Note: long-running jobs will continue to fail, since credentials are
still not refreshed.

Change-Id: If87a470c1cf99ea52668f22b72f1f7331877ec63
Reviewed-on: http://gerrit.cloudera.org:8080/6822
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a877566e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a877566e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a877566e

Branch: refs/heads/master
Commit: a877566e9477242c015758d105c8e616248af7c6
Parents: c4c796a
Author: Dan Burkert <da...@apache.org>
Authored: Mon May 8 17:53:36 2017 -0700
Committer: Dan Burkert <da...@apache.org>
Committed: Wed May 10 00:22:08 2017 +0000

----------------------------------------------------------------------
 docs/developing.adoc                            |  2 +-
 .../tools/IntegrationTestBigLinkedList.scala    |  2 +-
 .../apache/kudu/spark/kudu/DefaultSource.scala  |  4 +-
 .../apache/kudu/spark/kudu/KuduContext.scala    | 86 +++++++++++++++++---
 .../apache/kudu/spark/kudu/TestContext.scala    |  2 +-
 5 files changed, 80 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/a877566e/docs/developing.adoc
----------------------------------------------------------------------
diff --git a/docs/developing.adoc b/docs/developing.adoc
index d1cc0ec..1cc6312 100644
--- a/docs/developing.adoc
+++ b/docs/developing.adoc
@@ -122,7 +122,7 @@ df.registerTempTable("kudu_table")
 val filteredDF = sqlContext.sql("select id from kudu_table where id >= 5").show()
 
 // Use KuduContext to create, delete, or write to Kudu tables
-val kuduContext = new KuduContext("kudu.master:7051")
+val kuduContext = new KuduContext("kudu.master:7051", sqlContext.sparkContext)
 
 // Create a new Kudu table from a dataframe schema
 // NB: No rows from the dataframe are inserted into the table
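
For context, a sketch of how the updated constructor fits into the
surrounding docs example; the table name, key column, and DataFrame df
below are illustrative, not part of this change:

    import scala.collection.JavaConverters._
    import org.apache.kudu.client.CreateTableOptions
    import org.apache.kudu.spark.kudu._

    // The new constructor takes the SparkContext so that the driver can
    // perform the Kerberos login before exporting credentials to executors.
    val kuduContext = new KuduContext("kudu.master:7051", sqlContext.sparkContext)

    // Create the table from the DataFrame's schema; no rows are inserted yet.
    kuduContext.createTable("my_table", df.schema, Seq("id"),
      new CreateTableOptions()
        .setRangePartitionColumns(List("id").asJava)
        .setNumReplicas(1))

    // Insert the DataFrame's rows into the new table.
    kuduContext.insertRows(df, "my_table")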

http://git-wip-us.apache.org/repos/asf/kudu/blob/a877566e/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala
index 52647f8..3c82c46 100644
--- a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala
+++ b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala
@@ -152,7 +152,7 @@ object Generator {
 
   def run(args: Args, sc: SparkContext): Unit = {
 
-    val kc = new KuduContext(args.masterAddrs)
+    val kc = new KuduContext(args.masterAddrs, sc)
     val applicationId = sc.applicationId
 
     val client: KuduClient = kc.syncClient

http://git-wip-us.apache.org/repos/asf/kudu/blob/a877566e/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 4faaa79..f5da69b 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -123,14 +123,14 @@ class KuduRelation(private val tableName: String,
                    private val masterAddrs: String,
                    private val operationType: OperationType,
                    private val userSchema: Option[StructType])(
-                    val sqlContext: SQLContext)
+                   val sqlContext: SQLContext)
   extends BaseRelation
     with PrunedFilteredScan
     with InsertableRelation {
 
   import KuduRelation._
 
-  private val context: KuduContext = new KuduContext(masterAddrs)
+  private val context: KuduContext = new KuduContext(masterAddrs, sqlContext.sparkContext)
   private val table: KuduTable = context.syncClient.openTable(tableName)
 
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
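
For context, reads through the DataFrame API go through this KuduRelation.
A minimal sketch, assuming a table named my_table and the master address
from the docs example above:

    // Loading through the DataSource API constructs a KuduRelation, which
    // now hands the SparkContext to its internal KuduContext.
    val df = sqlContext.read
      .options(Map("kudu.master" -> "kudu.master:7051",
                   "kudu.table"  -> "my_table"))
      .format("org.apache.kudu.spark.kudu")
      .load()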

http://git-wip-us.apache.org/repos/asf/kudu/blob/a877566e/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index 8c80ea0..8ca1e03 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -17,30 +17,41 @@
 
 package org.apache.kudu.spark.kudu
 
+import java.security.{AccessController, PrivilegedAction}
 import java.util
+import javax.security.auth.Subject
+import javax.security.auth.login.{AppConfigurationEntry, Configuration, LoginContext}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
-
 import org.apache.hadoop.util.ShutdownHookManager
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{DataType, DataTypes, StructType}
-import org.apache.spark.sql.{DataFrame, Row}
-
 import org.apache.kudu.annotations.InterfaceStability
 import org.apache.kudu.client.SessionConfiguration.FlushMode
 import org.apache.kudu.client._
+import org.apache.kudu.spark.kudu
 import org.apache.kudu.{ColumnSchema, Schema, Type}
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{DataType, DataTypes, StructType}
+import org.apache.spark.sql.{DataFrame, Row}
+import org.slf4j.{Logger, LoggerFactory}
 
 /**
   * KuduContext is a serializable container for Kudu client connections.
   *
   * If a Kudu client connection is needed as part of a Spark application, a
-  * [[KuduContext]] should used as a broadcast variable in the job in order to
-  * share connections among the tasks in a JVM.
+  * [[KuduContext]] should be created in the driver, and shared with executors
+  * as a serializable field.
   */
 @InterfaceStability.Unstable
-class KuduContext(kuduMaster: String) extends Serializable {
+class KuduContext(val kuduMaster: String,
+                  sc: SparkContext) extends Serializable {
+  import kudu.KuduContext._
+
+  @Deprecated()
+  def this(kuduMaster: String) {
+    this(kuduMaster, new SparkContext())
+  }
 
   @transient lazy val syncClient = {
     val c = KuduConnection.getSyncClient(kuduMaster)
@@ -59,8 +70,11 @@ class KuduContext(kuduMaster: String) extends Serializable {
   }
 
   // Visible for testing.
-  private[kudu] val authnCredentials : Array[Byte] =
-    syncClient.exportAuthenticationCredentials()
+  private[kudu] val authnCredentials : Array[Byte] = {
+    Subject.doAs(getSubject(sc), new PrivilegedAction[Array[Byte]] {
+      override def run(): Array[Byte] = syncClient.exportAuthenticationCredentials()
+    })
+  }
 
   /**
     * Create an RDD from a Kudu table.
@@ -240,6 +254,56 @@ class KuduContext(kuduMaster: String) extends Serializable {
   }
 }
 
+private object KuduContext {
+  val Log: Logger = LoggerFactory.getLogger(classOf[KuduContext])
+
+  /**
+    * Returns a new Kerberos-authenticated [[Subject]] if the Spark context contains
+    * principal and keytab options, otherwise returns the currently active subject.
+    *
+    * The keytab and principal options should be set when deploying a Spark
+    * application in cluster mode with Yarn against a secure Kudu cluster. Spark
+    * internally will grab HDFS and HBase delegation tokens (see
+    * [[org.apache.spark.deploy.SparkSubmit]]), so we do something similar.
+    *
+    * This method can only be called on the driver, where the SparkContext is
+    * available.
+    *
+    * @return A Kerberos-authenticated subject if the Spark context contains
+    *         principal and keytab options, otherwise returns the currently
+    *         active subject
+    */
+  private def getSubject(sc: SparkContext): Subject = {
+    val subject = Subject.getSubject(AccessController.getContext)
+
+    val principal = sc.getConf.getOption("spark.yarn.principal").getOrElse(return subject)
+    val keytab = sc.getConf.getOption("spark.yarn.keytab").getOrElse(return subject)
+
+    Log.info(s"Logging in as principal $principal with keytab $keytab")
+
+    val conf = new Configuration {
+      override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
+        val options = Map(
+          "principal" -> principal,
+          "keyTab" -> keytab,
+          "useKeyTab" -> "true",
+          "useTicketCache" -> "false",
+          "doNotPrompt" -> "true",
+          "refreshKrb5Config" -> "true"
+        )
+
+        Array(new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule",
+                                        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+                                        options.asJava))
+      }
+    }
+
+    val loginContext = new LoginContext("kudu-spark", new Subject(), null, conf)
+    loginContext.login()
+    loginContext.getSubject
+  }
+}
+
 private object KuduConnection {
   private[kudu] val syncCache = new mutable.HashMap[String, KuduClient]()
   private[kudu] val asyncCache = new mutable.HashMap[String, AsyncKuduClient]()
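
For reference, getSubject reads the same configuration keys that
spark-submit populates from its --principal and --keytab flags in YARN
mode. A minimal sketch of wiring this up by hand on the driver; the
principal and keytab path are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.kudu.spark.kudu.KuduContext

    // spark-submit --principal dan --keytab dan.keytab sets these keys;
    // getSubject picks them up and performs the JAAS login.
    val conf = new SparkConf()
      .setAppName("kudu-spark-example")
      .setMaster("yarn")
      .set("spark.yarn.principal", "dan")
      .set("spark.yarn.keytab", "/path/to/dan.keytab")

    val sc = new SparkContext(conf)

    // The constructor logs in as the principal (when configured) and
    // exports authentication credentials for use by the executors.
    val kc = new KuduContext("kudu-master:7051", sc)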

http://git-wip-us.apache.org/repos/asf/kudu/blob/a877566e/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
index f46d290..2748f9a 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
@@ -77,7 +77,7 @@ trait TestContext extends BeforeAndAfterAll { self: Suite =>
     kuduClient = new KuduClientBuilder(miniCluster.getMasterAddresses).build()
     assert(miniCluster.waitForTabletServers(1))
 
-    kuduContext = new KuduContext(miniCluster.getMasterAddresses)
+    kuduContext = new KuduContext(miniCluster.getMasterAddresses, sc)
 
     val tableOptions = new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
                                                .setNumReplicas(1)