You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/05/10 00:22:45 UTC
kudu git commit: KUDU-1999: Spark connector should login with
Kerberos credentials on driver
Repository: kudu
Updated Branches:
refs/heads/master c4c796a25 -> a877566e9
KUDU-1999: Spark connector should login with Kerberos credentials on driver
Tested on a secure cluster using the Spark ITBLL job:
spark2-submit \
--deploy-mode=cluster \
--master=yarn \
--principal=dan \
--keytab dan.keytab \
--class org.apache.kudu.spark.tools.IntegrationTestBigLinkedList \
kudu-spark-tools-1.4.0-SNAPSHOT.jar generate \
--master-addrs=kudu-spark-secure-1.gce.cloudera.com
Without major changes to our test infrastructure it is not possible to
cover this code with unit tests, since it requires a secure YARN cluster.
Note: long-running jobs will still fail once their credentials expire,
since credentials are not yet refreshed.
Change-Id: If87a470c1cf99ea52668f22b72f1f7331877ec63
Reviewed-on: http://gerrit.cloudera.org:8080/6822
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a877566e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a877566e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a877566e
Branch: refs/heads/master
Commit: a877566e9477242c015758d105c8e616248af7c6
Parents: c4c796a
Author: Dan Burkert <da...@apache.org>
Authored: Mon May 8 17:53:36 2017 -0700
Committer: Dan Burkert <da...@apache.org>
Committed: Wed May 10 00:22:08 2017 +0000
----------------------------------------------------------------------
docs/developing.adoc | 2 +-
.../tools/IntegrationTestBigLinkedList.scala | 2 +-
.../apache/kudu/spark/kudu/DefaultSource.scala | 4 +-
.../apache/kudu/spark/kudu/KuduContext.scala | 86 +++++++++++++++++---
.../apache/kudu/spark/kudu/TestContext.scala | 2 +-
5 files changed, 80 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/a877566e/docs/developing.adoc
----------------------------------------------------------------------
diff --git a/docs/developing.adoc b/docs/developing.adoc
index d1cc0ec..1cc6312 100644
--- a/docs/developing.adoc
+++ b/docs/developing.adoc
@@ -122,7 +122,7 @@ df.registerTempTable("kudu_table")
val filteredDF = sqlContext.sql("select id from kudu_table where id >= 5").show()
// Use KuduContext to create, delete, or write to Kudu tables
-val kuduContext = new KuduContext("kudu.master:7051")
+val kuduContext = new KuduContext("kudu.master:7051", sqlContext.sparkContext)
// Create a new Kudu table from a dataframe schema
// NB: No rows from the dataframe are inserted into the table
http://git-wip-us.apache.org/repos/asf/kudu/blob/a877566e/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala
index 52647f8..3c82c46 100644
--- a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala
+++ b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala
@@ -152,7 +152,7 @@ object Generator {
def run(args: Args, sc: SparkContext): Unit = {
- val kc = new KuduContext(args.masterAddrs)
+ val kc = new KuduContext(args.masterAddrs, sc)
val applicationId = sc.applicationId
val client: KuduClient = kc.syncClient
http://git-wip-us.apache.org/repos/asf/kudu/blob/a877566e/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 4faaa79..f5da69b 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -123,14 +123,14 @@ class KuduRelation(private val tableName: String,
private val masterAddrs: String,
private val operationType: OperationType,
private val userSchema: Option[StructType])(
- val sqlContext: SQLContext)
+ val sqlContext: SQLContext)
extends BaseRelation
with PrunedFilteredScan
with InsertableRelation {
import KuduRelation._
- private val context: KuduContext = new KuduContext(masterAddrs)
+ private val context: KuduContext = new KuduContext(masterAddrs, sqlContext.sparkContext)
private val table: KuduTable = context.syncClient.openTable(tableName)
override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
http://git-wip-us.apache.org/repos/asf/kudu/blob/a877566e/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index 8c80ea0..8ca1e03 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -17,30 +17,41 @@
package org.apache.kudu.spark.kudu
+import java.security.{AccessController, PrivilegedAction}
import java.util
+import javax.security.auth.Subject
+import javax.security.auth.login.{AppConfigurationEntry, Configuration, LoginContext}
+import scala.collection.JavaConverters._
import scala.collection.mutable
-
import org.apache.hadoop.util.ShutdownHookManager
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{DataType, DataTypes, StructType}
-import org.apache.spark.sql.{DataFrame, Row}
-
import org.apache.kudu.annotations.InterfaceStability
import org.apache.kudu.client.SessionConfiguration.FlushMode
import org.apache.kudu.client._
+import org.apache.kudu.spark.kudu
import org.apache.kudu.{ColumnSchema, Schema, Type}
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{DataType, DataTypes, StructType}
+import org.apache.spark.sql.{DataFrame, Row}
+import org.slf4j.{Logger, LoggerFactory}
/**
* KuduContext is a serializable container for Kudu client connections.
*
* If a Kudu client connection is needed as part of a Spark application, a
- * [[KuduContext]] should used as a broadcast variable in the job in order to
- * share connections among the tasks in a JVM.
+ * [[KuduContext]] should be created in the driver, and shared with executors
+ * as a serializable field.
*/
@InterfaceStability.Unstable
-class KuduContext(kuduMaster: String) extends Serializable {
+class KuduContext(val kuduMaster: String,
+ sc: SparkContext) extends Serializable {
+ import kudu.KuduContext._
+
+  /**
+   * Creates a KuduContext bound to the currently active SparkContext, creating
+   * one from the default SparkConf only if no context is active yet.
+   *
+   * @param kuduMaster the Kudu master address(es), as accepted by the primary
+   *                   constructor
+   * @deprecated pass the SparkContext explicitly via the two-argument constructor
+   */
+  @Deprecated()
+  def this(kuduMaster: String) {
+    // `new SparkContext()` fails if another SparkContext is already active in
+    // this JVM (e.g. the one the application created); getOrCreate reuses the
+    // active context and otherwise builds one from a default SparkConf.
+    this(kuduMaster, SparkContext.getOrCreate())
+  }
@transient lazy val syncClient = {
val c = KuduConnection.getSyncClient(kuduMaster)
@@ -59,8 +70,11 @@ class KuduContext(kuduMaster: String) extends Serializable {
}
// Visible for testing.
- private[kudu] val authnCredentials : Array[Byte] =
- syncClient.exportAuthenticationCredentials()
+  /**
+   * Authentication credentials exported from this context's Kudu client on
+   * the driver.
+   *
+   * The export runs inside `Subject.doAs` with the subject returned by
+   * `getSubject(sc)`. When the Spark configuration supplies a principal and a
+   * keytab, the export therefore happens as the freshly logged-in Kerberos
+   * subject. Otherwise it uses the subject that is already active.
+   */
+  private[kudu] val authnCredentials : Array[Byte] = {
+    val driverSubject = getSubject(sc)
+    val exportAction = new PrivilegedAction[Array[Byte]] {
+      override def run(): Array[Byte] = syncClient.exportAuthenticationCredentials()
+    }
+    Subject.doAs(driverSubject, exportAction)
+  }
/**
* Create an RDD from a Kudu table.
@@ -240,6 +254,56 @@ class KuduContext(kuduMaster: String) extends Serializable {
}
}
+private object KuduContext {
+  val Log: Logger = LoggerFactory.getLogger(classOf[KuduContext])
+
+  /**
+   * Returns a new Kerberos-authenticated [[Subject]] if the Spark context contains
+   * principal and keytab options, otherwise returns the currently active subject.
+   *
+   * The keytab and principal options should be set when deploying a Spark
+   * application in cluster mode with Yarn against a secure Kudu cluster. Spark
+   * internally will grab HDFS and HBase delegation tokens (see
+   * [[org.apache.spark.deploy.SparkSubmit]]), so we do something similar.
+   *
+   * This method can only be called on the driver, where the SparkContext is
+   * available.
+   *
+   * @param sc the driver's Spark context; its configuration is read for
+   *           `spark.yarn.principal` and `spark.yarn.keytab`
+   * @return a Kerberos-authenticated subject if the Spark context contains
+   *         both the principal and the keytab option, otherwise the subject of
+   *         the current access control context (which may be null)
+   * @throws javax.security.auth.login.LoginException if the keytab login fails
+   */
+  private def getSubject(sc: SparkContext): Subject = {
+    val sparkConf = sc.getConf
+    val principal = sparkConf.getOption("spark.yarn.principal")
+    val keytab = sparkConf.getOption("spark.yarn.keytab")
+
+    // Log in only when *both* options are present; otherwise fall back to the
+    // caller's subject. Matching on the pair avoids a non-local `return` from
+    // inside a by-name `getOrElse` argument.
+    (principal, keytab) match {
+      case (Some(p), Some(k)) => loginFromKeytab(p, k)
+      case _ => Subject.getSubject(AccessController.getContext)
+    }
+  }
+
+  /**
+   * Logs in through JAAS with `Krb5LoginModule`, using only the keytab (no
+   * ticket cache, no prompting), and returns the authenticated subject.
+   *
+   * @param principal the Kerberos principal to log in as
+   * @param keytab    path to the keytab file holding the principal's key
+   * @return the subject populated by the successful login
+   * @throws javax.security.auth.login.LoginException if the login fails
+   */
+  private def loginFromKeytab(principal: String, keytab: String): Subject = {
+    Log.info(s"Logging in as principal $principal with keytab $keytab")
+
+    val options = Map(
+      "principal" -> principal,
+      "keyTab" -> keytab,
+      "useKeyTab" -> "true",
+      "useTicketCache" -> "false",
+      "doNotPrompt" -> "true",
+      "refreshKrb5Config" -> "true"
+    )
+
+    // In-memory, single-entry JAAS configuration, so that no external JAAS
+    // config file is consulted. The requested entry name is ignored.
+    val jaasConf = new Configuration {
+      override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] =
+        Array(new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule",
+          AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+          options.asJava))
+    }
+
+    // No callback handler is supplied: with doNotPrompt=true and a keytab the
+    // login module has nothing to prompt for.
+    val loginContext = new LoginContext("kudu-spark", new Subject(), null, jaasConf)
+    loginContext.login()
+    loginContext.getSubject
+  }
+}
+
private object KuduConnection {
private[kudu] val syncCache = new mutable.HashMap[String, KuduClient]()
private[kudu] val asyncCache = new mutable.HashMap[String, AsyncKuduClient]()
http://git-wip-us.apache.org/repos/asf/kudu/blob/a877566e/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
index f46d290..2748f9a 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
@@ -77,7 +77,7 @@ trait TestContext extends BeforeAndAfterAll { self: Suite =>
kuduClient = new KuduClientBuilder(miniCluster.getMasterAddresses).build()
assert(miniCluster.waitForTabletServers(1))
- kuduContext = new KuduContext(miniCluster.getMasterAddresses)
+ kuduContext = new KuduContext(miniCluster.getMasterAddresses, sc)
val tableOptions = new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
.setNumReplicas(1)