Posted to commits@spark.apache.org by va...@apache.org on 2017/08/29 21:42:30 UTC

spark git commit: [SPARK-21728][CORE] Allow SparkSubmit to use Logging.

Repository: spark
Updated Branches:
  refs/heads/master 840ba053b -> d7b1fcf8f


[SPARK-21728][CORE] Allow SparkSubmit to use Logging.

This change initializes logging when SparkSubmit runs, using
a configuration that avoids printing log messages as much as
possible under most setups. It also adds code to restore the
Spark logging system to a state as close as possible to its
initial one, so that the Spark app being run can re-initialize
logging with its own configuration.
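
To make the "initialize quietly, then reset" behavior concrete, here is a
minimal standalone sketch of what it amounts to under the log4j 1.2 binding
(an illustration only, not the actual code; the real logic is in the Logging
trait changes below, and the object name is made up):

  import org.apache.log4j.{LogManager, PropertyConfigurator}

  object LoggingResetSketch {
    def main(args: Array[String]): Unit = {
      // Quiet initialization: configure log4j only if nothing else has, and
      // skip the usual "Using Spark's default log4j profile" message.
      if (!LogManager.getRootLogger.getAllAppenders.hasMoreElements) {
        Option(getClass.getClassLoader
            .getResource("org/apache/spark/log4j-defaults.properties"))
          .foreach(url => PropertyConfigurator.configure(url))
      }

      // ... SparkSubmit does its work here, logging as needed ...

      // Before control passes to the user application, reset log4j so the app
      // can re-initialize logging with its own configuration; this is what
      // Logging.uninitialize() boils down to for this binding.
      LogManager.resetConfiguration()
    }
  }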

With that in place, some duplicate code in SparkSubmit can now
be replaced with the existing methods in the Utils class, which
could not be used before because they initialize logging. As part
of that I also did some minor refactoring, moving into
DependencyUtils the methods that really belong there.
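
For illustration, the consolidated helpers are called like this (a minimal
sketch; the object name and paths are made up, and the package clause is only
there because DependencyUtils is private[deploy]):

  package org.apache.spark.deploy

  import org.apache.spark.{SecurityManager, SparkConf}
  import org.apache.spark.util.Utils

  object DownloadSketch {
    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf(false)
      val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
      val secMgr = new SecurityManager(sparkConf)
      val targetDir = Utils.createTempDir()

      // "file:" and "local:" URIs come back unchanged; anything else is fetched
      // through Utils.doFetchFile and the URI of the local copy is returned.
      val localJars = DependencyUtils.downloadFileList(
        "/local/lib/a.jar,hdfs:///libs/b.jar", targetDir, sparkConf, hadoopConf,
        secMgr)
      println(localJars)
    }
  }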

The change also shuffles some code in SparkHadoopUtil so that
SparkSubmit can create a Hadoop config like the rest of Spark
code, respecting the user's Spark configuration.
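
Concretely, the new static helper copies "spark.hadoop.*" keys (prefix
stripped) and spark.buffer.size into the Hadoop configuration, e.g. (sketch
with made-up values; the package clause is only needed because the helper is
private[spark]):

  package org.apache.spark.deploy

  import org.apache.spark.SparkConf

  object HadoopConfSketch {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf(false)
        .set("spark.hadoop.fs.defaultFS", "hdfs://namenode:8020") // example
        .set("spark.buffer.size", "131072")

      val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
      println(hadoopConf.get("fs.defaultFS"))        // hdfs://namenode:8020
      println(hadoopConf.get("io.file.buffer.size")) // 131072
    }
  }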

The change was verified by running spark-shell, pyspark and
normal applications and checking the logging behavior in each
case, with and without dependency downloads.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #19013 from vanzin/SPARK-21728.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7b1fcf8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7b1fcf8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7b1fcf8

Branch: refs/heads/master
Commit: d7b1fcf8f0a267322af0592b2cb31f1c8970fb16
Parents: 840ba05
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Tue Aug 29 14:42:24 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Aug 29 14:42:24 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/DependencyUtils.scala   | 112 ++++++++----
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  79 ++++----
 .../org/apache/spark/deploy/SparkSubmit.scala   | 179 ++++---------------
 .../spark/deploy/worker/DriverWrapper.scala     |   9 +-
 .../org/apache/spark/internal/Logging.scala     |  61 +++++--
 .../scala/org/apache/spark/util/Utils.scala     |  10 +-
 .../apache/spark/deploy/SparkSubmitSuite.scala  |  60 +++++--
 7 files changed, 263 insertions(+), 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d7b1fcf8/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
index 97f3803..db92a8f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
@@ -18,15 +18,15 @@
 package org.apache.spark.deploy
 
 import java.io.File
-import java.nio.file.Files
 
 import scala.collection.mutable.HashMap
 
-import org.apache.commons.io.FileUtils
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
 
-import org.apache.spark.util.MutableURLClassLoader
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
 private[deploy] object DependencyUtils {
 
@@ -51,41 +51,22 @@ private[deploy] object DependencyUtils {
     SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings, exclusions = exclusions)
   }
 
-  def createTempDir(): File = {
-    val targetDir = Files.createTempDirectory("tmp").toFile
-    // scalastyle:off runtimeaddshutdownhook
-    Runtime.getRuntime.addShutdownHook(new Thread() {
-      override def run(): Unit = {
-        FileUtils.deleteQuietly(targetDir)
-      }
-    })
-    // scalastyle:on runtimeaddshutdownhook
-    targetDir
-  }
-
-  def resolveAndDownloadJars(jars: String, userJar: String): String = {
-    val targetDir = DependencyUtils.createTempDir()
-    val hadoopConf = new Configuration()
-    val sparkProperties = new HashMap[String, String]()
-    val securityProperties = List("spark.ssl.fs.trustStore", "spark.ssl.trustStore",
-      "spark.ssl.fs.trustStorePassword", "spark.ssl.trustStorePassword",
-      "spark.ssl.fs.protocol", "spark.ssl.protocol")
-
-    securityProperties.foreach { pName =>
-      sys.props.get(pName).foreach { pValue =>
-        sparkProperties.put(pName, pValue)
-      }
-    }
-
+  def resolveAndDownloadJars(
+      jars: String,
+      userJar: String,
+      sparkConf: SparkConf,
+      hadoopConf: Configuration,
+      secMgr: SecurityManager): String = {
+    val targetDir = Utils.createTempDir()
     Option(jars)
       .map {
-        SparkSubmit.resolveGlobPaths(_, hadoopConf)
+        resolveGlobPaths(_, hadoopConf)
           .split(",")
           .filterNot(_.contains(userJar.split("/").last))
           .mkString(",")
       }
       .filterNot(_ == "")
-      .map(SparkSubmit.downloadFileList(_, targetDir, sparkProperties, hadoopConf))
+      .map(downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr))
       .orNull
   }
 
@@ -96,4 +77,73 @@ private[deploy] object DependencyUtils {
       }
     }
   }
+
+  /**
+   * Download a list of remote files to temp local files. If the file is local, the original file
+   * will be returned.
+   *
+   * @param fileList A comma separated file list.
+   * @param targetDir A temporary directory into which the files are downloaded.
+   * @param sparkConf Spark configuration.
+   * @param hadoopConf Hadoop configuration.
+   * @param secMgr Spark security manager.
+   * @return A comma separated local files list.
+   */
+  def downloadFileList(
+      fileList: String,
+      targetDir: File,
+      sparkConf: SparkConf,
+      hadoopConf: Configuration,
+      secMgr: SecurityManager): String = {
+    require(fileList != null, "fileList cannot be null.")
+    fileList.split(",")
+      .map(downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr))
+      .mkString(",")
+  }
+
+  /**
+   * Download a file from the remote to a local temporary directory. If the input path points to
+   * a local path, returns it with no operation.
+   *
+   * @param path A file path from where the files will be downloaded.
+   * @param targetDir A temporary directory into which the file is downloaded.
+   * @param sparkConf Spark configuration.
+   * @param hadoopConf Hadoop configuration.
+   * @param secMgr Spark security manager.
+   * @return Path to the local file.
+   */
+  def downloadFile(
+      path: String,
+      targetDir: File,
+      sparkConf: SparkConf,
+      hadoopConf: Configuration,
+      secMgr: SecurityManager): String = {
+    require(path != null, "path cannot be null.")
+    val uri = Utils.resolveURI(path)
+
+    uri.getScheme match {
+      case "file" | "local" => path
+      case _ =>
+        val fname = new Path(uri).getName()
+        val localFile = Utils.doFetchFile(uri.toString(), targetDir, fname, sparkConf, secMgr,
+          hadoopConf)
+        localFile.toURI().toString()
+    }
+  }
+
+  def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
+    require(paths != null, "paths cannot be null.")
+    paths.split(",").map(_.trim).filter(_.nonEmpty).flatMap { path =>
+      val uri = Utils.resolveURI(path)
+      uri.getScheme match {
+        case "local" | "http" | "https" | "ftp" => Array(path)
+        case _ =>
+          val fs = FileSystem.get(uri, hadoopConf)
+          Option(fs.globStatus(new Path(uri))).map { status =>
+            status.filter(_.isFile).map(_.getPath.toUri.toString)
+          }.getOrElse(Array(path))
+      }
+    }.mkString(",")
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d7b1fcf8/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 6d507d8..53775db 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -81,29 +81,7 @@ class SparkHadoopUtil extends Logging {
    * configuration.
    */
   def appendS3AndSparkHadoopConfigurations(conf: SparkConf, hadoopConf: Configuration): Unit = {
-    // Note: this null check is around more than just access to the "conf" object to maintain
-    // the behavior of the old implementation of this code, for backwards compatibility.
-    if (conf != null) {
-      // Explicitly check for S3 environment variables
-      val keyId = System.getenv("AWS_ACCESS_KEY_ID")
-      val accessKey = System.getenv("AWS_SECRET_ACCESS_KEY")
-      if (keyId != null && accessKey != null) {
-        hadoopConf.set("fs.s3.awsAccessKeyId", keyId)
-        hadoopConf.set("fs.s3n.awsAccessKeyId", keyId)
-        hadoopConf.set("fs.s3a.access.key", keyId)
-        hadoopConf.set("fs.s3.awsSecretAccessKey", accessKey)
-        hadoopConf.set("fs.s3n.awsSecretAccessKey", accessKey)
-        hadoopConf.set("fs.s3a.secret.key", accessKey)
-
-        val sessionToken = System.getenv("AWS_SESSION_TOKEN")
-        if (sessionToken != null) {
-          hadoopConf.set("fs.s3a.session.token", sessionToken)
-        }
-      }
-      appendSparkHadoopConfigs(conf, hadoopConf)
-      val bufferSize = conf.get("spark.buffer.size", "65536")
-      hadoopConf.set("io.file.buffer.size", bufferSize)
-    }
+    SparkHadoopUtil.appendS3AndSparkHadoopConfigurations(conf, hadoopConf)
   }
 
   /**
@@ -111,10 +89,7 @@ class SparkHadoopUtil extends Logging {
    * configuration without the spark.hadoop. prefix.
    */
   def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
-    // Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar"
-    for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {
-      hadoopConf.set(key.substring("spark.hadoop.".length), value)
-    }
+    SparkHadoopUtil.appendSparkHadoopConfigs(conf, hadoopConf)
   }
 
   /**
@@ -134,9 +109,7 @@ class SparkHadoopUtil extends Logging {
    * subsystems.
    */
   def newConfiguration(conf: SparkConf): Configuration = {
-    val hadoopConf = new Configuration()
-    appendS3AndSparkHadoopConfigurations(conf, hadoopConf)
-    hadoopConf
+    SparkHadoopUtil.newConfiguration(conf)
   }
 
   /**
@@ -479,4 +452,50 @@ object SparkHadoopUtil {
       hadoop
     }
   }
+
+  /**
+   * Returns a Configuration object with Spark configuration applied on top. Unlike
+   * the instance method, this will always return a Configuration instance, and not a
+   * cluster manager-specific type.
+   */
+  private[spark] def newConfiguration(conf: SparkConf): Configuration = {
+    val hadoopConf = new Configuration()
+    appendS3AndSparkHadoopConfigurations(conf, hadoopConf)
+    hadoopConf
+  }
+
+  private def appendS3AndSparkHadoopConfigurations(
+      conf: SparkConf,
+      hadoopConf: Configuration): Unit = {
+    // Note: this null check is around more than just access to the "conf" object to maintain
+    // the behavior of the old implementation of this code, for backwards compatibility.
+    if (conf != null) {
+      // Explicitly check for S3 environment variables
+      val keyId = System.getenv("AWS_ACCESS_KEY_ID")
+      val accessKey = System.getenv("AWS_SECRET_ACCESS_KEY")
+      if (keyId != null && accessKey != null) {
+        hadoopConf.set("fs.s3.awsAccessKeyId", keyId)
+        hadoopConf.set("fs.s3n.awsAccessKeyId", keyId)
+        hadoopConf.set("fs.s3a.access.key", keyId)
+        hadoopConf.set("fs.s3.awsSecretAccessKey", accessKey)
+        hadoopConf.set("fs.s3n.awsSecretAccessKey", accessKey)
+        hadoopConf.set("fs.s3a.secret.key", accessKey)
+
+        val sessionToken = System.getenv("AWS_SESSION_TOKEN")
+        if (sessionToken != null) {
+          hadoopConf.set("fs.s3a.session.token", sessionToken)
+        }
+      }
+      appendSparkHadoopConfigs(conf, hadoopConf)
+      val bufferSize = conf.get("spark.buffer.size", "65536")
+      hadoopConf.set("io.file.buffer.size", bufferSize)
+    }
+  }
+
+  private def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
+    // Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar"
+    for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {
+      hadoopConf.set(key.substring("spark.hadoop.".length), value)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d7b1fcf8/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 548149a..38604fe 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -20,19 +20,16 @@ package org.apache.spark.deploy
 import java.io._
 import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
 import java.net.URL
-import java.security.{KeyStore, PrivilegedExceptionAction}
-import java.security.cert.X509Certificate
+import java.security.PrivilegedExceptionAction
 import java.text.ParseException
-import javax.net.ssl._
 
 import scala.annotation.tailrec
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 import scala.util.Properties
 
-import com.google.common.io.ByteStreams
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.ivy.Ivy
@@ -69,7 +66,9 @@ private[deploy] object SparkSubmitAction extends Enumeration {
  * This program handles setting up the classpath with relevant Spark dependencies and provides
  * a layer over the different cluster managers and deploy modes that Spark supports.
  */
-object SparkSubmit extends CommandLineUtils {
+object SparkSubmit extends CommandLineUtils with Logging {
+
+  import DependencyUtils._
 
   // Cluster managers
   private val YARN = 1
@@ -113,6 +112,10 @@ object SparkSubmit extends CommandLineUtils {
   // scalastyle:on println
 
   override def main(args: Array[String]): Unit = {
+    // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
+    // be reset before the application starts.
+    val uninitLog = initializeLogIfNecessary(true, silent = true)
+
     val appArgs = new SparkSubmitArguments(args)
     if (appArgs.verbose) {
       // scalastyle:off println
@@ -120,7 +123,7 @@ object SparkSubmit extends CommandLineUtils {
       // scalastyle:on println
     }
     appArgs.action match {
-      case SparkSubmitAction.SUBMIT => submit(appArgs)
+      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
       case SparkSubmitAction.KILL => kill(appArgs)
       case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
     }
@@ -153,7 +156,7 @@ object SparkSubmit extends CommandLineUtils {
    * main class.
    */
   @tailrec
-  private def submit(args: SparkSubmitArguments): Unit = {
+  private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
     val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
 
     def doRunMain(): Unit = {
@@ -185,11 +188,16 @@ object SparkSubmit extends CommandLineUtils {
       }
     }
 
-     // In standalone cluster mode, there are two submission gateways:
-     //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
-     //   (2) The new REST-based gateway introduced in Spark 1.3
-     // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
-     // to use the legacy gateway if the master endpoint turns out to be not a REST server.
+    // Let the main class re-initialize the logging system once it starts.
+    if (uninitLog) {
+      Logging.uninitialize()
+    }
+
+    // In standalone cluster mode, there are two submission gateways:
+    //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
+    //   (2) The new REST-based gateway introduced in Spark 1.3
+    // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
+    // to use the legacy gateway if the master endpoint turns out to be not a REST server.
     if (args.isStandaloneCluster && args.useRest) {
       try {
         // scalastyle:off println
@@ -202,7 +210,7 @@ object SparkSubmit extends CommandLineUtils {
           printWarning(s"Master endpoint ${args.master} was not a REST server. " +
             "Falling back to legacy submission gateway instead.")
           args.useRest = false
-          submit(args)
+          submit(args, false)
       }
     // In all other modes, just run the main class as prepared
     } else {
@@ -328,8 +336,10 @@ object SparkSubmit extends CommandLineUtils {
       }
     }
 
-    val hadoopConf = conf.getOrElse(new HadoopConfiguration())
-    val targetDir = DependencyUtils.createTempDir()
+    val sparkConf = new SparkConf(false)
+    args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) }
+    val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
+    val targetDir = Utils.createTempDir()
 
     // Resolve glob path for different resources.
     args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
@@ -342,14 +352,15 @@ object SparkSubmit extends CommandLineUtils {
     var localJars: String = null
     var localPyFiles: String = null
     if (deployMode == CLIENT) {
+      val secMgr = new SecurityManager(sparkConf)
       localPrimaryResource = Option(args.primaryResource).map {
-        downloadFile(_, targetDir, args.sparkProperties, hadoopConf)
+        downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)
       }.orNull
       localJars = Option(args.jars).map {
-        downloadFileList(_, targetDir, args.sparkProperties, hadoopConf)
+        downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
       }.orNull
       localPyFiles = Option(args.pyFiles).map {
-        downloadFileList(_, targetDir, args.sparkProperties, hadoopConf)
+        downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
       }.orNull
     }
 
@@ -863,136 +874,6 @@ object SparkSubmit extends CommandLineUtils {
     if (merged == "") null else merged
   }
 
-  /**
-   * Download a list of remote files to temp local files. If the file is local, the original file
-   * will be returned.
-   * @param fileList A comma separated file list.
-   * @param targetDir A temporary directory for which downloaded files
-   * @param sparkProperties Spark properties
-   * @return A comma separated local files list.
-   */
-  private[deploy] def downloadFileList(
-      fileList: String,
-      targetDir: File,
-      sparkProperties: Map[String, String],
-      hadoopConf: HadoopConfiguration): String = {
-    require(fileList != null, "fileList cannot be null.")
-    fileList.split(",")
-      .map(downloadFile(_, targetDir, sparkProperties, hadoopConf))
-      .mkString(",")
-  }
-
-  /**
-   * Download a file from the remote to a local temporary directory. If the input path points to
-   * a local path, returns it with no operation.
-   * @param path A file path from where the files will be downloaded.
-   * @param targetDir A temporary directory for which downloaded files
-   * @param sparkProperties Spark properties
-   * @return A comma separated local files list.
-   */
-  private[deploy] def downloadFile(
-      path: String,
-      targetDir: File,
-      sparkProperties: Map[String, String],
-      hadoopConf: HadoopConfiguration): String = {
-    require(path != null, "path cannot be null.")
-    val uri = Utils.resolveURI(path)
-    uri.getScheme match {
-      case "file" | "local" => path
-      case "http" | "https" | "ftp" =>
-        val uc = uri.toURL.openConnection()
-        uc match {
-          case https: HttpsURLConnection =>
-            val trustStore = sparkProperties.get("spark.ssl.fs.trustStore")
-              .orElse(sparkProperties.get("spark.ssl.trustStore"))
-            val trustStorePwd = sparkProperties.get("spark.ssl.fs.trustStorePassword")
-              .orElse(sparkProperties.get("spark.ssl.trustStorePassword"))
-              .map(_.toCharArray)
-              .orNull
-            val protocol = sparkProperties.get("spark.ssl.fs.protocol")
-              .orElse(sparkProperties.get("spark.ssl.protocol"))
-            if (protocol.isEmpty) {
-              printErrorAndExit("spark ssl protocol is required when enabling SSL connection.")
-            }
-
-            val trustStoreManagers = trustStore.map { t =>
-              var input: InputStream = null
-              try {
-                input = new FileInputStream(new File(t))
-                val ks = KeyStore.getInstance(KeyStore.getDefaultType)
-                ks.load(input, trustStorePwd)
-                val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
-                tmf.init(ks)
-                tmf.getTrustManagers
-              } finally {
-                if (input != null) {
-                  input.close()
-                  input = null
-                }
-              }
-            }.getOrElse {
-              Array({
-                new X509TrustManager {
-                  override def getAcceptedIssuers: Array[X509Certificate] = null
-                  override def checkClientTrusted(
-                      x509Certificates: Array[X509Certificate], s: String) {}
-                  override def checkServerTrusted(
-                      x509Certificates: Array[X509Certificate], s: String) {}
-                }: TrustManager
-              })
-            }
-            val sslContext = SSLContext.getInstance(protocol.get)
-            sslContext.init(null, trustStoreManagers, null)
-            https.setSSLSocketFactory(sslContext.getSocketFactory)
-            https.setHostnameVerifier(new HostnameVerifier {
-              override def verify(s: String, sslSession: SSLSession): Boolean = false
-            })
-
-          case _ =>
-        }
-
-        uc.setConnectTimeout(60 * 1000)
-        uc.setReadTimeout(60 * 1000)
-        uc.connect()
-        val in = uc.getInputStream
-        val fileName = new Path(uri).getName
-        val tempFile = new File(targetDir, fileName)
-        val out = new FileOutputStream(tempFile)
-        // scalastyle:off println
-        printStream.println(s"Downloading ${uri.toString} to ${tempFile.getAbsolutePath}.")
-        // scalastyle:on println
-        try {
-          ByteStreams.copy(in, out)
-        } finally {
-          in.close()
-          out.close()
-        }
-        tempFile.toURI.toString
-      case _ =>
-        val fs = FileSystem.get(uri, hadoopConf)
-        val tmpFile = new File(targetDir, new Path(uri).getName)
-        // scalastyle:off println
-        printStream.println(s"Downloading ${uri.toString} to ${tmpFile.getAbsolutePath}.")
-        // scalastyle:on println
-        fs.copyToLocalFile(new Path(uri), new Path(tmpFile.getAbsolutePath))
-        tmpFile.toURI.toString
-    }
-  }
-
-  private[deploy] def resolveGlobPaths(paths: String, hadoopConf: HadoopConfiguration): String = {
-    require(paths != null, "paths cannot be null.")
-    paths.split(",").map(_.trim).filter(_.nonEmpty).flatMap { path =>
-      val uri = Utils.resolveURI(path)
-      uri.getScheme match {
-        case "local" | "http" | "https" | "ftp" => Array(path)
-        case _ =>
-          val fs = FileSystem.get(uri, hadoopConf)
-          Option(fs.globStatus(new Path(uri))).map { status =>
-            status.filter(_.isFile).map(_.getPath.toUri.toString)
-          }.getOrElse(Array(path))
-      }
-    }.mkString(",")
-  }
 }
 
 /** Provides utility functions to be used inside SparkSubmit. */

http://git-wip-us.apache.org/repos/asf/spark/blob/d7b1fcf8/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index cd3e361..c167119 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -22,7 +22,7 @@ import java.io.File
 import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.{DependencyUtils, SparkSubmit}
+import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil, SparkSubmit}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
 
@@ -72,6 +72,10 @@ object DriverWrapper {
   }
 
   private def setupDependencies(loader: MutableURLClassLoader, userJar: String): Unit = {
+    val sparkConf = new SparkConf()
+    val secMgr = new SecurityManager(sparkConf)
+    val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
+
     val Seq(packagesExclusions, packages, repositories, ivyRepoPath) =
       Seq("spark.jars.excludes", "spark.jars.packages", "spark.jars.repositories", "spark.jars.ivy")
         .map(sys.props.get(_).orNull)
@@ -86,7 +90,8 @@ object DriverWrapper {
         jarsProp
       }
     }
-    val localJars = DependencyUtils.resolveAndDownloadJars(jars, userJar)
+    val localJars = DependencyUtils.resolveAndDownloadJars(jars, userJar, sparkConf, hadoopConf,
+      secMgr)
     DependencyUtils.addJarsToClassPath(localJars, loader)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d7b1fcf8/core/src/main/scala/org/apache/spark/internal/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/Logging.scala b/core/src/main/scala/org/apache/spark/internal/Logging.scala
index c7f2847..cea9964 100644
--- a/core/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -96,24 +96,27 @@ trait Logging {
   }
 
   protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit = {
+    initializeLogIfNecessary(isInterpreter, silent = false)
+  }
+
+  protected def initializeLogIfNecessary(
+      isInterpreter: Boolean,
+      silent: Boolean = false): Boolean = {
     if (!Logging.initialized) {
       Logging.initLock.synchronized {
         if (!Logging.initialized) {
-          initializeLogging(isInterpreter)
+          initializeLogging(isInterpreter, silent)
+          return true
         }
       }
     }
+    false
   }
 
-  private def initializeLogging(isInterpreter: Boolean): Unit = {
+  private def initializeLogging(isInterpreter: Boolean, silent: Boolean): Unit = {
     // Don't use a logger in here, as this is itself occurring during initialization of a logger
     // If Log4j 1.2 is being used, but is not initialized, load a default properties file
-    val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
-    // This distinguishes the log4j 1.2 binding, currently
-    // org.slf4j.impl.Log4jLoggerFactory, from the log4j 2.0 binding, currently
-    // org.apache.logging.slf4j.Log4jLoggerFactory
-    val usingLog4j12 = "org.slf4j.impl.Log4jLoggerFactory".equals(binderClass)
-    if (usingLog4j12) {
+    if (Logging.isLog4j12()) {
       val log4j12Initialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
       // scalastyle:off println
       if (!log4j12Initialized) {
@@ -121,22 +124,30 @@ trait Logging {
         Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
           case Some(url) =>
             PropertyConfigurator.configure(url)
-            System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
+            if (!silent) {
+              System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
+            }
           case None =>
             System.err.println(s"Spark was unable to load $defaultLogProps")
         }
       }
 
+      val rootLogger = LogManager.getRootLogger()
+      if (Logging.defaultRootLevel == null) {
+        Logging.defaultRootLevel = rootLogger.getLevel()
+      }
+
       if (isInterpreter) {
         // Use the repl's main class to define the default log level when running the shell,
         // overriding the root logger's config if they're different.
-        val rootLogger = LogManager.getRootLogger()
         val replLogger = LogManager.getLogger(logName)
         val replLevel = Option(replLogger.getLevel()).getOrElse(Level.WARN)
         if (replLevel != rootLogger.getEffectiveLevel()) {
-          System.err.printf("Setting default log level to \"%s\".\n", replLevel)
-          System.err.println("To adjust logging level use sc.setLogLevel(newLevel). " +
-            "For SparkR, use setLogLevel(newLevel).")
+          if (!silent) {
+            System.err.printf("Setting default log level to \"%s\".\n", replLevel)
+            System.err.println("To adjust logging level use sc.setLogLevel(newLevel). " +
+              "For SparkR, use setLogLevel(newLevel).")
+          }
           rootLogger.setLevel(replLevel)
         }
       }
@@ -150,8 +161,10 @@ trait Logging {
   }
 }
 
-private object Logging {
+private[spark] object Logging {
   @volatile private var initialized = false
+  @volatile private var defaultRootLevel: Level = null
+
   val initLock = new Object()
   try {
     // We use reflection here to handle the case where users remove the
@@ -165,4 +178,24 @@ private object Logging {
   } catch {
     case e: ClassNotFoundException => // can't log anything yet so just fail silently
   }
+
+  /**
+   * Marks the logging system as not initialized. This does a best effort at resetting the
+   * logging system to its initial state so that the next class to use logging triggers
+   * initialization again.
+   */
+  def uninitialize(): Unit = initLock.synchronized {
+    if (isLog4j12()) {
+      LogManager.resetConfiguration()
+    }
+    this.initialized = false
+  }
+
+  private def isLog4j12(): Boolean = {
+    // This distinguishes the log4j 1.2 binding, currently
+    // org.slf4j.impl.Log4jLoggerFactory, from the log4j 2.0 binding, currently
+    // org.apache.logging.slf4j.Log4jLoggerFactory
+    val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
+    "org.slf4j.impl.Log4jLoggerFactory".equals(binderClass)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d7b1fcf8/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3dce76c..0da075d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -449,7 +449,7 @@ private[spark] object Utils extends Logging {
       securityMgr: SecurityManager,
       hadoopConf: Configuration,
       timestamp: Long,
-      useCache: Boolean) {
+      useCache: Boolean): File = {
     val fileName = decodeFileNameInURI(new URI(url))
     val targetFile = new File(targetDir, fileName)
     val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true)
@@ -498,6 +498,8 @@ private[spark] object Utils extends Logging {
     if (isWindows) {
       FileUtil.chmod(targetFile.getAbsolutePath, "u+r")
     }
+
+    targetFile
   }
 
   /**
@@ -637,13 +639,13 @@ private[spark] object Utils extends Logging {
    * Throws SparkException if the target file already exists and has different contents than
    * the requested file.
    */
-  private def doFetchFile(
+  def doFetchFile(
       url: String,
       targetDir: File,
       filename: String,
       conf: SparkConf,
       securityMgr: SecurityManager,
-      hadoopConf: Configuration) {
+      hadoopConf: Configuration): File = {
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
     val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
@@ -687,6 +689,8 @@ private[spark] object Utils extends Logging {
         fetchHcfsFile(path, targetDir, fs, conf, hadoopConf, fileOverwrite,
                       filename = Some(filename))
     }
+
+    targetFile
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d7b1fcf8/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 95137c8..724096d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -29,7 +29,7 @@ import scala.io.Source
 import com.google.common.io.ByteStreams
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
+import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, Path}
 import org.scalatest.{BeforeAndAfterEach, Matchers}
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
@@ -793,30 +793,37 @@ class SparkSubmitSuite
   }
 
   test("downloadFile - invalid url") {
+    val sparkConf = new SparkConf(false)
     intercept[IOException] {
-      SparkSubmit.downloadFile(
-        "abc:/my/file", Utils.createTempDir(), mutable.Map.empty, new Configuration())
+      DependencyUtils.downloadFile(
+        "abc:/my/file", Utils.createTempDir(), sparkConf, new Configuration(),
+        new SecurityManager(sparkConf))
     }
   }
 
   test("downloadFile - file doesn't exist") {
+    val sparkConf = new SparkConf(false)
     val hadoopConf = new Configuration()
     val tmpDir = Utils.createTempDir()
     updateConfWithFakeS3Fs(hadoopConf)
     intercept[FileNotFoundException] {
-      SparkSubmit.downloadFile("s3a:/no/such/file", tmpDir, mutable.Map.empty, hadoopConf)
+      DependencyUtils.downloadFile("s3a:/no/such/file", tmpDir, sparkConf, hadoopConf,
+        new SecurityManager(sparkConf))
     }
   }
 
   test("downloadFile does not download local file") {
+    val sparkConf = new SparkConf(false)
+    val secMgr = new SecurityManager(sparkConf)
     // empty path is considered as local file.
     val tmpDir = Files.createTempDirectory("tmp").toFile
-    assert(SparkSubmit.downloadFile("", tmpDir, mutable.Map.empty, new Configuration()) === "")
-    assert(SparkSubmit.downloadFile("/local/file", tmpDir, mutable.Map.empty,
-      new Configuration()) === "/local/file")
+    assert(DependencyUtils.downloadFile("", tmpDir, sparkConf, new Configuration(), secMgr) === "")
+    assert(DependencyUtils.downloadFile("/local/file", tmpDir, sparkConf, new Configuration(),
+      secMgr) === "/local/file")
   }
 
   test("download one file to local") {
+    val sparkConf = new SparkConf(false)
     val jarFile = File.createTempFile("test", ".jar")
     jarFile.deleteOnExit()
     val content = "hello, world"
@@ -825,13 +832,14 @@ class SparkSubmitSuite
     val tmpDir = Files.createTempDirectory("tmp").toFile
     updateConfWithFakeS3Fs(hadoopConf)
     val sourcePath = s"s3a://${jarFile.getAbsolutePath}"
-    val outputPath =
-      SparkSubmit.downloadFile(sourcePath, tmpDir, mutable.Map.empty, hadoopConf)
+    val outputPath = DependencyUtils.downloadFile(sourcePath, tmpDir, sparkConf, hadoopConf,
+      new SecurityManager(sparkConf))
     checkDownloadedFile(sourcePath, outputPath)
     deleteTempOutputFile(outputPath)
   }
 
   test("download list of files to local") {
+    val sparkConf = new SparkConf(false)
     val jarFile = File.createTempFile("test", ".jar")
     jarFile.deleteOnExit()
     val content = "hello, world"
@@ -840,8 +848,10 @@ class SparkSubmitSuite
     val tmpDir = Files.createTempDirectory("tmp").toFile
     updateConfWithFakeS3Fs(hadoopConf)
     val sourcePaths = Seq("/local/file", s"s3a://${jarFile.getAbsolutePath}")
-    val outputPaths = SparkSubmit.downloadFileList(
-      sourcePaths.mkString(","), tmpDir, mutable.Map.empty, hadoopConf).split(",")
+    val outputPaths = DependencyUtils
+      .downloadFileList(sourcePaths.mkString(","), tmpDir, sparkConf, hadoopConf,
+        new SecurityManager(sparkConf))
+      .split(",")
 
     assert(outputPaths.length === sourcePaths.length)
     sourcePaths.zip(outputPaths).foreach { case (sourcePath, outputPath) =>
@@ -996,17 +1006,31 @@ object UserClasspathFirstTest {
 }
 
 class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem {
-  override def copyToLocalFile(src: Path, dst: Path): Unit = {
+  private def local(path: Path): Path = {
     // Ignore the scheme for testing.
-    super.copyToLocalFile(new Path(src.toUri.getPath), dst)
+    new Path(path.toUri.getPath)
+  }
+
+  private def toRemote(status: FileStatus): FileStatus = {
+    val path = s"s3a://${status.getPath.toUri.getPath}"
+    status.setPath(new Path(path))
+    status
   }
 
+  override def isFile(path: Path): Boolean = super.isFile(local(path))
+
   override def globStatus(pathPattern: Path): Array[FileStatus] = {
     val newPath = new Path(pathPattern.toUri.getPath)
-    super.globStatus(newPath).map { status =>
-      val path = s"s3a://${status.getPath.toUri.getPath}"
-      status.setPath(new Path(path))
-      status
-    }
+    super.globStatus(newPath).map(toRemote)
   }
+
+  override def listStatus(path: Path): Array[FileStatus] = {
+    super.listStatus(local(path)).map(toRemote)
+  }
+
+  override def copyToLocalFile(src: Path, dst: Path): Unit = {
+    super.copyToLocalFile(local(src), dst)
+  }
+
+  override def open(path: Path): FSDataInputStream = super.open(local(path))
 }

