Posted to commits@spark.apache.org by va...@apache.org on 2016/12/07 00:23:40 UTC
[03/18] spark git commit: [SPARK-18662] Move resource managers to separate directory
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
deleted file mode 100644
index c4c07b4..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import java.util.ServiceLoader
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.Credentials
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-
-/**
- * A ConfigurableCredentialManager manages all the registered credential providers and offers
- * APIs for other modules to obtain credentials and their renewal times. By default,
- * [[HDFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] are
- * loaded unless explicitly disabled. Any plugged-in credential provider that wants to be
- * managed by ConfigurableCredentialManager needs to implement the [[ServiceCredentialProvider]]
- * interface and be registered in resources/META-INF/services so ServiceLoader can load it.
- *
- * Each credential provider is also controlled by
- * spark.yarn.security.credentials.{service}.enabled; it will not be loaded if set to false.
- */
-private[yarn] final class ConfigurableCredentialManager(
- sparkConf: SparkConf, hadoopConf: Configuration) extends Logging {
- private val deprecatedProviderEnabledConfig = "spark.yarn.security.tokens.%s.enabled"
- private val providerEnabledConfig = "spark.yarn.security.credentials.%s.enabled"
-
- // Maintain all the registered credential providers
- private val credentialProviders = {
- val providers = ServiceLoader.load(classOf[ServiceCredentialProvider],
- Utils.getContextOrSparkClassLoader).asScala
-
- // Filter out providers for which spark.yarn.security.credentials.{service}.enabled is false.
- providers.filter { p =>
- sparkConf.getOption(providerEnabledConfig.format(p.serviceName))
- .orElse {
- sparkConf.getOption(deprecatedProviderEnabledConfig.format(p.serviceName)).map { c =>
- logWarning(s"${deprecatedProviderEnabledConfig.format(p.serviceName)} is deprecated, " +
- s"please use ${providerEnabledConfig.format(p.serviceName)} instead")
- c
- }
- }.map(_.toBoolean).getOrElse(true)
- }.map { p => (p.serviceName, p) }.toMap
- }
-
- /**
- * Get credential provider for the specified service.
- */
- def getServiceCredentialProvider(service: String): Option[ServiceCredentialProvider] = {
- credentialProviders.get(service)
- }
-
- /**
- * Obtain credentials from all the registered providers.
- * @return the nearest time of next renewal among all the obtained credentials, or
- *         Long.MaxValue if none of the credentials are renewable.
- */
- def obtainCredentials(hadoopConf: Configuration, creds: Credentials): Long = {
- credentialProviders.values.flatMap { provider =>
- if (provider.credentialsRequired(hadoopConf)) {
- provider.obtainCredentials(hadoopConf, sparkConf, creds)
- } else {
- logDebug(s"Service ${provider.serviceName} does not require a token." +
- s" Check your configuration to see if security is disabled or not.")
- None
- }
- }.foldLeft(Long.MaxValue)(math.min)
- }
-
- /**
- * Create an [[AMCredentialRenewer]] instance; the caller is responsible for stopping it
- * when it is no longer used. The AM will use it to renew credentials periodically.
- */
- def credentialRenewer(): AMCredentialRenewer = {
- new AMCredentialRenewer(sparkConf, hadoopConf, this)
- }
-
- /**
- * Create a [[CredentialUpdater]] instance; the caller is responsible for stopping it
- * when it is no longer used. Executors and the driver (client mode) will use it to
- * update credentials periodically.
- */
- def credentialUpdater(): CredentialUpdater = {
- new CredentialUpdater(sparkConf, hadoopConf, this)
- }
-}
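
For context on the enabled-flag handling above: each provider can be switched off per service. A minimal sketch of disabling the Hive and HBase providers through SparkConf, following the spark.yarn.security.credentials.{service}.enabled pattern from the scaladoc (the settings shown are illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // The HDFS provider stays enabled by default; turn off Hive and HBase explicitly.
      .set("spark.yarn.security.credentials.hive.enabled", "false")
      .set("spark.yarn.security.credentials.hbase.enabled", "false")
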
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
deleted file mode 100644
index 5df4fbd..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import java.util.concurrent.{Executors, TimeUnit}
-
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.{ThreadUtils, Utils}
-
-private[spark] class CredentialUpdater(
- sparkConf: SparkConf,
- hadoopConf: Configuration,
- credentialManager: ConfigurableCredentialManager) extends Logging {
-
- @volatile private var lastCredentialsFileSuffix = 0
-
- private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
- private val freshHadoopConf =
- SparkHadoopUtil.get.getConfBypassingFSCache(
- hadoopConf, new Path(credentialsFile).toUri.getScheme)
-
- private val credentialUpdater =
- Executors.newSingleThreadScheduledExecutor(
- ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
-
- // This thread wakes up and picks up new credentials from HDFS, if any.
- private val credentialUpdaterRunnable =
- new Runnable {
- override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
- }
-
- /** Start the credential updater task */
- def start(): Unit = {
- val startTime = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
- val remainingTime = startTime - System.currentTimeMillis()
- if (remainingTime <= 0) {
- credentialUpdater.schedule(credentialUpdaterRunnable, 1, TimeUnit.MINUTES)
- } else {
- logInfo(s"Scheduling credentials refresh from HDFS in $remainingTime millis.")
- credentialUpdater.schedule(credentialUpdaterRunnable, remainingTime, TimeUnit.MILLISECONDS)
- }
- }
-
- private def updateCredentialsIfRequired(): Unit = {
- val timeToNextUpdate = try {
- val credentialsFilePath = new Path(credentialsFile)
- val remoteFs = FileSystem.get(freshHadoopConf)
- SparkHadoopUtil.get.listFilesSorted(
- remoteFs, credentialsFilePath.getParent,
- credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
- .lastOption.map { credentialsStatus =>
- val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
- if (suffix > lastCredentialsFileSuffix) {
- logInfo("Reading new credentials from " + credentialsStatus.getPath)
- val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
- lastCredentialsFileSuffix = suffix
- UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
- logInfo("Credentials updated from credentials file.")
-
- val remainingTime = getTimeOfNextUpdateFromFileName(credentialsStatus.getPath) -
- System.currentTimeMillis()
- if (remainingTime <= 0) TimeUnit.MINUTES.toMillis(1) else remainingTime
- } else {
- // If current credential file is older than expected, sleep 1 hour and check again.
- TimeUnit.HOURS.toMillis(1)
- }
- }.getOrElse {
- // If there is no credential file yet, check again in 1 minute.
- TimeUnit.MINUTES.toMillis(1)
- }
- } catch {
- // Since the file may get deleted while we are reading it, catch the Exception and come
- // back in an hour to try again
- case NonFatal(e) =>
- logWarning("Error while trying to update credentials, will try again in 1 hour", e)
- TimeUnit.HOURS.toMillis(1)
- }
-
- credentialUpdater.schedule(
- credentialUpdaterRunnable, timeToNextUpdate, TimeUnit.MILLISECONDS)
- }
-
- private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = {
- val stream = remoteFs.open(tokenPath)
- try {
- val newCredentials = new Credentials()
- newCredentials.readTokenStorageStream(stream)
- newCredentials
- } finally {
- stream.close()
- }
- }
-
- private def getTimeOfNextUpdateFromFileName(credentialsPath: Path): Long = {
- val name = credentialsPath.getName
- val index = name.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM)
- val slice = name.substring(0, index)
- val last2index = slice.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM)
- name.substring(last2index + 1, index).toLong
- }
-
- def stop(): Unit = {
- credentialUpdater.shutdown()
- }
-
-}
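
A worked sketch of the timestamp parsing in getTimeOfNextUpdateFromFileName above, assuming SPARK_YARN_CREDS_COUNTER_DELIM is "-" and a hypothetical file name of the form <prefix>-<timeOfNextRenewal>-<counter> (both assumptions; the real constants live in SparkHadoopUtil):

    val delim = "-"                            // assumed value of SPARK_YARN_CREDS_COUNTER_DELIM
    val name = "credentials-1481068800000-3"   // hypothetical credentials file name
    val index = name.lastIndexOf(delim)        // delimiter before the counter "3"
    val slice = name.substring(0, index)       // "credentials-1481068800000"
    val last2index = slice.lastIndexOf(delim)  // delimiter before the timestamp
    name.substring(last2index + 1, index).toLong  // 1481068800000: time of the next update
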
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
deleted file mode 100644
index 5571df0..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import scala.reflect.runtime.universe
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.Credentials
-import org.apache.hadoop.security.token.{Token, TokenIdentifier}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
-
-private[security] class HBaseCredentialProvider extends ServiceCredentialProvider with Logging {
-
- override def serviceName: String = "hbase"
-
- override def obtainCredentials(
- hadoopConf: Configuration,
- sparkConf: SparkConf,
- creds: Credentials): Option[Long] = {
- try {
- val mirror = universe.runtimeMirror(getClass.getClassLoader)
- val obtainToken = mirror.classLoader.
- loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
- getMethod("obtainToken", classOf[Configuration])
-
- logDebug("Attempting to fetch HBase security token.")
- val token = obtainToken.invoke(null, hbaseConf(hadoopConf))
- .asInstanceOf[Token[_ <: TokenIdentifier]]
- logInfo(s"Get token from HBase: ${token.toString}")
- creds.addToken(token.getService, token)
- } catch {
- case NonFatal(e) =>
- logDebug(s"Failed to get token from service $serviceName", e)
- }
-
- None
- }
-
- override def credentialsRequired(hadoopConf: Configuration): Boolean = {
- hbaseConf(hadoopConf).get("hbase.security.authentication") == "kerberos"
- }
-
- private def hbaseConf(conf: Configuration): Configuration = {
- try {
- val mirror = universe.runtimeMirror(getClass.getClassLoader)
- val confCreate = mirror.classLoader.
- loadClass("org.apache.hadoop.hbase.HBaseConfiguration").
- getMethod("create", classOf[Configuration])
- confCreate.invoke(null, conf).asInstanceOf[Configuration]
- } catch {
- case NonFatal(e) =>
- logDebug("Fail to invoke HBaseConfiguration", e)
- conf
- }
- }
-}
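
The reflection above keeps HBase off the YARN module's compile-time classpath. With HBase available at compile time, the same token fetch would reduce to roughly the following sketch (assuming the TokenUtil.obtainToken(Configuration) overload that the reflective lookup targets):

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.security.token.TokenUtil

    // Equivalent of hbaseConf() followed by the reflective obtainToken call.
    val conf = HBaseConfiguration.create(hadoopConf)
    val token = TokenUtil.obtainToken(conf)
    creds.addToken(token.getService, token)
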
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
deleted file mode 100644
index 8d06d73..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import java.io.{ByteArrayInputStream, DataInputStream}
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
-import org.apache.hadoop.mapred.Master
-import org.apache.hadoop.security.Credentials
-
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
-
-private[security] class HDFSCredentialProvider extends ServiceCredentialProvider with Logging {
- // Token renewal interval. This value is set on the first call; if it is None, no token
- // renewer was specified, so the token renewal interval cannot be determined.
- private var tokenRenewalInterval: Option[Long] = null
-
- override val serviceName: String = "hdfs"
-
- override def obtainCredentials(
- hadoopConf: Configuration,
- sparkConf: SparkConf,
- creds: Credentials): Option[Long] = {
- // NameNodes to access, used to get tokens from the different FileSystems
- nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
- val dstFs = dst.getFileSystem(hadoopConf)
- logInfo("getting token for namenode: " + dst)
- dstFs.addDelegationTokens(getTokenRenewer(hadoopConf), creds)
- }
-
- // Get the token renewal interval if it is not set. This is done only once.
- if (tokenRenewalInterval == null) {
- tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf)
- }
-
- // Get the time of next renewal.
- tokenRenewalInterval.map { interval =>
- creds.getAllTokens.asScala
- .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
- .map { t =>
- val identifier = new DelegationTokenIdentifier()
- identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
- identifier.getIssueDate + interval
- }.foldLeft(0L)(math.max)
- }
- }
-
- private def getTokenRenewalInterval(
- hadoopConf: Configuration, sparkConf: SparkConf): Option[Long] = {
- // We cannot use the tokens generated with renewer yarn. Trying to renew
- // those will fail with an access control issue. So create new tokens with the logged in
- // user as renewer.
- sparkConf.get(PRINCIPAL).map { renewer =>
- val creds = new Credentials()
- nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
- val dstFs = dst.getFileSystem(hadoopConf)
- dstFs.addDelegationTokens(renewer, creds)
- }
- val t = creds.getAllTokens.asScala
- .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
- .head
- val newExpiration = t.renew(hadoopConf)
- val identifier = new DelegationTokenIdentifier()
- identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
- val interval = newExpiration - identifier.getIssueDate
- logInfo(s"Renewal Interval is $interval")
- interval
- }
- }
-
- private def getTokenRenewer(conf: Configuration): String = {
- val delegTokenRenewer = Master.getMasterPrincipal(conf)
- logDebug("delegation token renewer is: " + delegTokenRenewer)
- if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
- val errorMessage = "Can't get Master Kerberos principal for use as renewer"
- logError(errorMessage)
- throw new SparkException(errorMessage)
- }
-
- delegTokenRenewer
- }
-
- private def nnsToAccess(hadoopConf: Configuration, sparkConf: SparkConf): Set[Path] = {
- sparkConf.get(NAMENODES_TO_ACCESS).map(new Path(_)).toSet +
- sparkConf.get(STAGING_DIR).map(new Path(_))
- .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory)
- }
-}
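
As a usage sketch for nnsToAccess above: tokens are fetched for every configured namenode plus the staging directory's filesystem. Assuming NAMENODES_TO_ACCESS and STAGING_DIR map to the keys spark.yarn.access.namenodes and spark.yarn.stagingDir (an assumption about the key names; see config.scala), granting access to an extra secure cluster would look like:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Hypothetical extra namenode to obtain delegation tokens for.
      .set("spark.yarn.access.namenodes", "hdfs://nn2.example.com:8020")
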
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala
deleted file mode 100644
index 16d8fc3..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import java.lang.reflect.UndeclaredThrowableException
-import java.security.PrivilegedExceptionAction
-
-import scala.reflect.runtime.universe
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-import org.apache.hadoop.security.token.Token
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-
-private[security] class HiveCredentialProvider extends ServiceCredentialProvider with Logging {
-
- override def serviceName: String = "hive"
-
- private def hiveConf(hadoopConf: Configuration): Configuration = {
- try {
- val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
- // the hive configuration class is a subclass of Hadoop Configuration, so can be cast down
- // to a Configuration and used without reflection
- val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
- // using the (Configuration, Class) constructor allows the current configuration to be
- // included in the hive config.
- val ctor = hiveConfClass.getDeclaredConstructor(classOf[Configuration],
- classOf[Object].getClass)
- ctor.newInstance(hadoopConf, hiveConfClass).asInstanceOf[Configuration]
- } catch {
- case NonFatal(e) =>
- logDebug("Fail to create Hive Configuration", e)
- hadoopConf
- }
- }
-
- override def credentialsRequired(hadoopConf: Configuration): Boolean = {
- UserGroupInformation.isSecurityEnabled &&
- hiveConf(hadoopConf).getTrimmed("hive.metastore.uris", "").nonEmpty
- }
-
- override def obtainCredentials(
- hadoopConf: Configuration,
- sparkConf: SparkConf,
- creds: Credentials): Option[Long] = {
- val conf = hiveConf(hadoopConf)
-
- val principalKey = "hive.metastore.kerberos.principal"
- val principal = conf.getTrimmed(principalKey, "")
- require(principal.nonEmpty, s"Hive principal $principalKey undefined")
- val metastoreUri = conf.getTrimmed("hive.metastore.uris", "")
- require(metastoreUri.nonEmpty, "Hive metastore uri undefined")
-
- val currentUser = UserGroupInformation.getCurrentUser()
- logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " +
- s"$principal at $metastoreUri")
-
- val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
- val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
- val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
- val closeCurrent = hiveClass.getMethod("closeCurrent")
-
- try {
- // get all the instance methods before invoking any
- val getDelegationToken = hiveClass.getMethod("getDelegationToken",
- classOf[String], classOf[String])
- val getHive = hiveClass.getMethod("get", hiveConfClass)
-
- doAsRealUser {
- val hive = getHive.invoke(null, conf)
- val tokenStr = getDelegationToken.invoke(hive, currentUser.getUserName(), principal)
- .asInstanceOf[String]
- val hive2Token = new Token[DelegationTokenIdentifier]()
- hive2Token.decodeFromUrlString(tokenStr)
- logInfo(s"Get Token from hive metastore: ${hive2Token.toString}")
- creds.addToken(new Text("hive.server2.delegation.token"), hive2Token)
- }
- } catch {
- case NonFatal(e) =>
- logDebug(s"Fail to get token from service $serviceName", e)
- } finally {
- Utils.tryLogNonFatalError {
- closeCurrent.invoke(null)
- }
- }
-
- None
- }
-
- /**
- * Run some code as the real logged in user (which may differ from the current user, for
- * example, when using proxying).
- */
- private def doAsRealUser[T](fn: => T): T = {
- val currentUser = UserGroupInformation.getCurrentUser()
- val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser)
-
- // For some reason the Scala-generated anonymous class ends up causing an
- // UndeclaredThrowableException, even if you annotate the method with @throws.
- try {
- realUser.doAs(new PrivilegedExceptionAction[T]() {
- override def run(): T = fn
- })
- } catch {
- case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
deleted file mode 100644
index 4e3fcce..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-
-import org.apache.spark.SparkConf
-
-/**
- * A credential provider for a service. Users must implement this if they need to access a
- * secure service from Spark.
- */
-trait ServiceCredentialProvider {
-
- /**
- * Name of the service for which credentials are provided. This name should be unique; Spark
- * internally uses it to differentiate credential providers.
- */
- def serviceName: String
-
- /**
- * Decide whether credentials are required for this service. By default this is based on
- * whether Hadoop security is enabled.
- */
- def credentialsRequired(hadoopConf: Configuration): Boolean = {
- UserGroupInformation.isSecurityEnabled
- }
-
- /**
- * Obtain credentials for this service and get the time of the next renewal.
- * @param hadoopConf Configuration of current Hadoop Compatible system.
- * @param sparkConf Spark configuration.
- * @param creds Credentials to add tokens and security keys to.
- * @return the time of the next renewal if the credentials are renewable and can be renewed,
- *         otherwise None.
- */
- def obtainCredentials(
- hadoopConf: Configuration,
- sparkConf: SparkConf,
- creds: Credentials): Option[Long]
-}
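
A minimal sketch of a pluggable provider built against this trait. The class, its service name, and its token logic are hypothetical; a real implementation would also be listed in resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider so that ServiceLoader can discover it:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.security.Credentials

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

    class MyServiceCredentialProvider extends ServiceCredentialProvider {

      // Unique name; also selects the spark.yarn.security.credentials.myservice.enabled flag.
      override def serviceName: String = "myservice"

      override def obtainCredentials(
          hadoopConf: Configuration,
          sparkConf: SparkConf,
          creds: Credentials): Option[Long] = {
        // Hypothetical: fetch a token for the external service and add it to creds here.
        // Return None because this token is assumed not to be renewable.
        None
      }
    }
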
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala b/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala
deleted file mode 100644
index 6c3556a..0000000
--- a/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.launcher
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
-import scala.util.Properties
-
-/**
- * Exposes methods from the launcher library that are used by the YARN backend.
- */
-private[spark] object YarnCommandBuilderUtils {
-
- def quoteForBatchScript(arg: String): String = {
- CommandBuilderUtils.quoteForBatchScript(arg)
- }
-
- def findJarsDir(sparkHome: String): String = {
- val scalaVer = Properties.versionNumberString
- .split("\\.")
- .take(2)
- .mkString(".")
- CommandBuilderUtils.findJarsDir(sparkHome, scalaVer, true)
- }
-
- /**
- * Adds the perm gen configuration to the list of java options if needed and not yet added.
- *
- * Note that this method adds the option based on the local JVM version; if the node where
- * the container is running has a different Java version, there's a risk that the option will
- * not be added (e.g. if the AM is running Java 8 but the container's node is set up to use
- * Java 7).
- */
- def addPermGenSizeOpt(args: ListBuffer[String]): Unit = {
- CommandBuilderUtils.addPermGenSizeOpt(args.asJava)
- }
-
-}
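
A quick worked example of the scalaVer computation in findJarsDir above:

    // e.g. Properties.versionNumberString == "2.11.8"
    "2.11.8".split("\\.").take(2).mkString(".")  // yields "2.11"
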
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
deleted file mode 100644
index 4ed2852..0000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import java.util.concurrent.atomic.AtomicBoolean
-
-import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
-
-import org.apache.spark.SparkContext
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-
-/**
- * An extension service that can be loaded into a Spark YARN scheduler:
- * a service that can be started and stopped.
- *
- * 1. For implementations to be loadable by `SchedulerExtensionServices`,
- * they must provide an empty constructor.
- * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was
- * never invoked.
- */
-trait SchedulerExtensionService {
-
- /**
- * Start the extension service. This should be a no-op if
- * called more than once.
- * @param binding binding to the spark application and YARN
- */
- def start(binding: SchedulerExtensionServiceBinding): Unit
-
- /**
- * Stop the service
- * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
- * never invoked.
- */
- def stop(): Unit
-}
-
-/**
- * Binding information for a [[SchedulerExtensionService]].
- *
- * The attempt ID will be set if the service is started within a YARN application master;
- * a different attempt ID is issued every time that AM is restarted.
- * When the service binding is instantiated in client mode, there is no attempt ID, as that
- * information is not available.
- * @param sparkContext current spark context
- * @param applicationId YARN application ID
- * @param attemptId YARN attemptID. This will always be unset in client mode, and always set in
- * cluster mode.
- */
-case class SchedulerExtensionServiceBinding(
- sparkContext: SparkContext,
- applicationId: ApplicationId,
- attemptId: Option[ApplicationAttemptId] = None)
-
-/**
- * Container for [[SchedulerExtensionService]] instances.
- *
- * Loads Extension Services from the configuration property
- * `"spark.yarn.services"`, instantiates and starts them.
- * When stopped, it stops all child entries.
- *
- * The order in which child extension services are started and stopped
- * is undefined.
- */
-private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
- with Logging {
- private var serviceOption: Option[String] = None
- private var services: List[SchedulerExtensionService] = Nil
- private val started = new AtomicBoolean(false)
- private var binding: SchedulerExtensionServiceBinding = _
-
- /**
- * The start operation loads the named services, instantiates them, and starts each one
- * with this binding; the entire set of services is then running.
- *
- * @param binding binding to the spark application and YARN
- */
- def start(binding: SchedulerExtensionServiceBinding): Unit = {
- if (started.getAndSet(true)) {
- logWarning("Ignoring re-entrant start operation")
- return
- }
- require(binding.sparkContext != null, "Null context parameter")
- require(binding.applicationId != null, "Null appId parameter")
- this.binding = binding
- val sparkContext = binding.sparkContext
- val appId = binding.applicationId
- val attemptId = binding.attemptId
- logInfo(s"Starting Yarn extension services with app $appId and attemptId $attemptId")
-
- services = sparkContext.conf.get(SCHEDULER_SERVICES).map { sClass =>
- val instance = Utils.classForName(sClass)
- .newInstance()
- .asInstanceOf[SchedulerExtensionService]
- // bind this service
- instance.start(binding)
- logInfo(s"Service $sClass started")
- instance
- }.toList
- }
-
- /**
- * Get the list of services.
- *
- * @return a list of services; Nil until the service is started
- */
- def getServices: List[SchedulerExtensionService] = services
-
- /**
- * Stop the services; idempotent.
- */
- override def stop(): Unit = {
- if (started.getAndSet(false)) {
- logInfo(s"Stopping $this")
- services.foreach { s =>
- Utils.tryLogNonFatalError(s.stop())
- }
- }
- }
-
- override def toString(): String = s"""SchedulerExtensionServices
- |(serviceOption=$serviceOption,
- | services=$services,
- | started=$started)""".stripMargin
-}
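
A minimal sketch of an extension service loadable by this container. The class and its internals are hypothetical; the empty constructor and idempotent stop() follow the contract stated in the trait's scaladoc. It would be enabled by listing the class name under spark.yarn.services (the property read via SCHEDULER_SERVICES above):

    import java.util.concurrent.atomic.AtomicBoolean

    import org.apache.spark.scheduler.cluster.{SchedulerExtensionService, SchedulerExtensionServiceBinding}

    class MyExtensionService extends SchedulerExtensionService {
      private val started = new AtomicBoolean(false)

      override def start(binding: SchedulerExtensionServiceBinding): Unit = {
        // No-op if already started, per the trait contract.
        if (started.compareAndSet(false, true)) {
          // Hypothetical setup work against binding.sparkContext goes here.
        }
      }

      // Idempotent; succeeds even if start() was never invoked.
      override def stop(): Unit = {
        if (started.compareAndSet(true, false)) {
          // Hypothetical teardown work goes here.
        }
      }
    }
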
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
deleted file mode 100644
index 60da356..0000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.hadoop.yarn.api.records.YarnApplicationState
-
-import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
-import org.apache.spark.internal.Logging
-import org.apache.spark.launcher.SparkAppHandle
-import org.apache.spark.scheduler.TaskSchedulerImpl
-
-private[spark] class YarnClientSchedulerBackend(
- scheduler: TaskSchedulerImpl,
- sc: SparkContext)
- extends YarnSchedulerBackend(scheduler, sc)
- with Logging {
-
- private var client: Client = null
- private var monitorThread: MonitorThread = null
-
- /**
- * Create a Yarn client to submit an application to the ResourceManager.
- * This waits until the application is running.
- */
- override def start() {
- val driverHost = conf.get("spark.driver.host")
- val driverPort = conf.get("spark.driver.port")
- val hostport = driverHost + ":" + driverPort
- sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) }
-
- val argsArrayBuf = new ArrayBuffer[String]()
- argsArrayBuf += ("--arg", hostport)
-
- logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
- val args = new ClientArguments(argsArrayBuf.toArray)
- totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf)
- client = new Client(args, conf)
- bindToYarn(client.submitApplication(), None)
-
- // SPARK-8687: Ensure all necessary properties have already been set before
- // we initialize our driver scheduler backend, which serves these properties
- // to the executors
- super.start()
- waitForApplication()
-
- // SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver
- // reads the credentials from HDFS, just like the executors and updates its own credentials
- // cache.
- if (conf.contains("spark.yarn.credentials.file")) {
- YarnSparkHadoopUtil.get.startCredentialUpdater(conf)
- }
- monitorThread = asyncMonitorApplication()
- monitorThread.start()
- }
-
- /**
- * Report the state of the application until it is running.
- * If the application has finished, failed or been killed in the process, throw an exception.
- * This assumes both `client` and `appId` have already been set.
- */
- private def waitForApplication(): Unit = {
- assert(client != null && appId.isDefined, "Application has not been submitted yet!")
- val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true) // blocking
- if (state == YarnApplicationState.FINISHED ||
- state == YarnApplicationState.FAILED ||
- state == YarnApplicationState.KILLED) {
- throw new SparkException("Yarn application has already ended! " +
- "It might have been killed, or the application master may have failed to launch.")
- }
- if (state == YarnApplicationState.RUNNING) {
- logInfo(s"Application ${appId.get} has started running.")
- }
- }
-
- /**
- * We create this class for SPARK-9519. When we interrupt the monitor thread, it is because
- * the SparkContext is being shut down (sc.stop() called by user code). If instead
- * monitorApplication returns, the Yarn application finished before sc.stop() was called, so
- * we should call sc.stop() here, and we do not allow the monitor to be interrupted before
- * the SparkContext stops successfully.
- */
- private class MonitorThread extends Thread {
- private var allowInterrupt = true
-
- override def run() {
- try {
- val (state, _) = client.monitorApplication(appId.get, logApplicationReport = false)
- logError(s"Yarn application has already exited with state $state!")
- allowInterrupt = false
- sc.stop()
- } catch {
- case e: InterruptedException => logInfo("Interrupting monitor thread")
- }
- }
-
- def stopMonitor(): Unit = {
- if (allowInterrupt) {
- this.interrupt()
- }
- }
- }
-
- /**
- * Monitor the application state in a separate thread.
- * If the application has exited for any reason, stop the SparkContext.
- * This assumes both `client` and `appId` have already been set.
- */
- private def asyncMonitorApplication(): MonitorThread = {
- assert(client != null && appId.isDefined, "Application has not been submitted yet!")
- val t = new MonitorThread
- t.setName("Yarn application state monitor")
- t.setDaemon(true)
- t
- }
-
- /**
- * Stop the scheduler. This assumes `start()` has already been called.
- */
- override def stop() {
- assert(client != null, "Attempted to stop this scheduler before starting it!")
- if (monitorThread != null) {
- monitorThread.stopMonitor()
- }
-
- // Report a final state to the launcher if one is connected. This is needed since in client
- // mode this backend doesn't let the app monitor loop run to completion, so it does not report
- // the final state itself.
- //
- // Note: there's not enough information at this point to provide a better final state,
- // so assume the application was successful.
- client.reportLauncherState(SparkAppHandle.State.FINISHED)
-
- super.stop()
- YarnSparkHadoopUtil.get.stopCredentialUpdater()
- client.stop()
- logInfo("Stopped")
- }
-
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala
deleted file mode 100644
index 64cd1bd..0000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
-
-/**
- * Cluster manager for creating the Yarn scheduler and backend
- */
-private[spark] class YarnClusterManager extends ExternalClusterManager {
-
- override def canCreate(masterURL: String): Boolean = {
- masterURL == "yarn"
- }
-
- override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
- sc.deployMode match {
- case "cluster" => new YarnClusterScheduler(sc)
- case "client" => new YarnScheduler(sc)
- case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
- }
- }
-
- override def createSchedulerBackend(sc: SparkContext,
- masterURL: String,
- scheduler: TaskScheduler): SchedulerBackend = {
- sc.deployMode match {
- case "cluster" =>
- new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
- case "client" =>
- new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
- case _ =>
- throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
- }
- }
-
- override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
- scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
- }
-}
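
For reference, this cluster manager is discovered through Java's ServiceLoader: Spark's "yarn" master URL resolves to it because the class is listed in a services file, along the lines of:

    # resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
    org.apache.spark.scheduler.cluster.YarnClusterManager
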
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
deleted file mode 100644
index 96c9151..0000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.spark._
-import org.apache.spark.deploy.yarn.ApplicationMaster
-
-/**
- * This is a simple extension of ClusterScheduler that ensures the appropriate
- * initialization of the ApplicationMaster, etc., is done.
- */
-private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {
-
- logInfo("Created YarnClusterScheduler")
-
- override def postStartHook() {
- ApplicationMaster.sparkContextInitialized(sc)
- super.postStartHook()
- logInfo("YarnClusterScheduler.postStartHook done")
- }
-
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
deleted file mode 100644
index 4f3d5eb..0000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-
-import org.apache.spark.SparkContext
-import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.Utils
-
-private[spark] class YarnClusterSchedulerBackend(
- scheduler: TaskSchedulerImpl,
- sc: SparkContext)
- extends YarnSchedulerBackend(scheduler, sc) {
-
- override def start() {
- val attemptId = ApplicationMaster.getAttemptId
- bindToYarn(attemptId.getApplicationId(), Some(attemptId))
- super.start()
- totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf)
- }
-
- override def getDriverLogUrls: Option[Map[String, String]] = {
- var driverLogs: Option[Map[String, String]] = None
- try {
- val yarnConf = new YarnConfiguration(sc.hadoopConfiguration)
- val containerId = YarnSparkHadoopUtil.get.getContainerId
-
- val httpAddress = System.getenv(Environment.NM_HOST.name()) +
- ":" + System.getenv(Environment.NM_HTTP_PORT.name())
- // lookup appropriate http scheme for container log urls
- val yarnHttpPolicy = yarnConf.get(
- YarnConfiguration.YARN_HTTP_POLICY_KEY,
- YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
- )
- val user = Utils.getCurrentUserName()
- val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
- val baseUrl = s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user"
- logDebug(s"Base URL for logs: $baseUrl")
- driverLogs = Some(Map(
- "stdout" -> s"$baseUrl/stdout?start=-4096",
- "stderr" -> s"$baseUrl/stderr?start=-4096"))
- } catch {
- case e: Exception =>
- logInfo("Error while building AM log links, so AM" +
- " logs link will not appear in application UI", e)
- }
- driverLogs
- }
-}
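
To make the URL construction above concrete: with hypothetical values NM_HOST=nm1.example.com, NM_HTTP_PORT=8042, container ID container_1480982400000_0001_01_000001, user alice and the default HTTP policy, the stdout link would be:

    http://nm1.example.com:8042/node/containerlogs/container_1480982400000_0001_01_000001/alice/stdout?start=-4096
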
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala
deleted file mode 100644
index 0293821..0000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.hadoop.yarn.util.RackResolver
-import org.apache.log4j.{Level, Logger}
-
-import org.apache.spark._
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.Utils
-
-private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
-
- // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
- if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
- Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
- }
-
- // By default, rack is unknown
- override def getRackForHost(hostPort: String): Option[String] = {
- val host = Utils.parseHostPort(hostPort)._1
- Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
deleted file mode 100644
index 2f9ea19..0000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
-
-import org.apache.spark.SparkContext
-import org.apache.spark.internal.Logging
-import org.apache.spark.rpc._
-import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.ui.JettyUtils
-import org.apache.spark.util.{RpcUtils, ThreadUtils}
-
-/**
- * Abstract Yarn scheduler backend that contains common logic
- * between the client and cluster Yarn scheduler backends.
- */
-private[spark] abstract class YarnSchedulerBackend(
- scheduler: TaskSchedulerImpl,
- sc: SparkContext)
- extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
-
- override val minRegisteredRatio =
- if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
- 0.8
- } else {
- super.minRegisteredRatio
- }
-
- protected var totalExpectedExecutors = 0
-
- private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
-
- private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint(
- YarnSchedulerBackend.ENDPOINT_NAME, yarnSchedulerEndpoint)
-
- private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
-
- /** Application ID. */
- protected var appId: Option[ApplicationId] = None
-
- /** Attempt ID. This is unset for client-mode schedulers */
- private var attemptId: Option[ApplicationAttemptId] = None
-
- /** Scheduler extension services. */
- private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
-
- // Flag to specify whether this schedulerBackend should be reset.
- private var shouldResetOnAmRegister = false
-
- /**
- * Bind to YARN. This *must* be done before calling [[start()]].
- *
- * @param appId YARN application ID
- * @param attemptId Optional YARN attempt ID
- */
- protected def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit = {
- this.appId = Some(appId)
- this.attemptId = attemptId
- }
-
- override def start() {
- require(appId.isDefined, "application ID unset")
- val binding = SchedulerExtensionServiceBinding(sc, appId.get, attemptId)
- services.start(binding)
- super.start()
- }
-
- override def stop(): Unit = {
- try {
- // SPARK-12009: Prevent the Yarn allocator from requesting replacements for executors
- // that were stopped by the SchedulerBackend.
- requestTotalExecutors(0, 0, Map.empty)
- super.stop()
- } finally {
- services.stop()
- }
- }
-
- /**
- * Get the attempt ID for this run, if the cluster manager supports multiple
- * attempts. Applications run in client mode will not have attempt IDs.
- * The attempt ID only includes the attempt counter, e.g. "1" or "2".
- *
- * @return The application attempt id, if available.
- */
- override def applicationAttemptId(): Option[String] = {
- attemptId.map(_.getAttemptId.toString)
- }
-
- /**
- * Get an application ID associated with the job.
- * This returns the string value of [[appId]] if set, otherwise
- * the locally-generated ID from the superclass.
- * @return The application ID
- */
- override def applicationId(): String = {
- appId.map(_.toString).getOrElse {
- logWarning("Application ID is not initialized yet.")
- super.applicationId
- }
- }
-
- /**
- * Request executors from the ApplicationMaster by specifying the total number desired.
- * This includes executors already pending or running.
- */
- override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
- yarnSchedulerEndpointRef.ask[Boolean](
- RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
- }
-
- /**
- * Request that the ApplicationMaster kill the specified executors.
- */
- override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
- yarnSchedulerEndpointRef.ask[Boolean](KillExecutors(executorIds))
- }
-
- override def sufficientResourcesRegistered(): Boolean = {
- totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
- }
-
- /**
- * Add filters to the SparkUI.
- */
- private def addWebUIFilter(
- filterName: String,
- filterParams: Map[String, String],
- proxyBase: String): Unit = {
- if (proxyBase != null && proxyBase.nonEmpty) {
- System.setProperty("spark.ui.proxyBase", proxyBase)
- }
-
- val hasFilter =
- filterName != null && filterName.nonEmpty &&
- filterParams != null && filterParams.nonEmpty
- if (hasFilter) {
- logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
- conf.set("spark.ui.filters", filterName)
- filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) }
- scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
- }
- }
-
- override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
- new YarnDriverEndpoint(rpcEnv, properties)
- }
-
- /**
- * Reset the state of the SchedulerBackend to its initial state. This happens when the AM
- * fails and re-registers itself with the driver after a failure. The stale state in the
- * driver should then be cleaned up.
- */
- override protected def reset(): Unit = {
- super.reset()
- sc.executorAllocationManager.foreach(_.reset())
- }
-
- /**
- * Override the DriverEndpoint to add extra logic for the case when an executor is disconnected.
- * This endpoint communicates with the executors and queries the AM for an executor's exit
- * status when the executor is disconnected.
- */
- private class YarnDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
- extends DriverEndpoint(rpcEnv, sparkProperties) {
-
- /**
- * When onDisconnected is received at the driver endpoint, the superclass DriverEndpoint
- * handles it by assuming the Executor was lost for a bad reason and removes the executor
- * immediately.
- *
- * In YARN's case, however, it is crucial to talk to the application master and ask why the
- * executor exited. If, according to the application master, the executor exited for a
- * reason unrelated to the running tasks (e.g., preemption), then we pass that information
- * down to the TaskSetManager so that tasks on the lost executor do not count towards a job
- * failure.
- */
- override def onDisconnected(rpcAddress: RpcAddress): Unit = {
- addressToExecutorId.get(rpcAddress).foreach { executorId =>
- if (disableExecutor(executorId)) {
- yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
- }
- }
- }
- }
-
- /**
- * An [[RpcEndpoint]] that communicates with the ApplicationMaster.
- */
- private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
- extends ThreadSafeRpcEndpoint with Logging {
- private var amEndpoint: Option[RpcEndpointRef] = None
-
- private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver(
- executorId: String,
- executorRpcAddress: RpcAddress): Unit = {
- val removeExecutorMessage = amEndpoint match {
- case Some(am) =>
- val lossReasonRequest = GetExecutorLossReason(executorId)
- am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
- .map { reason => RemoveExecutor(executorId, reason) }(ThreadUtils.sameThread)
- .recover {
- case NonFatal(e) =>
- logWarning(s"Attempted to get executor loss reason" +
- s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
- s" but got no response. Marking as slave lost.", e)
- RemoveExecutor(executorId, SlaveLost())
- }(ThreadUtils.sameThread)
- case None =>
- logWarning("Attempted to check for an executor loss reason" +
- " before the AM has registered!")
- Future.successful(RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
- }
-
- removeExecutorMessage
- .flatMap { message =>
- driverEndpoint.ask[Boolean](message)
- }(ThreadUtils.sameThread)
- .onFailure {
- case NonFatal(e) => logError(
- s"Error requesting driver to remove executor $executorId after disconnection.", e)
- }(ThreadUtils.sameThread)
- }
-
- override def receive: PartialFunction[Any, Unit] = {
- case RegisterClusterManager(am) =>
- logInfo(s"ApplicationMaster registered as $am")
- amEndpoint = Option(am)
- if (!shouldResetOnAmRegister) {
- shouldResetOnAmRegister = true
- } else {
- // An AM was already registered, which potentially means the previous AM failed and
- // a new one registered after the failure. This will only happen in yarn-client mode.
- reset()
- }
-
- case AddWebUIFilter(filterName, filterParams, proxyBase) =>
- addWebUIFilter(filterName, filterParams, proxyBase)
-
- case r @ RemoveExecutor(executorId, reason) =>
- logWarning(reason.toString)
- driverEndpoint.ask[Boolean](r).onFailure {
- case e =>
- logError("Error requesting driver to remove executor" +
- s" $executorId for reason $reason", e)
- }(ThreadUtils.sameThread)
- }
-
- override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case r: RequestExecutors =>
- amEndpoint match {
- case Some(am) =>
- am.ask[Boolean](r).andThen {
- case Success(b) => context.reply(b)
- case Failure(NonFatal(e)) =>
- logError(s"Sending $r to AM was unsuccessful", e)
- context.sendFailure(e)
- }(ThreadUtils.sameThread)
- case None =>
- logWarning("Attempted to request executors before the AM has registered!")
- context.reply(false)
- }
-
- case k: KillExecutors =>
- amEndpoint match {
- case Some(am) =>
- am.ask[Boolean](k).andThen {
- case Success(b) => context.reply(b)
- case Failure(NonFatal(e)) =>
- logError(s"Sending $k to AM was unsuccessful", e)
- context.sendFailure(e)
- }(ThreadUtils.sameThread)
- case None =>
- logWarning("Attempted to kill executors before the AM has registered!")
- context.reply(false)
- }
-
- case RetrieveLastAllocatedExecutorId =>
- context.reply(currentExecutorIdCounter)
- }
-
- override def onDisconnected(remoteAddress: RpcAddress): Unit = {
- if (amEndpoint.exists(_.address == remoteAddress)) {
- logWarning(s"ApplicationMaster has disassociated: $remoteAddress")
- amEndpoint = None
- }
- }
- }
-}
-
-private[spark] object YarnSchedulerBackend {
- val ENDPOINT_NAME = "YarnScheduler"
-}
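
For readers skimming the deleted backend above: the core pattern in
handleExecutorDisconnectedFromDriver is "ask the AM for the loss reason, and fall back to a
generic reason if it does not answer". Below is a minimal, self-contained Scala sketch of
that ask-then-recover flow; it is not Spark's actual code, and the Reason/RemoveExecutor
types and the askAm stub are hypothetical stand-ins for Spark's RPC messages.

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object LossReasonSketch {
  // Hypothetical stand-ins for Spark's ExecutorLossReason / RemoveExecutor messages.
  sealed trait Reason
  case object Preempted extends Reason
  final case class SlaveLost(msg: String) extends Reason
  final case class RemoveExecutor(id: String, reason: Reason)

  // Simulates asking the AM; a real RpcEndpointRef.ask would go over the wire.
  def askAm(executorId: String): Future[Reason] = Future.successful(Preempted)

  // Map a successful answer into a RemoveExecutor message; on any non-fatal
  // failure (e.g. a timeout), degrade to a generic "slave lost" reason instead.
  def removeMessage(executorId: String)(implicit ec: ExecutionContext): Future[RemoveExecutor] =
    askAm(executorId)
      .map(reason => RemoveExecutor(executorId, reason))
      .recover { case NonFatal(_) =>
        RemoveExecutor(executorId, SlaveLost("no response from AM"))
      }

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    println(Await.result(removeMessage("exec-1"), 5.seconds))
  }
}
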
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
----------------------------------------------------------------------
diff --git a/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
deleted file mode 100644
index d0ef5ef..0000000
--- a/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.spark.deploy.yarn.security.TestCredentialProvider
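
The one-line file above is how a credential provider gets registered for ServiceLoader
discovery: the file is named after the fully qualified service interface, and each line
names an implementation class. A generic Scala sketch of the lookup side follows; the
Greeter trait is made up for illustration (the real service type in this code is
ServiceCredentialProvider), and with no registrations on the classpath it simply prints
nothing.

import java.util.ServiceLoader
import scala.collection.JavaConverters._

// Illustrative service interface; implementations need a public no-arg constructor.
trait Greeter { def name: String }

object LoaderSketch {
  def main(args: Array[String]): Unit = {
    // ServiceLoader scans META-INF/services/<fully.qualified.Greeter> files on the
    // classpath and instantiates every implementation class listed in them.
    val greeters = ServiceLoader.load(classOf[Greeter]).asScala
    greeters.foreach(g => println(g.name))
  }
}
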
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/yarn/src/test/resources/log4j.properties b/yarn/src/test/resources/log4j.properties
deleted file mode 100644
index d13454d..0000000
--- a/yarn/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=DEBUG, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from a few verbose libraries.
-log4j.logger.com.sun.jersey=WARN
-log4j.logger.org.apache.hadoop=WARN
-log4j.logger.org.eclipse.jetty=WARN
-log4j.logger.org.mortbay=WARN
-log4j.logger.org.spark_project.jetty=WARN
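
The properties file above routes all test logging to target/unit-tests.log while silencing
a few chatty libraries. For completeness, a small sketch of applying such a file
programmatically, assuming log4j 1.x is on the classpath; the file path here is an
assumption, not something the build configures.

import org.apache.log4j.{Logger, PropertyConfigurator}

object LoggingSketch {
  def main(args: Array[String]): Unit = {
    // Path is an assumption; point it at a log4j 1.x properties file like the one above.
    PropertyConfigurator.configure("src/test/resources/log4j.properties")
    // With the file appender configured above, this line lands in target/unit-tests.log.
    Logger.getLogger(getClass).debug("hello from the test logger")
  }
}
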
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
deleted file mode 100644
index 9c3b18e..0000000
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.io.{File, FileOutputStream, OutputStreamWriter}
-import java.nio.charset.StandardCharsets
-import java.util.Properties
-import java.util.concurrent.TimeUnit
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import com.google.common.io.Files
-import org.apache.commons.lang3.SerializationUtils
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.server.MiniYARNCluster
-import org.scalatest.{BeforeAndAfterAll, Matchers}
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark._
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.launcher._
-import org.apache.spark.util.Utils
-
-abstract class BaseYarnClusterSuite
- extends SparkFunSuite with BeforeAndAfterAll with Matchers with Logging {
-
- // log4j configuration for the YARN containers, so that their output is collected
- // by YARN instead of trying to overwrite unit-tests.log.
- protected val LOG4J_CONF = """
- |log4j.rootCategory=DEBUG, console
- |log4j.appender.console=org.apache.log4j.ConsoleAppender
- |log4j.appender.console.target=System.err
- |log4j.appender.console.layout=org.apache.log4j.PatternLayout
- |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
- |log4j.logger.org.apache.hadoop=WARN
- |log4j.logger.org.eclipse.jetty=WARN
- |log4j.logger.org.mortbay=WARN
- |log4j.logger.org.spark_project.jetty=WARN
- """.stripMargin
-
- private var yarnCluster: MiniYARNCluster = _
- protected var tempDir: File = _
- private var fakeSparkJar: File = _
- protected var hadoopConfDir: File = _
- private var logConfDir: File = _
-
- var oldSystemProperties: Properties = null
-
- def newYarnConfig(): YarnConfiguration
-
- override def beforeAll() {
- super.beforeAll()
- oldSystemProperties = SerializationUtils.clone(System.getProperties)
-
- tempDir = Utils.createTempDir()
- logConfDir = new File(tempDir, "log4j")
- logConfDir.mkdir()
- System.setProperty("SPARK_YARN_MODE", "true")
-
- val logConfFile = new File(logConfDir, "log4j.properties")
- Files.write(LOG4J_CONF, logConfFile, StandardCharsets.UTF_8)
-
- // Disable the disk utilization check to avoid the test hanging when people's disks are
- // getting full.
- val yarnConf = newYarnConfig()
- yarnConf.set("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
- "100.0")
-
- yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
- yarnCluster.init(yarnConf)
- yarnCluster.start()
-
- // There's a race in MiniYARNCluster in which start() may return before the RM has updated
- // its address in the configuration. You can see this in the logs by noticing that when
- // MiniYARNCluster prints the address, it still has port "0" assigned, even though the
- // test sometimes works later anyway:
- //
- // INFO MiniYARNCluster: MiniYARN ResourceManager address: blah:0
- //
- // That log message prints the contents of the RM_ADDRESS config variable. If you check it
- // later on, it looks something like this:
- //
- // INFO YarnClusterSuite: RM address in configuration is blah:42631
- //
- // This hack loops for a bit waiting for the port to change, and fails the test if it hasn't
- // done so in a timely manner (defined to be 10 seconds).
- val config = yarnCluster.getConfig()
- val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10)
- while (config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) == "0") {
- if (System.currentTimeMillis() > deadline) {
- throw new IllegalStateException("Timed out waiting for RM to come up.")
- }
- logDebug("RM address still not set in configuration, waiting...")
- TimeUnit.MILLISECONDS.sleep(100)
- }
-
- logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
-
- fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
- hadoopConfDir = new File(tempDir, Client.LOCALIZED_CONF_DIR)
- assert(hadoopConfDir.mkdir())
- File.createTempFile("token", ".txt", hadoopConfDir)
- }
-
- override def afterAll() {
- try {
- yarnCluster.stop()
- } finally {
- System.setProperties(oldSystemProperties)
- super.afterAll()
- }
- }
-
- protected def runSpark(
- clientMode: Boolean,
- klass: String,
- appArgs: Seq[String] = Nil,
- sparkArgs: Seq[(String, String)] = Nil,
- extraClassPath: Seq[String] = Nil,
- extraJars: Seq[String] = Nil,
- extraConf: Map[String, String] = Map(),
- extraEnv: Map[String, String] = Map()): SparkAppHandle.State = {
- val deployMode = if (clientMode) "client" else "cluster"
- val propsFile = createConfFile(extraClassPath = extraClassPath, extraConf = extraConf)
- val env = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv
-
- val launcher = new SparkLauncher(env.asJava)
- if (klass.endsWith(".py")) {
- launcher.setAppResource(klass)
- } else {
- launcher.setMainClass(klass)
- launcher.setAppResource(fakeSparkJar.getAbsolutePath())
- }
- launcher.setSparkHome(sys.props("spark.test.home"))
- .setMaster("yarn")
- .setDeployMode(deployMode)
- .setConf("spark.executor.instances", "1")
- .setPropertiesFile(propsFile)
- .addAppArgs(appArgs.toArray: _*)
-
- sparkArgs.foreach { case (name, value) =>
- if (value != null) {
- launcher.addSparkArg(name, value)
- } else {
- launcher.addSparkArg(name)
- }
- }
- extraJars.foreach(launcher.addJar)
-
- val handle = launcher.startApplication()
- try {
- eventually(timeout(2 minutes), interval(1 second)) {
- assert(handle.getState().isFinal())
- }
- } finally {
- handle.kill()
- }
-
- handle.getState()
- }
-
- /**
- * This is a workaround for an issue with yarn-cluster mode: the Client class will not
- * report any error when the job process finishes successfully but the job itself fails.
- * So the tests require that something is written to a file once everything is OK, to
- * indicate that the job succeeded.
- */
- protected def checkResult(finalState: SparkAppHandle.State, result: File): Unit = {
- checkResult(finalState, result, "success")
- }
-
- protected def checkResult(
- finalState: SparkAppHandle.State,
- result: File,
- expected: String): Unit = {
- finalState should be (SparkAppHandle.State.FINISHED)
- val resultString = Files.toString(result, StandardCharsets.UTF_8)
- resultString should be (expected)
- }
-
- protected def mainClassName(klass: Class[_]): String = {
- klass.getName().stripSuffix("$")
- }
-
- protected def createConfFile(
- extraClassPath: Seq[String] = Nil,
- extraConf: Map[String, String] = Map()): String = {
- val props = new Properties()
- props.put(SPARK_JARS.key, "local:" + fakeSparkJar.getAbsolutePath())
-
- val testClasspath = new TestClasspathBuilder()
- .buildClassPath(
- logConfDir.getAbsolutePath() +
- File.pathSeparator +
- extraClassPath.mkString(File.pathSeparator))
- .asScala
- .mkString(File.pathSeparator)
-
- props.put("spark.driver.extraClassPath", testClasspath)
- props.put("spark.executor.extraClassPath", testClasspath)
-
- // SPARK-4267: make sure java options are propagated correctly.
- props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two three\"")
- props.setProperty("spark.executor.extraJavaOptions", "-Dfoo=\"one two three\"")
-
- yarnCluster.getConfig().asScala.foreach { e =>
- props.setProperty("spark.hadoop." + e.getKey(), e.getValue())
- }
- sys.props.foreach { case (k, v) =>
- if (k.startsWith("spark.")) {
- props.setProperty(k, v)
- }
- }
- extraConf.foreach { case (k, v) => props.setProperty(k, v) }
-
- val propsFile = File.createTempFile("spark", ".properties", tempDir)
- val writer = new OutputStreamWriter(new FileOutputStream(propsFile), StandardCharsets.UTF_8)
- props.store(writer, "Spark properties.")
- writer.close()
- propsFile.getAbsolutePath()
- }
-
-}
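
The RM-address wait in beforeAll() above is an instance of a general poll-until-deadline
idiom: keep checking a condition, sleep between attempts, and fail loudly once a deadline
passes. A self-contained sketch of that idiom follows; the names are illustrative, not
Spark API.

import java.util.concurrent.TimeUnit

object PollSketch {
  // Generic form of the RM-address wait loop: re-evaluate the condition, sleeping
  // between attempts, and throw once the deadline has passed.
  def waitUntil(timeoutMs: Long, intervalMs: Long = 100)(condition: => Boolean): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (!condition) {
      if (System.currentTimeMillis() > deadline) {
        throw new IllegalStateException(s"Timed out after $timeoutMs ms.")
      }
      TimeUnit.MILLISECONDS.sleep(intervalMs)
    }
  }

  def main(args: Array[String]): Unit = {
    val start = System.currentTimeMillis()
    waitUntil(timeoutMs = 10000) { System.currentTimeMillis() - start > 300 }
    println("condition became true before the deadline")
  }
}
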
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
deleted file mode 100644
index b696e08..0000000
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.yarn.api.records.LocalResource
-import org.apache.hadoop.yarn.api.records.LocalResourceType
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.mockito.Mockito.when
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.yarn.config._
-
-class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar {
-
- class MockClientDistributedCacheManager extends ClientDistributedCacheManager {
- override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
- LocalResourceVisibility = {
- LocalResourceVisibility.PRIVATE
- }
- }
-
- test("test getFileStatus empty") {
- val distMgr = new ClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val uri = new URI("/tmp/testing")
- when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- val stat = distMgr.getFileStatus(fs, uri, statCache)
- assert(stat.getPath() === null)
- }
-
- test("test getFileStatus cached") {
- val distMgr = new ClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val uri = new URI("/tmp/testing")
- val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner",
- null, new Path("/tmp/testing"))
- when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](uri -> realFileStatus)
- val stat = distMgr.getFileStatus(fs, uri, statCache)
- assert(stat.getPath().toString() === "/tmp/testing")
- }
-
- test("test addResource") {
- val distMgr = new MockClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val conf = new Configuration()
- val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
- val localResources = HashMap[String, LocalResource]()
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
-
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link",
- statCache, false)
- val resource = localResources("link")
- assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
- assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
- assert(resource.getTimestamp() === 0)
- assert(resource.getSize() === 0)
- assert(resource.getType() === LocalResourceType.FILE)
-
- val sparkConf = new SparkConf(false)
- distMgr.updateConfiguration(sparkConf)
- assert(sparkConf.get(CACHED_FILES) === Seq("file:/foo.invalid.com:8080/tmp/testing#link"))
- assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Seq(0L))
- assert(sparkConf.get(CACHED_FILES_SIZES) === Seq(0L))
- assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Seq(LocalResourceVisibility.PRIVATE.name()))
- assert(sparkConf.get(CACHED_FILES_TYPES) === Seq(LocalResourceType.FILE.name()))
-
- // Add another resource and verify that both are present and in the correct order.
- val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
- null, new Path("/tmp/testing2"))
- val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2")
- when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus)
- distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2",
- statCache, false)
- val resource2 = localResources("link2")
- assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE)
- assert(ConverterUtils.getPathFromYarnURL(resource2.getResource()) === destPath2)
- assert(resource2.getTimestamp() === 10)
- assert(resource2.getSize() === 20)
- assert(resource2.getType() === LocalResourceType.FILE)
-
- val sparkConf2 = new SparkConf(false)
- distMgr.updateConfiguration(sparkConf2)
-
- val files = sparkConf2.get(CACHED_FILES)
- val sizes = sparkConf2.get(CACHED_FILES_SIZES)
- val timestamps = sparkConf2.get(CACHED_FILES_TIMESTAMPS)
- val visibilities = sparkConf2.get(CACHED_FILES_VISIBILITIES)
-
- assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
- assert(timestamps(0) === 0)
- assert(sizes(0) === 0)
- assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name())
-
- assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2")
- assert(timestamps(1) === 10)
- assert(sizes(1) === 20)
- assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name())
- }
-
- test("test addResource link null") {
- val distMgr = new MockClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val conf = new Configuration()
- val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
- val localResources = HashMap[String, LocalResource]()
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
-
- intercept[Exception] {
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null,
- statCache, false)
- }
- assert(localResources.get("link") === None)
- assert(localResources.size === 0)
- }
-
- test("test addResource appmaster only") {
- val distMgr = new MockClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val conf = new Configuration()
- val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
- val localResources = HashMap[String, LocalResource]()
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
- null, new Path("/tmp/testing"))
- when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
-
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
- statCache, true)
- val resource = localResources("link")
- assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
- assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
- assert(resource.getTimestamp() === 10)
- assert(resource.getSize() === 20)
- assert(resource.getType() === LocalResourceType.ARCHIVE)
-
- val sparkConf = new SparkConf(false)
- distMgr.updateConfiguration(sparkConf)
- assert(sparkConf.get(CACHED_FILES) === Nil)
- assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Nil)
- assert(sparkConf.get(CACHED_FILES_SIZES) === Nil)
- assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Nil)
- assert(sparkConf.get(CACHED_FILES_TYPES) === Nil)
- }
-
- test("test addResource archive") {
- val distMgr = new MockClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val conf = new Configuration()
- val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
- val localResources = HashMap[String, LocalResource]()
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
- null, new Path("/tmp/testing"))
- when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
-
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
- statCache, false)
- val resource = localResources("link")
- assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
- assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
- assert(resource.getTimestamp() === 10)
- assert(resource.getSize() === 20)
- assert(resource.getType() === LocalResourceType.ARCHIVE)
-
- val sparkConf = new SparkConf(false)
- distMgr.updateConfiguration(sparkConf)
- assert(sparkConf.get(CACHED_FILES) === Seq("file:/foo.invalid.com:8080/tmp/testing#link"))
- assert(sparkConf.get(CACHED_FILES_SIZES) === Seq(20L))
- assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Seq(10L))
- assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Seq(LocalResourceVisibility.PRIVATE.name()))
- assert(sparkConf.get(CACHED_FILES_TYPES) === Seq(LocalResourceType.ARCHIVE.name()))
- }
-
-}
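
The suite above leans on one Mockito idiom throughout: stub fs.getFileStatus for a specific
path and let addResource read from the mock. A minimal standalone sketch of that
when/thenReturn pattern follows; the StatusSource trait is a made-up stand-in for Hadoop's
FileSystem, used only to keep the example dependency-free.

import org.mockito.Mockito.{mock, when}

object MockSketch {
  // Made-up stand-in for Hadoop's FileSystem, just to show the stubbing pattern.
  trait StatusSource { def size(path: String): Long }

  def main(args: Array[String]): Unit = {
    val fs = mock(classOf[StatusSource])
    // Calls with the stubbed argument return the canned value; unstubbed calls
    // fall back to Mockito's default (0L for a Long).
    when(fs.size("/tmp/testing")).thenReturn(42L)
    println(fs.size("/tmp/testing")) // prints 42
    println(fs.size("/tmp/other"))   // prints 0
  }
}
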
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org