You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2015/04/13 16:49:52 UTC
spark git commit: [SPARK-6207] [YARN] [SQL] Adds delegation tokens
for metastore to conf.
Repository: spark
Updated Branches:
refs/heads/master b29663eee -> 77620be76
[SPARK-6207] [YARN] [SQL] Adds delegation tokens for metastore to conf.
Adds hive2-metastore delegation token to conf when running in secure mode.
Without this change, running on YARN in cluster mode fails with a
GSS exception.
This is a rough patch that adds a dependency to spark/yarn on hive-exec.
I'm looking for suggestions on how to make this patch better.
This contribution is my original work and that I licenses the work to the
Apache Spark project under the project's open source licenses.
Author: Doug Balog <doug.balogtarget.com>
Author: Doug Balog <do...@target.com>
Closes #5031 from dougb/SPARK-6207 and squashes the following commits:
3e9ac16 [Doug Balog] [SPARK-6207] Fixes minor code spacing issues.
e260765 [Doug Balog] [SPARK-6207] Second pass at adding Hive delegation token to conf. - Use reflection instead of adding dependency on hive. - Tested on Hive 0.13 and Hadoop 2.4.1
1ab1729 [Doug Balog] Merge branch 'master' of git://github.com/apache/spark into SPARK-6207
bf356d2 [Doug Balog] [SPARK-6207] [YARN] [SQL] Adds delegation tokens for metastore to conf. Adds hive2-metastore delagations token to conf when running in securemode. Without this change, runing on YARN in cluster mode fails with a GSS exception.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77620be7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77620be7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77620be7
Branch: refs/heads/master
Commit: 77620be76e82b6cdaae406cd752d3272656f5fe0
Parents: b29663e
Author: Doug Balog <do...@target.com>
Authored: Mon Apr 13 09:49:58 2015 -0500
Committer: Thomas Graves <tg...@apache.org>
Committed: Mon Apr 13 09:49:58 2015 -0500
----------------------------------------------------------------------
.../org/apache/spark/deploy/yarn/Client.scala | 63 ++++++++++++++++++++
1 file changed, 63 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/77620be7/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index c1effd3..1091ff5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -22,17 +22,21 @@ import java.nio.ByteBuffer
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map}
+import scala.reflect.runtime.universe
import scala.util.{Try, Success, Failure}
import com.google.common.base.Objects
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
import org.apache.hadoop.util.StringUtils
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -220,6 +224,7 @@ private[spark] class Client(
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
val nns = getNameNodesToAccess(sparkConf) + dst
obtainTokensForNamenodes(nns, hadoopConf, credentials)
+ obtainTokenForHiveMetastore(hadoopConf, credentials)
val replication = sparkConf.getInt("spark.yarn.submit.file.replication",
fs.getDefaultReplication(dst)).toShort
@@ -937,6 +942,64 @@ object Client extends Logging {
}
/**
+ * Obtains token for the Hive metastore and adds them to the credentials.
+ */
+ private def obtainTokenForHiveMetastore(conf: Configuration, credentials: Credentials) {
+ if (UserGroupInformation.isSecurityEnabled) {
+ val mirror = universe.runtimeMirror(getClass.getClassLoader)
+
+ try {
+ val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
+ val hive = hiveClass.getMethod("get").invoke(null)
+
+ val hiveConf = hiveClass.getMethod("getConf").invoke(hive)
+ val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
+
+ val hiveConfGet = (param:String) => Option(hiveConfClass
+ .getMethod("get", classOf[java.lang.String])
+ .invoke(hiveConf, param))
+
+ val metastore_uri = hiveConfGet("hive.metastore.uris")
+
+ // Check for local metastore
+ if (metastore_uri != None && metastore_uri.get.toString.size > 0) {
+ val metastore_kerberos_principal_conf_var = mirror.classLoader
+ .loadClass("org.apache.hadoop.hive.conf.HiveConf$ConfVars")
+ .getField("METASTORE_KERBEROS_PRINCIPAL").get("varname").toString
+
+ val principal = hiveConfGet(metastore_kerberos_principal_conf_var)
+
+ val username = Option(UserGroupInformation.getCurrentUser().getUserName)
+ if (principal != None && username != None) {
+ val tokenStr = hiveClass.getMethod("getDelegationToken",
+ classOf[java.lang.String], classOf[java.lang.String])
+ .invoke(hive, username.get, principal.get).asInstanceOf[java.lang.String]
+
+ val hive2Token = new Token[DelegationTokenIdentifier]()
+ hive2Token.decodeFromUrlString(tokenStr)
+ credentials.addToken(new Text("hive.server2.delegation.token"),hive2Token)
+ logDebug("Added hive.Server2.delegation.token to conf.")
+ hiveClass.getMethod("closeCurrent").invoke(null)
+ } else {
+ logError("Username or principal == NULL")
+ logError(s"""username=${username.getOrElse("(NULL)")}""")
+ logError(s"""principal=${principal.getOrElse("(NULL)")}""")
+ throw new IllegalArgumentException("username and/or principal is equal to null!")
+ }
+ } else {
+ logDebug("HiveMetaStore configured in localmode")
+ }
+ } catch {
+ case e:java.lang.NoSuchMethodException => { logInfo("Hive Method not found " + e); return }
+ case e:java.lang.ClassNotFoundException => { logInfo("Hive Class not found " + e); return }
+ case e:Exception => { logError("Unexpected Exception " + e)
+ throw new RuntimeException("Unexpected exception", e)
+ }
+ }
+ }
+ }
+
+ /**
* Return whether the two file systems are the same.
*/
private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org