You are viewing a plain-text version of this content; the hyperlink to the canonical version is not preserved in this copy.
Posted to commits@spark.apache.org by an...@apache.org on 2015/04/14 20:49:08 UTC
spark git commit: [SPARK-6081] Support fetching http/https uris in
driver runner.
Repository: spark
Updated Branches:
refs/heads/master 51b306b93 -> 320bca450
[SPARK-6081] Support fetching http/https uris in driver runner.
Currently, if the driver runner is passed URIs such as http/https, it won't be able to fetch them, because it only calls the Hadoop FileSystem copy (HadoopFs get).
This fix uses the existing utility method (Utils.fetchFile) so that remote URIs can be fetched as well.
Author: Timothy Chen <tn...@gmail.com>
Closes #4832 from tnachen/driver_remote and squashes the following commits:
aa52cd6 [Timothy Chen] Support fetching remote uris in driver runner.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/320bca45
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/320bca45
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/320bca45
Branch: refs/heads/master
Commit: 320bca4508e890b874c2eb7abb76a30ef14c932f
Parents: 51b306b
Author: Timothy Chen <tn...@gmail.com>
Authored: Tue Apr 14 11:48:12 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Apr 14 11:49:04 2015 -0700
----------------------------------------------------------------------
.../spark/deploy/worker/DriverRunner.scala | 21 ++++++++++++--------
.../org/apache/spark/deploy/worker/Worker.scala | 3 ++-
.../apache/spark/deploy/JsonProtocolSuite.scala | 7 ++++---
.../spark/deploy/worker/DriverRunnerTest.scala | 7 ++++---
4 files changed, 23 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/320bca45/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index e0948e1..ef7a703 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -24,14 +24,14 @@ import scala.collection.JavaConversions._
import akka.actor.ActorRef
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
-import org.apache.hadoop.fs.{FileUtil, Path}
+import org.apache.hadoop.fs.Path
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SecurityManager}
import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages.DriverStateChanged
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.master.DriverState.DriverState
-import org.apache.spark.util.{Clock, SystemClock}
+import org.apache.spark.util.{Utils, Clock, SystemClock}
/**
* Manages the execution of one driver, including automatically restarting the driver on failure.
@@ -44,7 +44,8 @@ private[deploy] class DriverRunner(
val sparkHome: File,
val driverDesc: DriverDescription,
val worker: ActorRef,
- val workerUrl: String)
+ val workerUrl: String,
+ val securityManager: SecurityManager)
extends Logging {
@volatile private var process: Option[Process] = None
@@ -136,12 +137,9 @@ private[deploy] class DriverRunner(
* Will throw an exception if there are errors downloading the jar.
*/
private def downloadUserJar(driverDir: File): String = {
-
val jarPath = new Path(driverDesc.jarUrl)
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
- val jarFileSystem = jarPath.getFileSystem(hadoopConf)
-
val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
val jarFileName = jarPath.getName
val localJarFile = new File(driverDir, jarFileName)
@@ -149,7 +147,14 @@ private[deploy] class DriverRunner(
if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
logInfo(s"Copying user jar $jarPath to $destPath")
- FileUtil.copy(jarFileSystem, jarPath, destPath, false, hadoopConf)
+ Utils.fetchFile(
+ driverDesc.jarUrl,
+ driverDir,
+ conf,
+ securityManager,
+ hadoopConf,
+ System.currentTimeMillis(),
+ useCache = false)
}
if (!localJarFile.exists()) { // Verify copy succeeded
http://git-wip-us.apache.org/repos/asf/spark/blob/320bca45/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index c4c24a7..3ee2eb6 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -436,7 +436,8 @@ private[worker] class Worker(
sparkHome,
driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
self,
- akkaUrl)
+ akkaUrl,
+ securityMgr)
drivers(driverId) = driver
driver.start()
http://git-wip-us.apache.org/repos/asf/spark/blob/320bca45/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 2071701..b58d625 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.FunSuite
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryState, WorkerInfo}
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf}
class JsonProtocolSuite extends FunSuite {
@@ -124,8 +124,9 @@ class JsonProtocolSuite extends FunSuite {
}
def createDriverRunner(): DriverRunner = {
- new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"),
- createDriverDesc(), null, "akka://worker")
+ val conf = new SparkConf()
+ new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
+ createDriverDesc(), null, "akka://worker", new SecurityManager(conf))
}
def assertValidJson(json: JValue) {
http://git-wip-us.apache.org/repos/asf/spark/blob/320bca45/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index aa6e487..2159fd8 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -25,7 +25,7 @@ import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.FunSuite
-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.{Command, DriverDescription}
import org.apache.spark.util.Clock
@@ -33,8 +33,9 @@ class DriverRunnerTest extends FunSuite {
private def createDriverRunner() = {
val command = new Command("mainClass", Seq(), Map(), Seq(), Seq(), Seq())
val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
- new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"),
- driverDescription, null, "akka://1.2.3.4/worker/")
+ val conf = new SparkConf()
+ new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
+ driverDescription, null, "akka://1.2.3.4/worker/", new SecurityManager(conf))
}
private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org