You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2016/12/07 00:23:40 UTC

[03/18] spark git commit: [SPARK-18662] Move resource managers to separate directory

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
deleted file mode 100644
index c4c07b4..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import java.util.ServiceLoader
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.Credentials
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-
-/**
- * A ConfigurableCredentialManager to manage all the registered credential providers and offer
- * APIs for other modules to obtain credentials as well as renewal time. By default
- * [[HDFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
- * be loaded in if not explicitly disabled, any plugged-in credential provider wants to be
- * managed by ConfigurableCredentialManager needs to implement [[ServiceCredentialProvider]]
- * interface and put into resources/META-INF/services to be loaded by ServiceLoader.
- *
- * Also each credential provider is controlled by
- * spark.yarn.security.credentials.{service}.enabled, it will not be loaded in if set to false.
- */
-private[yarn] final class ConfigurableCredentialManager(
-    sparkConf: SparkConf, hadoopConf: Configuration) extends Logging {
-  private val deprecatedProviderEnabledConfig = "spark.yarn.security.tokens.%s.enabled"
-  private val providerEnabledConfig = "spark.yarn.security.credentials.%s.enabled"
-
-  // Maintain all the registered credential providers
-  private val credentialProviders = {
-    val providers = ServiceLoader.load(classOf[ServiceCredentialProvider],
-      Utils.getContextOrSparkClassLoader).asScala
-
-    // Filter out credentials in which spark.yarn.security.credentials.{service}.enabled is false.
-    providers.filter { p =>
-      sparkConf.getOption(providerEnabledConfig.format(p.serviceName))
-        .orElse {
-          sparkConf.getOption(deprecatedProviderEnabledConfig.format(p.serviceName)).map { c =>
-            logWarning(s"${deprecatedProviderEnabledConfig.format(p.serviceName)} is deprecated, " +
-              s"using ${providerEnabledConfig.format(p.serviceName)} instead")
-            c
-          }
-        }.map(_.toBoolean).getOrElse(true)
-    }.map { p => (p.serviceName, p) }.toMap
-  }
-
-  /**
-   * Get credential provider for the specified service.
-   */
-  def getServiceCredentialProvider(service: String): Option[ServiceCredentialProvider] = {
-    credentialProviders.get(service)
-  }
-
-  /**
-   * Obtain credentials from all the registered providers.
-   * @return nearest time of next renewal, Long.MaxValue if all the credentials aren't renewable,
-   *         otherwise the nearest renewal time of any credentials will be returned.
-   */
-  def obtainCredentials(hadoopConf: Configuration, creds: Credentials): Long = {
-    credentialProviders.values.flatMap { provider =>
-      if (provider.credentialsRequired(hadoopConf)) {
-        provider.obtainCredentials(hadoopConf, sparkConf, creds)
-      } else {
-        logDebug(s"Service ${provider.serviceName} does not require a token." +
-          s" Check your configuration to see if security is disabled or not.")
-        None
-      }
-    }.foldLeft(Long.MaxValue)(math.min)
-  }
-
-  /**
-   * Create an [[AMCredentialRenewer]] instance, caller should be responsible to stop this
-   * instance when it is not used. AM will use it to renew credentials periodically.
-   */
-  def credentialRenewer(): AMCredentialRenewer = {
-    new AMCredentialRenewer(sparkConf, hadoopConf, this)
-  }
-
-  /**
-   * Create an [[CredentialUpdater]] instance, caller should be resposible to stop this intance
-   * when it is not used. Executors and driver (client mode) will use it to update credentials.
-   * periodically.
-   */
-  def credentialUpdater(): CredentialUpdater = {
-    new CredentialUpdater(sparkConf, hadoopConf, this)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
deleted file mode 100644
index 5df4fbd..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import java.util.concurrent.{Executors, TimeUnit}
-
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.{ThreadUtils, Utils}
-
-private[spark] class CredentialUpdater(
-    sparkConf: SparkConf,
-    hadoopConf: Configuration,
-    credentialManager: ConfigurableCredentialManager) extends Logging {
-
-  @volatile private var lastCredentialsFileSuffix = 0
-
-  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
-  private val freshHadoopConf =
-    SparkHadoopUtil.get.getConfBypassingFSCache(
-      hadoopConf, new Path(credentialsFile).toUri.getScheme)
-
-  private val credentialUpdater =
-    Executors.newSingleThreadScheduledExecutor(
-      ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
-
-  // This thread wakes up and picks up new credentials from HDFS, if any.
-  private val credentialUpdaterRunnable =
-    new Runnable {
-      override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
-    }
-
-  /** Start the credential updater task */
-  def start(): Unit = {
-    val startTime = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
-    val remainingTime = startTime - System.currentTimeMillis()
-    if (remainingTime <= 0) {
-      credentialUpdater.schedule(credentialUpdaterRunnable, 1, TimeUnit.MINUTES)
-    } else {
-      logInfo(s"Scheduling credentials refresh from HDFS in $remainingTime millis.")
-      credentialUpdater.schedule(credentialUpdaterRunnable, remainingTime, TimeUnit.MILLISECONDS)
-    }
-  }
-
-  private def updateCredentialsIfRequired(): Unit = {
-    val timeToNextUpdate = try {
-      val credentialsFilePath = new Path(credentialsFile)
-      val remoteFs = FileSystem.get(freshHadoopConf)
-      SparkHadoopUtil.get.listFilesSorted(
-        remoteFs, credentialsFilePath.getParent,
-        credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
-        .lastOption.map { credentialsStatus =>
-          val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
-          if (suffix > lastCredentialsFileSuffix) {
-            logInfo("Reading new credentials from " + credentialsStatus.getPath)
-            val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
-            lastCredentialsFileSuffix = suffix
-            UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
-            logInfo("Credentials updated from credentials file.")
-
-            val remainingTime = getTimeOfNextUpdateFromFileName(credentialsStatus.getPath)
-              - System.currentTimeMillis()
-            if (remainingTime <= 0) TimeUnit.MINUTES.toMillis(1) else remainingTime
-          } else {
-            // If current credential file is older than expected, sleep 1 hour and check again.
-            TimeUnit.HOURS.toMillis(1)
-          }
-      }.getOrElse {
-        // Wait for 1 minute to check again if there's no credential file currently
-        TimeUnit.MINUTES.toMillis(1)
-      }
-    } catch {
-      // Since the file may get deleted while we are reading it, catch the Exception and come
-      // back in an hour to try again
-      case NonFatal(e) =>
-        logWarning("Error while trying to update credentials, will try again in 1 hour", e)
-        TimeUnit.HOURS.toMillis(1)
-    }
-
-    credentialUpdater.schedule(
-      credentialUpdaterRunnable, timeToNextUpdate, TimeUnit.MILLISECONDS)
-  }
-
-  private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = {
-    val stream = remoteFs.open(tokenPath)
-    try {
-      val newCredentials = new Credentials()
-      newCredentials.readTokenStorageStream(stream)
-      newCredentials
-    } finally {
-      stream.close()
-    }
-  }
-
-  private def getTimeOfNextUpdateFromFileName(credentialsPath: Path): Long = {
-    val name = credentialsPath.getName
-    val index = name.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM)
-    val slice = name.substring(0, index)
-    val last2index = slice.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM)
-    name.substring(last2index + 1, index).toLong
-  }
-
-  def stop(): Unit = {
-    credentialUpdater.shutdown()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
deleted file mode 100644
index 5571df0..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import scala.reflect.runtime.universe
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.Credentials
-import org.apache.hadoop.security.token.{Token, TokenIdentifier}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
-
-private[security] class HBaseCredentialProvider extends ServiceCredentialProvider with Logging {
-
-  override def serviceName: String = "hbase"
-
-  override def obtainCredentials(
-      hadoopConf: Configuration,
-      sparkConf: SparkConf,
-      creds: Credentials): Option[Long] = {
-    try {
-      val mirror = universe.runtimeMirror(getClass.getClassLoader)
-      val obtainToken = mirror.classLoader.
-        loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
-        getMethod("obtainToken", classOf[Configuration])
-
-      logDebug("Attempting to fetch HBase security token.")
-      val token = obtainToken.invoke(null, hbaseConf(hadoopConf))
-        .asInstanceOf[Token[_ <: TokenIdentifier]]
-      logInfo(s"Get token from HBase: ${token.toString}")
-      creds.addToken(token.getService, token)
-    } catch {
-      case NonFatal(e) =>
-        logDebug(s"Failed to get token from service $serviceName", e)
-    }
-
-    None
-  }
-
-  override def credentialsRequired(hadoopConf: Configuration): Boolean = {
-    hbaseConf(hadoopConf).get("hbase.security.authentication") == "kerberos"
-  }
-
-  private def hbaseConf(conf: Configuration): Configuration = {
-    try {
-      val mirror = universe.runtimeMirror(getClass.getClassLoader)
-      val confCreate = mirror.classLoader.
-        loadClass("org.apache.hadoop.hbase.HBaseConfiguration").
-        getMethod("create", classOf[Configuration])
-      confCreate.invoke(null, conf).asInstanceOf[Configuration]
-    } catch {
-      case NonFatal(e) =>
-        logDebug("Fail to invoke HBaseConfiguration", e)
-        conf
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
deleted file mode 100644
index 8d06d73..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import java.io.{ByteArrayInputStream, DataInputStream}
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
-import org.apache.hadoop.mapred.Master
-import org.apache.hadoop.security.Credentials
-
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
-
-private[security] class HDFSCredentialProvider extends ServiceCredentialProvider with Logging {
-  // Token renewal interval, this value will be set in the first call,
-  // if None means no token renewer specified, so cannot get token renewal interval.
-  private var tokenRenewalInterval: Option[Long] = null
-
-  override val serviceName: String = "hdfs"
-
-  override def obtainCredentials(
-      hadoopConf: Configuration,
-      sparkConf: SparkConf,
-      creds: Credentials): Option[Long] = {
-    // NameNode to access, used to get tokens from different FileSystems
-    nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
-      val dstFs = dst.getFileSystem(hadoopConf)
-      logInfo("getting token for namenode: " + dst)
-      dstFs.addDelegationTokens(getTokenRenewer(hadoopConf), creds)
-    }
-
-    // Get the token renewal interval if it is not set. It will only be called once.
-    if (tokenRenewalInterval == null) {
-      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf)
-    }
-
-    // Get the time of next renewal.
-    tokenRenewalInterval.map { interval =>
-      creds.getAllTokens.asScala
-        .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
-        .map { t =>
-          val identifier = new DelegationTokenIdentifier()
-          identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
-          identifier.getIssueDate + interval
-      }.foldLeft(0L)(math.max)
-    }
-  }
-
-  private def getTokenRenewalInterval(
-      hadoopConf: Configuration, sparkConf: SparkConf): Option[Long] = {
-    // We cannot use the tokens generated with renewer yarn. Trying to renew
-    // those will fail with an access control issue. So create new tokens with the logged in
-    // user as renewer.
-    sparkConf.get(PRINCIPAL).map { renewer =>
-      val creds = new Credentials()
-      nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
-        val dstFs = dst.getFileSystem(hadoopConf)
-        dstFs.addDelegationTokens(renewer, creds)
-      }
-      val t = creds.getAllTokens.asScala
-        .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
-        .head
-      val newExpiration = t.renew(hadoopConf)
-      val identifier = new DelegationTokenIdentifier()
-      identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
-      val interval = newExpiration - identifier.getIssueDate
-      logInfo(s"Renewal Interval is $interval")
-      interval
-    }
-  }
-
-  private def getTokenRenewer(conf: Configuration): String = {
-    val delegTokenRenewer = Master.getMasterPrincipal(conf)
-    logDebug("delegation token renewer is: " + delegTokenRenewer)
-    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-      val errorMessage = "Can't get Master Kerberos principal for use as renewer"
-      logError(errorMessage)
-      throw new SparkException(errorMessage)
-    }
-
-    delegTokenRenewer
-  }
-
-  private def nnsToAccess(hadoopConf: Configuration, sparkConf: SparkConf): Set[Path] = {
-    sparkConf.get(NAMENODES_TO_ACCESS).map(new Path(_)).toSet +
-      sparkConf.get(STAGING_DIR).map(new Path(_))
-        .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala
deleted file mode 100644
index 16d8fc3..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import java.lang.reflect.UndeclaredThrowableException
-import java.security.PrivilegedExceptionAction
-
-import scala.reflect.runtime.universe
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-import org.apache.hadoop.security.token.Token
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-
-private[security] class HiveCredentialProvider extends ServiceCredentialProvider with Logging {
-
-  override def serviceName: String = "hive"
-
-  private def hiveConf(hadoopConf: Configuration): Configuration = {
-    try {
-      val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
-      // the hive configuration class is a subclass of Hadoop Configuration, so can be cast down
-      // to a Configuration and used without reflection
-      val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
-      // using the (Configuration, Class) constructor allows the current configuration to be
-      // included in the hive config.
-      val ctor = hiveConfClass.getDeclaredConstructor(classOf[Configuration],
-        classOf[Object].getClass)
-      ctor.newInstance(hadoopConf, hiveConfClass).asInstanceOf[Configuration]
-    } catch {
-      case NonFatal(e) =>
-        logDebug("Fail to create Hive Configuration", e)
-        hadoopConf
-    }
-  }
-
-  override def credentialsRequired(hadoopConf: Configuration): Boolean = {
-    UserGroupInformation.isSecurityEnabled &&
-      hiveConf(hadoopConf).getTrimmed("hive.metastore.uris", "").nonEmpty
-  }
-
-  override def obtainCredentials(
-      hadoopConf: Configuration,
-      sparkConf: SparkConf,
-      creds: Credentials): Option[Long] = {
-    val conf = hiveConf(hadoopConf)
-
-    val principalKey = "hive.metastore.kerberos.principal"
-    val principal = conf.getTrimmed(principalKey, "")
-    require(principal.nonEmpty, s"Hive principal $principalKey undefined")
-    val metastoreUri = conf.getTrimmed("hive.metastore.uris", "")
-    require(metastoreUri.nonEmpty, "Hive metastore uri undefined")
-
-    val currentUser = UserGroupInformation.getCurrentUser()
-    logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " +
-      s"$principal at $metastoreUri")
-
-    val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
-    val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
-    val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
-    val closeCurrent = hiveClass.getMethod("closeCurrent")
-
-    try {
-      // get all the instance methods before invoking any
-      val getDelegationToken = hiveClass.getMethod("getDelegationToken",
-        classOf[String], classOf[String])
-      val getHive = hiveClass.getMethod("get", hiveConfClass)
-
-      doAsRealUser {
-        val hive = getHive.invoke(null, conf)
-        val tokenStr = getDelegationToken.invoke(hive, currentUser.getUserName(), principal)
-          .asInstanceOf[String]
-        val hive2Token = new Token[DelegationTokenIdentifier]()
-        hive2Token.decodeFromUrlString(tokenStr)
-        logInfo(s"Get Token from hive metastore: ${hive2Token.toString}")
-        creds.addToken(new Text("hive.server2.delegation.token"), hive2Token)
-      }
-    } catch {
-      case NonFatal(e) =>
-        logDebug(s"Fail to get token from service $serviceName", e)
-    } finally {
-      Utils.tryLogNonFatalError {
-        closeCurrent.invoke(null)
-      }
-    }
-
-    None
-  }
-
-  /**
-   * Run some code as the real logged in user (which may differ from the current user, for
-   * example, when using proxying).
-   */
-  private def doAsRealUser[T](fn: => T): T = {
-    val currentUser = UserGroupInformation.getCurrentUser()
-    val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser)
-
-   // For some reason the Scala-generated anonymous class ends up causing an
-   // UndeclaredThrowableException, even if you annotate the method with @throws.
-   try {
-      realUser.doAs(new PrivilegedExceptionAction[T]() {
-        override def run(): T = fn
-      })
-    } catch {
-      case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
deleted file mode 100644
index 4e3fcce..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-
-import org.apache.spark.SparkConf
-
-/**
- * A credential provider for a service. User must implement this if they need to access a
- * secure service from Spark.
- */
-trait ServiceCredentialProvider {
-
-  /**
-   * Name of the service to provide credentials. This name should unique, Spark internally will
-   * use this name to differentiate credential provider.
-   */
-  def serviceName: String
-
-  /**
-   * To decide whether credential is required for this service. By default it based on whether
-   * Hadoop security is enabled.
-   */
-  def credentialsRequired(hadoopConf: Configuration): Boolean = {
-    UserGroupInformation.isSecurityEnabled
-  }
-
-  /**
-   * Obtain credentials for this service and get the time of the next renewal.
-   * @param hadoopConf Configuration of current Hadoop Compatible system.
-   * @param sparkConf Spark configuration.
-   * @param creds Credentials to add tokens and security keys to.
-   * @return If this Credential is renewable and can be renewed, return the time of the next
-   *         renewal, otherwise None should be returned.
-   */
-  def obtainCredentials(
-      hadoopConf: Configuration,
-      sparkConf: SparkConf,
-      creds: Credentials): Option[Long]
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala b/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala
deleted file mode 100644
index 6c3556a..0000000
--- a/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.launcher
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
-import scala.util.Properties
-
-/**
- * Exposes methods from the launcher library that are used by the YARN backend.
- */
-private[spark] object YarnCommandBuilderUtils {
-
-  def quoteForBatchScript(arg: String): String = {
-    CommandBuilderUtils.quoteForBatchScript(arg)
-  }
-
-  def findJarsDir(sparkHome: String): String = {
-    val scalaVer = Properties.versionNumberString
-      .split("\\.")
-      .take(2)
-      .mkString(".")
-    CommandBuilderUtils.findJarsDir(sparkHome, scalaVer, true)
-  }
-
-  /**
-   * Adds the perm gen configuration to the list of java options if needed and not yet added.
-   *
-   * Note that this method adds the option based on the local JVM version; if the node where
-   * the container is running has a different Java version, there's a risk that the option will
-   * not be added (e.g. if the AM is running Java 8 but the container's node is set up to use
-   * Java 7).
-   */
-  def addPermGenSizeOpt(args: ListBuffer[String]): Unit = {
-    CommandBuilderUtils.addPermGenSizeOpt(args.asJava)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
deleted file mode 100644
index 4ed2852..0000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import java.util.concurrent.atomic.AtomicBoolean
-
-import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
-
-import org.apache.spark.SparkContext
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-
-/**
- * An extension service that can be loaded into a Spark YARN scheduler.
- * A Service that can be started and stopped.
- *
- * 1. For implementations to be loadable by `SchedulerExtensionServices`,
- * they must provide an empty constructor.
- * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was
- * never invoked.
- */
-trait SchedulerExtensionService {
-
-  /**
-   * Start the extension service. This should be a no-op if
-   * called more than once.
-   * @param binding binding to the spark application and YARN
-   */
-  def start(binding: SchedulerExtensionServiceBinding): Unit
-
-  /**
-   * Stop the service
-   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
-   * never invoked.
-   */
-  def stop(): Unit
-}
-
-/**
- * Binding information for a [[SchedulerExtensionService]].
- *
- * The attempt ID will be set if the service is started within a YARN application master;
- * there is then a different attempt ID for every time that AM is restarted.
- * When the service binding is instantiated in client mode, there's no attempt ID, as it lacks
- * this information.
- * @param sparkContext current spark context
- * @param applicationId YARN application ID
- * @param attemptId YARN attemptID. This will always be unset in client mode, and always set in
- *                  cluster mode.
- */
-case class SchedulerExtensionServiceBinding(
-    sparkContext: SparkContext,
-    applicationId: ApplicationId,
-    attemptId: Option[ApplicationAttemptId] = None)
-
-/**
- * Container for [[SchedulerExtensionService]] instances.
- *
- * Loads Extension Services from the configuration property
- * `"spark.yarn.services"`, instantiates and starts them.
- * When stopped, it stops all child entries.
- *
- * The order in which child extension services are started and stopped
- * is undefined.
- */
-private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
-    with Logging {
-  private var serviceOption: Option[String] = None
-  private var services: List[SchedulerExtensionService] = Nil
-  private val started = new AtomicBoolean(false)
-  private var binding: SchedulerExtensionServiceBinding = _
-
-  /**
-   * Binding operation will load the named services and call bind on them too; the
-   * entire set of services are then ready for `init()` and `start()` calls.
-   *
-   * @param binding binding to the spark application and YARN
-   */
-  def start(binding: SchedulerExtensionServiceBinding): Unit = {
-    if (started.getAndSet(true)) {
-      logWarning("Ignoring re-entrant start operation")
-      return
-    }
-    require(binding.sparkContext != null, "Null context parameter")
-    require(binding.applicationId != null, "Null appId parameter")
-    this.binding = binding
-    val sparkContext = binding.sparkContext
-    val appId = binding.applicationId
-    val attemptId = binding.attemptId
-    logInfo(s"Starting Yarn extension services with app $appId and attemptId $attemptId")
-
-    services = sparkContext.conf.get(SCHEDULER_SERVICES).map { sClass =>
-      val instance = Utils.classForName(sClass)
-        .newInstance()
-        .asInstanceOf[SchedulerExtensionService]
-      // bind this service
-      instance.start(binding)
-      logInfo(s"Service $sClass started")
-      instance
-    }.toList
-  }
-
-  /**
-   * Get the list of services.
-   *
-   * @return a list of services; Nil until the service is started
-   */
-  def getServices: List[SchedulerExtensionService] = services
-
-  /**
-   * Stop the services; idempotent.
-   *
-   */
-  override def stop(): Unit = {
-    if (started.getAndSet(false)) {
-      logInfo(s"Stopping $this")
-      services.foreach { s =>
-        Utils.tryLogNonFatalError(s.stop())
-      }
-    }
-  }
-
-  override def toString(): String = s"""SchedulerExtensionServices
-    |(serviceOption=$serviceOption,
-    | services=$services,
-    | started=$started)""".stripMargin
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
deleted file mode 100644
index 60da356..0000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.hadoop.yarn.api.records.YarnApplicationState
-
-import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
-import org.apache.spark.internal.Logging
-import org.apache.spark.launcher.SparkAppHandle
-import org.apache.spark.scheduler.TaskSchedulerImpl
-
-private[spark] class YarnClientSchedulerBackend(
-    scheduler: TaskSchedulerImpl,
-    sc: SparkContext)
-  extends YarnSchedulerBackend(scheduler, sc)
-  with Logging {
-
-  private var client: Client = null
-  private var monitorThread: MonitorThread = null
-
-  /**
-   * Create a Yarn client to submit an application to the ResourceManager.
-   * This waits until the application is running.
-   */
-  override def start() {
-    val driverHost = conf.get("spark.driver.host")
-    val driverPort = conf.get("spark.driver.port")
-    val hostport = driverHost + ":" + driverPort
-    sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) }
-
-    val argsArrayBuf = new ArrayBuffer[String]()
-    argsArrayBuf += ("--arg", hostport)
-
-    logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
-    val args = new ClientArguments(argsArrayBuf.toArray)
-    totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf)
-    client = new Client(args, conf)
-    bindToYarn(client.submitApplication(), None)
-
-    // SPARK-8687: Ensure all necessary properties have already been set before
-    // we initialize our driver scheduler backend, which serves these properties
-    // to the executors
-    super.start()
-    waitForApplication()
-
-    // SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver
-    // reads the credentials from HDFS, just like the executors and updates its own credentials
-    // cache.
-    if (conf.contains("spark.yarn.credentials.file")) {
-      YarnSparkHadoopUtil.get.startCredentialUpdater(conf)
-    }
-    monitorThread = asyncMonitorApplication()
-    monitorThread.start()
-  }
-
-  /**
-   * Report the state of the application until it is running.
-   * If the application has finished, failed or been killed in the process, throw an exception.
-   * This assumes both `client` and `appId` have already been set.
-   */
-  private def waitForApplication(): Unit = {
-    assert(client != null && appId.isDefined, "Application has not been submitted yet!")
-    val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true) // blocking
-    if (state == YarnApplicationState.FINISHED ||
-      state == YarnApplicationState.FAILED ||
-      state == YarnApplicationState.KILLED) {
-      throw new SparkException("Yarn application has already ended! " +
-        "It might have been killed or unable to launch application master.")
-    }
-    if (state == YarnApplicationState.RUNNING) {
-      logInfo(s"Application ${appId.get} has started running.")
-    }
-  }
-
-  /**
-   * We create this class for SPARK-9519. Basically when we interrupt the monitor thread it's
-   * because the SparkContext is being shut down(sc.stop() called by user code), but if
-   * monitorApplication return, it means the Yarn application finished before sc.stop() was called,
-   * which means we should call sc.stop() here, and we don't allow the monitor to be interrupted
-   * before SparkContext stops successfully.
-   */
-  private class MonitorThread extends Thread {
-    private var allowInterrupt = true
-
-    override def run() {
-      try {
-        val (state, _) = client.monitorApplication(appId.get, logApplicationReport = false)
-        logError(s"Yarn application has already exited with state $state!")
-        allowInterrupt = false
-        sc.stop()
-      } catch {
-        case e: InterruptedException => logInfo("Interrupting monitor thread")
-      }
-    }
-
-    def stopMonitor(): Unit = {
-      if (allowInterrupt) {
-        this.interrupt()
-      }
-    }
-  }
-
-  /**
-   * Monitor the application state in a separate thread.
-   * If the application has exited for any reason, stop the SparkContext.
-   * This assumes both `client` and `appId` have already been set.
-   */
-  private def asyncMonitorApplication(): MonitorThread = {
-    assert(client != null && appId.isDefined, "Application has not been submitted yet!")
-    val t = new MonitorThread
-    t.setName("Yarn application state monitor")
-    t.setDaemon(true)
-    t
-  }
-
-  /**
-   * Stop the scheduler. This assumes `start()` has already been called.
-   */
-  override def stop() {
-    assert(client != null, "Attempted to stop this scheduler before starting it!")
-    if (monitorThread != null) {
-      monitorThread.stopMonitor()
-    }
-
-    // Report a final state to the launcher if one is connected. This is needed since in client
-    // mode this backend doesn't let the app monitor loop run to completion, so it does not report
-    // the final state itself.
-    //
-    // Note: there's not enough information at this point to provide a better final state,
-    // so assume the application was successful.
-    client.reportLauncherState(SparkAppHandle.State.FINISHED)
-
-    super.stop()
-    YarnSparkHadoopUtil.get.stopCredentialUpdater()
-    client.stop()
-    logInfo("Stopped")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala
deleted file mode 100644
index 64cd1bd..0000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
-
-/**
- * Cluster Manager for creation of Yarn scheduler and backend
- */
-private[spark] class YarnClusterManager extends ExternalClusterManager {
-
-  override def canCreate(masterURL: String): Boolean = {
-    masterURL == "yarn"
-  }
-
-  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
-    sc.deployMode match {
-      case "cluster" => new YarnClusterScheduler(sc)
-      case "client" => new YarnScheduler(sc)
-      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
-    }
-  }
-
-  override def createSchedulerBackend(sc: SparkContext,
-      masterURL: String,
-      scheduler: TaskScheduler): SchedulerBackend = {
-    sc.deployMode match {
-      case "cluster" =>
-        new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
-      case "client" =>
-        new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
-      case  _ =>
-        throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
-    }
-  }
-
-  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
-    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
deleted file mode 100644
index 96c9151..0000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.spark._
-import org.apache.spark.deploy.yarn.ApplicationMaster
-
-/**
- * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
- * ApplicationMaster, etc is done
- */
-private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {
-
-  logInfo("Created YarnClusterScheduler")
-
-  override def postStartHook() {
-    ApplicationMaster.sparkContextInitialized(sc)
-    super.postStartHook()
-    logInfo("YarnClusterScheduler.postStartHook done")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
deleted file mode 100644
index 4f3d5eb..0000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-
-import org.apache.spark.SparkContext
-import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.Utils
-
-private[spark] class YarnClusterSchedulerBackend(
-    scheduler: TaskSchedulerImpl,
-    sc: SparkContext)
-  extends YarnSchedulerBackend(scheduler, sc) {
-
-  override def start() {
-    val attemptId = ApplicationMaster.getAttemptId
-    bindToYarn(attemptId.getApplicationId(), Some(attemptId))
-    super.start()
-    totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf)
-  }
-
-  override def getDriverLogUrls: Option[Map[String, String]] = {
-    var driverLogs: Option[Map[String, String]] = None
-    try {
-      val yarnConf = new YarnConfiguration(sc.hadoopConfiguration)
-      val containerId = YarnSparkHadoopUtil.get.getContainerId
-
-      val httpAddress = System.getenv(Environment.NM_HOST.name()) +
-        ":" + System.getenv(Environment.NM_HTTP_PORT.name())
-      // lookup appropriate http scheme for container log urls
-      val yarnHttpPolicy = yarnConf.get(
-        YarnConfiguration.YARN_HTTP_POLICY_KEY,
-        YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
-      )
-      val user = Utils.getCurrentUserName()
-      val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
-      val baseUrl = s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user"
-      logDebug(s"Base URL for logs: $baseUrl")
-      driverLogs = Some(Map(
-        "stdout" -> s"$baseUrl/stdout?start=-4096",
-        "stderr" -> s"$baseUrl/stderr?start=-4096"))
-    } catch {
-      case e: Exception =>
-        logInfo("Error while building AM log links, so AM" +
-          " logs link will not appear in application UI", e)
-    }
-    driverLogs
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala
deleted file mode 100644
index 0293821..0000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.hadoop.yarn.util.RackResolver
-import org.apache.log4j.{Level, Logger}
-
-import org.apache.spark._
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.Utils
-
-private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
-
-  // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
-  if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
-    Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
-  }
-
-  // By default, rack is unknown
-  override def getRackForHost(hostPort: String): Option[String] = {
-    val host = Utils.parseHostPort(hostPort)._1
-    Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
deleted file mode 100644
index 2f9ea19..0000000
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
-
-import org.apache.spark.SparkContext
-import org.apache.spark.internal.Logging
-import org.apache.spark.rpc._
-import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.ui.JettyUtils
-import org.apache.spark.util.{RpcUtils, ThreadUtils}
-
-/**
- * Abstract Yarn scheduler backend that contains common logic
- * between the client and cluster Yarn scheduler backends.
- */
-private[spark] abstract class YarnSchedulerBackend(
-    scheduler: TaskSchedulerImpl,
-    sc: SparkContext)
-  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
-
-  override val minRegisteredRatio =
-    if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
-      0.8
-    } else {
-      super.minRegisteredRatio
-    }
-
-  protected var totalExpectedExecutors = 0
-
-  private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
-
-  private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint(
-    YarnSchedulerBackend.ENDPOINT_NAME, yarnSchedulerEndpoint)
-
-  private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
-
-  /** Application ID. */
-  protected var appId: Option[ApplicationId] = None
-
-  /** Attempt ID. This is unset for client-mode schedulers */
-  private var attemptId: Option[ApplicationAttemptId] = None
-
-  /** Scheduler extension services. */
-  private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
-
-  // Flag to specify whether this schedulerBackend should be reset.
-  private var shouldResetOnAmRegister = false
-
-  /**
-   * Bind to YARN. This *must* be done before calling [[start()]].
-   *
-   * @param appId YARN application ID
-   * @param attemptId Optional YARN attempt ID
-   */
-  protected def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit = {
-    this.appId = Some(appId)
-    this.attemptId = attemptId
-  }
-
-  override def start() {
-    require(appId.isDefined, "application ID unset")
-    val binding = SchedulerExtensionServiceBinding(sc, appId.get, attemptId)
-    services.start(binding)
-    super.start()
-  }
-
-  override def stop(): Unit = {
-    try {
-      // SPARK-12009: To prevent Yarn allocator from requesting backup for the executors which
-      // was Stopped by SchedulerBackend.
-      requestTotalExecutors(0, 0, Map.empty)
-      super.stop()
-    } finally {
-      services.stop()
-    }
-  }
-
-  /**
-   * Get the attempt ID for this run, if the cluster manager supports multiple
-   * attempts. Applications run in client mode will not have attempt IDs.
-   * This attempt ID only includes attempt counter, like "1", "2".
-   *
-   * @return The application attempt id, if available.
-   */
-  override def applicationAttemptId(): Option[String] = {
-    attemptId.map(_.getAttemptId.toString)
-  }
-
-  /**
-   * Get an application ID associated with the job.
-   * This returns the string value of [[appId]] if set, otherwise
-   * the locally-generated ID from the superclass.
-   * @return The application ID
-   */
-  override def applicationId(): String = {
-    appId.map(_.toString).getOrElse {
-      logWarning("Application ID is not initialized yet.")
-      super.applicationId
-    }
-  }
-
-  /**
-   * Request executors from the ApplicationMaster by specifying the total number desired.
-   * This includes executors already pending or running.
-   */
-  override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
-    yarnSchedulerEndpointRef.ask[Boolean](
-      RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
-  }
-
-  /**
-   * Request that the ApplicationMaster kill the specified executors.
-   */
-  override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
-    yarnSchedulerEndpointRef.ask[Boolean](KillExecutors(executorIds))
-  }
-
-  override def sufficientResourcesRegistered(): Boolean = {
-    totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
-  }
-
-  /**
-   * Add filters to the SparkUI.
-   */
-  private def addWebUIFilter(
-      filterName: String,
-      filterParams: Map[String, String],
-      proxyBase: String): Unit = {
-    if (proxyBase != null && proxyBase.nonEmpty) {
-      System.setProperty("spark.ui.proxyBase", proxyBase)
-    }
-
-    val hasFilter =
-      filterName != null && filterName.nonEmpty &&
-      filterParams != null && filterParams.nonEmpty
-    if (hasFilter) {
-      logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
-      conf.set("spark.ui.filters", filterName)
-      filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) }
-      scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
-    }
-  }
-
-  override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
-    new YarnDriverEndpoint(rpcEnv, properties)
-  }
-
-  /**
-   * Reset the state of SchedulerBackend to the initial state. This is happened when AM is failed
-   * and re-registered itself to driver after a failure. The stale state in driver should be
-   * cleaned.
-   */
-  override protected def reset(): Unit = {
-    super.reset()
-    sc.executorAllocationManager.foreach(_.reset())
-  }
-
-  /**
-   * Override the DriverEndpoint to add extra logic for the case when an executor is disconnected.
-   * This endpoint communicates with the executors and queries the AM for an executor's exit
-   * status when the executor is disconnected.
-   */
-  private class YarnDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
-      extends DriverEndpoint(rpcEnv, sparkProperties) {
-
-    /**
-     * When onDisconnected is received at the driver endpoint, the superclass DriverEndpoint
-     * handles it by assuming the Executor was lost for a bad reason and removes the executor
-     * immediately.
-     *
-     * In YARN's case however it is crucial to talk to the application master and ask why the
-     * executor had exited. If the executor exited for some reason unrelated to the running tasks
-     * (e.g., preemption), according to the application master, then we pass that information down
-     * to the TaskSetManager to inform the TaskSetManager that tasks on that lost executor should
-     * not count towards a job failure.
-     */
-    override def onDisconnected(rpcAddress: RpcAddress): Unit = {
-      addressToExecutorId.get(rpcAddress).foreach { executorId =>
-        if (disableExecutor(executorId)) {
-          yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
-        }
-      }
-    }
-  }
-
-  /**
-   * An [[RpcEndpoint]] that communicates with the ApplicationMaster.
-   */
-  private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
-    extends ThreadSafeRpcEndpoint with Logging {
-    private var amEndpoint: Option[RpcEndpointRef] = None
-
-    private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver(
-        executorId: String,
-        executorRpcAddress: RpcAddress): Unit = {
-      val removeExecutorMessage = amEndpoint match {
-        case Some(am) =>
-          val lossReasonRequest = GetExecutorLossReason(executorId)
-          am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
-            .map { reason => RemoveExecutor(executorId, reason) }(ThreadUtils.sameThread)
-            .recover {
-              case NonFatal(e) =>
-                logWarning(s"Attempted to get executor loss reason" +
-                  s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
-                  s" but got no response. Marking as slave lost.", e)
-                RemoveExecutor(executorId, SlaveLost())
-            }(ThreadUtils.sameThread)
-        case None =>
-          logWarning("Attempted to check for an executor loss reason" +
-            " before the AM has registered!")
-          Future.successful(RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
-      }
-
-      removeExecutorMessage
-        .flatMap { message =>
-          driverEndpoint.ask[Boolean](message)
-        }(ThreadUtils.sameThread)
-        .onFailure {
-          case NonFatal(e) => logError(
-            s"Error requesting driver to remove executor $executorId after disconnection.", e)
-        }(ThreadUtils.sameThread)
-    }
-
-    override def receive: PartialFunction[Any, Unit] = {
-      case RegisterClusterManager(am) =>
-        logInfo(s"ApplicationMaster registered as $am")
-        amEndpoint = Option(am)
-        if (!shouldResetOnAmRegister) {
-          shouldResetOnAmRegister = true
-        } else {
-          // AM is already registered before, this potentially means that AM failed and
-          // a new one registered after the failure. This will only happen in yarn-client mode.
-          reset()
-        }
-
-      case AddWebUIFilter(filterName, filterParams, proxyBase) =>
-        addWebUIFilter(filterName, filterParams, proxyBase)
-
-      case r @ RemoveExecutor(executorId, reason) =>
-        logWarning(reason.toString)
-        driverEndpoint.ask[Boolean](r).onFailure {
-          case e =>
-            logError("Error requesting driver to remove executor" +
-              s" $executorId for reason $reason", e)
-        }(ThreadUtils.sameThread)
-    }
-
-
-    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-      case r: RequestExecutors =>
-        amEndpoint match {
-          case Some(am) =>
-            am.ask[Boolean](r).andThen {
-              case Success(b) => context.reply(b)
-              case Failure(NonFatal(e)) =>
-                logError(s"Sending $r to AM was unsuccessful", e)
-                context.sendFailure(e)
-            }(ThreadUtils.sameThread)
-          case None =>
-            logWarning("Attempted to request executors before the AM has registered!")
-            context.reply(false)
-        }
-
-      case k: KillExecutors =>
-        amEndpoint match {
-          case Some(am) =>
-            am.ask[Boolean](k).andThen {
-              case Success(b) => context.reply(b)
-              case Failure(NonFatal(e)) =>
-                logError(s"Sending $k to AM was unsuccessful", e)
-                context.sendFailure(e)
-            }(ThreadUtils.sameThread)
-          case None =>
-            logWarning("Attempted to kill executors before the AM has registered!")
-            context.reply(false)
-        }
-
-      case RetrieveLastAllocatedExecutorId =>
-        context.reply(currentExecutorIdCounter)
-    }
-
-    override def onDisconnected(remoteAddress: RpcAddress): Unit = {
-      if (amEndpoint.exists(_.address == remoteAddress)) {
-        logWarning(s"ApplicationMaster has disassociated: $remoteAddress")
-        amEndpoint = None
-      }
-    }
-  }
-}
-
-private[spark] object YarnSchedulerBackend {
-  val ENDPOINT_NAME = "YarnScheduler"
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
----------------------------------------------------------------------
diff --git a/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
deleted file mode 100644
index d0ef5ef..0000000
--- a/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.spark.deploy.yarn.security.TestCredentialProvider

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/yarn/src/test/resources/log4j.properties b/yarn/src/test/resources/log4j.properties
deleted file mode 100644
index d13454d..0000000
--- a/yarn/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=DEBUG, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from a few verbose libraries.
-log4j.logger.com.sun.jersey=WARN
-log4j.logger.org.apache.hadoop=WARN
-log4j.logger.org.eclipse.jetty=WARN
-log4j.logger.org.mortbay=WARN
-log4j.logger.org.spark_project.jetty=WARN

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
deleted file mode 100644
index 9c3b18e..0000000
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.io.{File, FileOutputStream, OutputStreamWriter}
-import java.nio.charset.StandardCharsets
-import java.util.Properties
-import java.util.concurrent.TimeUnit
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import com.google.common.io.Files
-import org.apache.commons.lang3.SerializationUtils
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.server.MiniYARNCluster
-import org.scalatest.{BeforeAndAfterAll, Matchers}
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark._
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.launcher._
-import org.apache.spark.util.Utils
-
-abstract class BaseYarnClusterSuite
-  extends SparkFunSuite with BeforeAndAfterAll with Matchers with Logging {
-
-  // log4j configuration for the YARN containers, so that their output is collected
-  // by YARN instead of trying to overwrite unit-tests.log.
-  protected val LOG4J_CONF = """
-    |log4j.rootCategory=DEBUG, console
-    |log4j.appender.console=org.apache.log4j.ConsoleAppender
-    |log4j.appender.console.target=System.err
-    |log4j.appender.console.layout=org.apache.log4j.PatternLayout
-    |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
-    |log4j.logger.org.apache.hadoop=WARN
-    |log4j.logger.org.eclipse.jetty=WARN
-    |log4j.logger.org.mortbay=WARN
-    |log4j.logger.org.spark_project.jetty=WARN
-    """.stripMargin
-
-  private var yarnCluster: MiniYARNCluster = _
-  protected var tempDir: File = _
-  private var fakeSparkJar: File = _
-  protected var hadoopConfDir: File = _
-  private var logConfDir: File = _
-
-  var oldSystemProperties: Properties = null
-
-  def newYarnConfig(): YarnConfiguration
-
-  override def beforeAll() {
-    super.beforeAll()
-    oldSystemProperties = SerializationUtils.clone(System.getProperties)
-
-    tempDir = Utils.createTempDir()
-    logConfDir = new File(tempDir, "log4j")
-    logConfDir.mkdir()
-    System.setProperty("SPARK_YARN_MODE", "true")
-
-    val logConfFile = new File(logConfDir, "log4j.properties")
-    Files.write(LOG4J_CONF, logConfFile, StandardCharsets.UTF_8)
-
-    // Disable the disk utilization check to avoid the test hanging when people's disks are
-    // getting full.
-    val yarnConf = newYarnConfig()
-    yarnConf.set("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
-      "100.0")
-
-    yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
-    yarnCluster.init(yarnConf)
-    yarnCluster.start()
-
-    // There's a race in MiniYARNCluster in which start() may return before the RM has updated
-    // its address in the configuration. You can see this in the logs by noticing that when
-    // MiniYARNCluster prints the address, it still has port "0" assigned, although later the
-    // test works sometimes:
-    //
-    //    INFO MiniYARNCluster: MiniYARN ResourceManager address: blah:0
-    //
-    // That log message prints the contents of the RM_ADDRESS config variable. If you check it
-    // later on, it looks something like this:
-    //
-    //    INFO YarnClusterSuite: RM address in configuration is blah:42631
-    //
-    // This hack loops for a bit waiting for the port to change, and fails the test if it hasn't
-    // done so in a timely manner (defined to be 10 seconds).
-    val config = yarnCluster.getConfig()
-    val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10)
-    while (config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) == "0") {
-      if (System.currentTimeMillis() > deadline) {
-        throw new IllegalStateException("Timed out waiting for RM to come up.")
-      }
-      logDebug("RM address still not set in configuration, waiting...")
-      TimeUnit.MILLISECONDS.sleep(100)
-    }
-
-    logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
-
-    fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
-    hadoopConfDir = new File(tempDir, Client.LOCALIZED_CONF_DIR)
-    assert(hadoopConfDir.mkdir())
-    File.createTempFile("token", ".txt", hadoopConfDir)
-  }
-
-  override def afterAll() {
-    try {
-      yarnCluster.stop()
-    } finally {
-      System.setProperties(oldSystemProperties)
-      super.afterAll()
-    }
-  }
-
-  protected def runSpark(
-      clientMode: Boolean,
-      klass: String,
-      appArgs: Seq[String] = Nil,
-      sparkArgs: Seq[(String, String)] = Nil,
-      extraClassPath: Seq[String] = Nil,
-      extraJars: Seq[String] = Nil,
-      extraConf: Map[String, String] = Map(),
-      extraEnv: Map[String, String] = Map()): SparkAppHandle.State = {
-    val deployMode = if (clientMode) "client" else "cluster"
-    val propsFile = createConfFile(extraClassPath = extraClassPath, extraConf = extraConf)
-    val env = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv
-
-    val launcher = new SparkLauncher(env.asJava)
-    if (klass.endsWith(".py")) {
-      launcher.setAppResource(klass)
-    } else {
-      launcher.setMainClass(klass)
-      launcher.setAppResource(fakeSparkJar.getAbsolutePath())
-    }
-    launcher.setSparkHome(sys.props("spark.test.home"))
-      .setMaster("yarn")
-      .setDeployMode(deployMode)
-      .setConf("spark.executor.instances", "1")
-      .setPropertiesFile(propsFile)
-      .addAppArgs(appArgs.toArray: _*)
-
-    sparkArgs.foreach { case (name, value) =>
-      if (value != null) {
-        launcher.addSparkArg(name, value)
-      } else {
-        launcher.addSparkArg(name)
-      }
-    }
-    extraJars.foreach(launcher.addJar)
-
-    val handle = launcher.startApplication()
-    try {
-      eventually(timeout(2 minutes), interval(1 second)) {
-        assert(handle.getState().isFinal())
-      }
-    } finally {
-      handle.kill()
-    }
-
-    handle.getState()
-  }
-
-  /**
-   * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
-   * any sort of error when the job process finishes successfully, but the job itself fails. So
-   * the tests enforce that something is written to a file after everything is ok to indicate
-   * that the job succeeded.
-   */
-  protected def checkResult(finalState: SparkAppHandle.State, result: File): Unit = {
-    checkResult(finalState, result, "success")
-  }
-
-  protected def checkResult(
-      finalState: SparkAppHandle.State,
-      result: File,
-      expected: String): Unit = {
-    finalState should be (SparkAppHandle.State.FINISHED)
-    val resultString = Files.toString(result, StandardCharsets.UTF_8)
-    resultString should be (expected)
-  }
-
-  protected def mainClassName(klass: Class[_]): String = {
-    klass.getName().stripSuffix("$")
-  }
-
-  protected def createConfFile(
-      extraClassPath: Seq[String] = Nil,
-      extraConf: Map[String, String] = Map()): String = {
-    val props = new Properties()
-    props.put(SPARK_JARS.key, "local:" + fakeSparkJar.getAbsolutePath())
-
-    val testClasspath = new TestClasspathBuilder()
-      .buildClassPath(
-        logConfDir.getAbsolutePath() +
-        File.pathSeparator +
-        extraClassPath.mkString(File.pathSeparator))
-      .asScala
-      .mkString(File.pathSeparator)
-
-    props.put("spark.driver.extraClassPath", testClasspath)
-    props.put("spark.executor.extraClassPath", testClasspath)
-
-    // SPARK-4267: make sure java options are propagated correctly.
-    props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two three\"")
-    props.setProperty("spark.executor.extraJavaOptions", "-Dfoo=\"one two three\"")
-
-    yarnCluster.getConfig().asScala.foreach { e =>
-      props.setProperty("spark.hadoop." + e.getKey(), e.getValue())
-    }
-    sys.props.foreach { case (k, v) =>
-      if (k.startsWith("spark.")) {
-        props.setProperty(k, v)
-      }
-    }
-    extraConf.foreach { case (k, v) => props.setProperty(k, v) }
-
-    val propsFile = File.createTempFile("spark", ".properties", tempDir)
-    val writer = new OutputStreamWriter(new FileOutputStream(propsFile), StandardCharsets.UTF_8)
-    props.store(writer, "Spark properties.")
-    writer.close()
-    propsFile.getAbsolutePath()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
deleted file mode 100644
index b696e08..0000000
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.yarn.api.records.LocalResource
-import org.apache.hadoop.yarn.api.records.LocalResourceType
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.mockito.Mockito.when
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.yarn.config._
-
-class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar {
-
-  class MockClientDistributedCacheManager extends ClientDistributedCacheManager {
-    override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
-        LocalResourceVisibility = {
-      LocalResourceVisibility.PRIVATE
-    }
-  }
-
-  test("test getFileStatus empty") {
-    val distMgr = new ClientDistributedCacheManager()
-    val fs = mock[FileSystem]
-    val uri = new URI("/tmp/testing")
-    when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
-    val stat = distMgr.getFileStatus(fs, uri, statCache)
-    assert(stat.getPath() === null)
-  }
-
-  test("test getFileStatus cached") {
-    val distMgr = new ClientDistributedCacheManager()
-    val fs = mock[FileSystem]
-    val uri = new URI("/tmp/testing")
-    val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner",
-      null, new Path("/tmp/testing"))
-    when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](uri -> realFileStatus)
-    val stat = distMgr.getFileStatus(fs, uri, statCache)
-    assert(stat.getPath().toString() === "/tmp/testing")
-  }
-
-  test("test addResource") {
-    val distMgr = new MockClientDistributedCacheManager()
-    val fs = mock[FileSystem]
-    val conf = new Configuration()
-    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
-    val localResources = HashMap[String, LocalResource]()
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
-    when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
-
-    distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link",
-      statCache, false)
-    val resource = localResources("link")
-    assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
-    assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
-    assert(resource.getTimestamp() === 0)
-    assert(resource.getSize() === 0)
-    assert(resource.getType() === LocalResourceType.FILE)
-
-    val sparkConf = new SparkConf(false)
-    distMgr.updateConfiguration(sparkConf)
-    assert(sparkConf.get(CACHED_FILES) === Seq("file:/foo.invalid.com:8080/tmp/testing#link"))
-    assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Seq(0L))
-    assert(sparkConf.get(CACHED_FILES_SIZES) === Seq(0L))
-    assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Seq(LocalResourceVisibility.PRIVATE.name()))
-    assert(sparkConf.get(CACHED_FILES_TYPES) === Seq(LocalResourceType.FILE.name()))
-
-    // add another one and verify both there and order correct
-    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
-      null, new Path("/tmp/testing2"))
-    val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2")
-    when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus)
-    distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2",
-      statCache, false)
-    val resource2 = localResources("link2")
-    assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE)
-    assert(ConverterUtils.getPathFromYarnURL(resource2.getResource()) === destPath2)
-    assert(resource2.getTimestamp() === 10)
-    assert(resource2.getSize() === 20)
-    assert(resource2.getType() === LocalResourceType.FILE)
-
-    val sparkConf2 = new SparkConf(false)
-    distMgr.updateConfiguration(sparkConf2)
-
-    val files = sparkConf2.get(CACHED_FILES)
-    val sizes = sparkConf2.get(CACHED_FILES_SIZES)
-    val timestamps = sparkConf2.get(CACHED_FILES_TIMESTAMPS)
-    val visibilities = sparkConf2.get(CACHED_FILES_VISIBILITIES)
-
-    assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
-    assert(timestamps(0)  === 0)
-    assert(sizes(0)  === 0)
-    assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name())
-
-    assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2")
-    assert(timestamps(1)  === 10)
-    assert(sizes(1)  === 20)
-    assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name())
-  }
-
-  test("test addResource link null") {
-    val distMgr = new MockClientDistributedCacheManager()
-    val fs = mock[FileSystem]
-    val conf = new Configuration()
-    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
-    val localResources = HashMap[String, LocalResource]()
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
-    when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
-
-    intercept[Exception] {
-      distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null,
-        statCache, false)
-    }
-    assert(localResources.get("link") === None)
-    assert(localResources.size === 0)
-  }
-
-  test("test addResource appmaster only") {
-    val distMgr = new MockClientDistributedCacheManager()
-    val fs = mock[FileSystem]
-    val conf = new Configuration()
-    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
-    val localResources = HashMap[String, LocalResource]()
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
-    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
-      null, new Path("/tmp/testing"))
-    when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
-
-    distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
-      statCache, true)
-    val resource = localResources("link")
-    assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
-    assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
-    assert(resource.getTimestamp() === 10)
-    assert(resource.getSize() === 20)
-    assert(resource.getType() === LocalResourceType.ARCHIVE)
-
-    val sparkConf = new SparkConf(false)
-    distMgr.updateConfiguration(sparkConf)
-    assert(sparkConf.get(CACHED_FILES) === Nil)
-    assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Nil)
-    assert(sparkConf.get(CACHED_FILES_SIZES) === Nil)
-    assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Nil)
-    assert(sparkConf.get(CACHED_FILES_TYPES) === Nil)
-  }
-
-  test("test addResource archive") {
-    val distMgr = new MockClientDistributedCacheManager()
-    val fs = mock[FileSystem]
-    val conf = new Configuration()
-    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
-    val localResources = HashMap[String, LocalResource]()
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
-    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
-      null, new Path("/tmp/testing"))
-    when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
-
-    distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
-      statCache, false)
-    val resource = localResources("link")
-    assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
-    assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
-    assert(resource.getTimestamp() === 10)
-    assert(resource.getSize() === 20)
-    assert(resource.getType() === LocalResourceType.ARCHIVE)
-
-    val sparkConf = new SparkConf(false)
-    distMgr.updateConfiguration(sparkConf)
-    assert(sparkConf.get(CACHED_FILES) === Seq("file:/foo.invalid.com:8080/tmp/testing#link"))
-    assert(sparkConf.get(CACHED_FILES_SIZES) === Seq(20L))
-    assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Seq(10L))
-    assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Seq(LocalResourceVisibility.PRIVATE.name()))
-    assert(sparkConf.get(CACHED_FILES_TYPES) === Seq(LocalResourceType.ARCHIVE.name()))
-  }
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org