Posted to commits@spark.apache.org by va...@apache.org on 2018/01/04 23:00:16 UTC

spark git commit: [SPARK-22948][K8S] Move SparkPodInitContainer to correct package.

Repository: spark
Updated Branches:
  refs/heads/master d2cddc88e -> 95f9659ab


[SPARK-22948][K8S] Move SparkPodInitContainer to correct package.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #20156 from vanzin/SPARK-22948.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95f9659a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95f9659a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95f9659a

Branch: refs/heads/master
Commit: 95f9659abe8845f9f3f42fd7ababd79e55c52489
Parents: d2cddc8
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Thu Jan 4 15:00:09 2018 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Jan 4 15:00:09 2018 -0800

----------------------------------------------------------------------
 dev/sparktestsupport/modules.py                 |   2 +-
 .../deploy/k8s/SparkPodInitContainer.scala      | 116 +++++++++++++++++++
 .../deploy/rest/k8s/SparkPodInitContainer.scala | 116 -------------------
 .../deploy/k8s/SparkPodInitContainerSuite.scala |  86 ++++++++++++++
 .../rest/k8s/SparkPodInitContainerSuite.scala   |  86 --------------
 .../main/dockerfiles/init-container/Dockerfile  |   2 +-
 6 files changed, 204 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/95f9659a/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index f834563..7164180 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -539,7 +539,7 @@ mesos = Module(
 kubernetes = Module(
     name="kubernetes",
     dependencies=[],
-    source_file_regexes=["resource-managers/kubernetes/core"],
+    source_file_regexes=["resource-managers/kubernetes"],
     build_profile_flags=["-Pkubernetes"],
     sbt_test_goals=["kubernetes/test"]
 )

http://git-wip-us.apache.org/repos/asf/spark/blob/95f9659a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala
new file mode 100644
index 0000000..c0f0878
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * Process that fetches files from a resource staging server and/or arbitrary remote locations.
+ *
+ * The init-container can handle fetching files from any of those sources, but not all of the
+ * sources need to be specified. This allows for composing multiple instances of this container
+ * with different configurations for different download sources, or using the same container to
+ * download everything at once.
+ */
+private[spark] class SparkPodInitContainer(
+    sparkConf: SparkConf,
+    fileFetcher: FileFetcher) extends Logging {
+
+  private val maxThreadPoolSize = sparkConf.get(INIT_CONTAINER_MAX_THREAD_POOL_SIZE)
+  private implicit val downloadExecutor = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("download-executor", maxThreadPoolSize))
+
+  private val jarsDownloadDir = new File(sparkConf.get(JARS_DOWNLOAD_LOCATION))
+  private val filesDownloadDir = new File(sparkConf.get(FILES_DOWNLOAD_LOCATION))
+
+  private val remoteJars = sparkConf.get(INIT_CONTAINER_REMOTE_JARS)
+  private val remoteFiles = sparkConf.get(INIT_CONTAINER_REMOTE_FILES)
+
+  private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
+
+  def run(): Unit = {
+    logInfo(s"Downloading remote jars: $remoteJars")
+    downloadFiles(
+      remoteJars,
+      jarsDownloadDir,
+      s"Remote jars download directory specified at $jarsDownloadDir does not exist " +
+        "or is not a directory.")
+
+    logInfo(s"Downloading remote files: $remoteFiles")
+    downloadFiles(
+      remoteFiles,
+      filesDownloadDir,
+      s"Remote files download directory specified at $filesDownloadDir does not exist " +
+        "or is not a directory.")
+
+    downloadExecutor.shutdown()
+    downloadExecutor.awaitTermination(downloadTimeoutMinutes, TimeUnit.MINUTES)
+  }
+
+  private def downloadFiles(
+      filesCommaSeparated: Option[String],
+      downloadDir: File,
+      errMessage: String): Unit = {
+    filesCommaSeparated.foreach { files =>
+      require(downloadDir.isDirectory, errMessage)
+      Utils.stringToSeq(files).foreach { file =>
+        Future[Unit] {
+          fileFetcher.fetchFile(file, downloadDir)
+        }
+      }
+    }
+  }
+}
+
+private class FileFetcher(sparkConf: SparkConf, securityManager: SparkSecurityManager) {
+
+  def fetchFile(uri: String, targetDir: File): Unit = {
+    Utils.fetchFile(
+      url = uri,
+      targetDir = targetDir,
+      conf = sparkConf,
+      securityMgr = securityManager,
+      hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf),
+      timestamp = System.currentTimeMillis(),
+      useCache = false)
+  }
+}
+
+object SparkPodInitContainer extends Logging {
+
+  def main(args: Array[String]): Unit = {
+    logInfo("Starting init-container to download Spark application dependencies.")
+    val sparkConf = new SparkConf(true)
+    if (args.nonEmpty) {
+      Utils.loadDefaultSparkProperties(sparkConf, args(0))
+    }
+
+    val securityManager = new SparkSecurityManager(sparkConf)
+    val fileFetcher = new FileFetcher(sparkConf, securityManager)
+    new SparkPodInitContainer(sparkConf, fileFetcher).run()
+    logInfo("Finished downloading application dependencies.")
+  }
+}
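
[Editor's note] The class above is driven by its companion object's main(), which
optionally loads a properties file passed as the first argument (see the Dockerfile
entrypoint change further down). Below is a minimal standalone sketch of the same
flow, mirroring the new test suite in this commit; the URLs and values are
hypothetical, and it assumes the caller sits in the org.apache.spark.deploy.k8s
package so that the package-private FileFetcher and the private[spark] config
entries are visible:

    import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf}
    import org.apache.spark.deploy.k8s.Config._
    import org.apache.spark.util.Utils

    // Hypothetical configuration; any of the download sources may be left unset.
    val jarsDir = Utils.createTempDir()    // download targets must already exist
    val filesDir = Utils.createTempDir()
    val conf = new SparkConf(true)
      .set(INIT_CONTAINER_REMOTE_JARS, "https://repo.example.com/dep1.jar")
      .set(INIT_CONTAINER_REMOTE_FILES, "https://repo.example.com/data.txt")
      .set(JARS_DOWNLOAD_LOCATION, jarsDir.getAbsolutePath)
      .set(FILES_DOWNLOAD_LOCATION, filesDir.getAbsolutePath)

    // Fetches each configured URI into its download directory, then waits for
    // the daemon thread pool to drain (bounded by INIT_CONTAINER_MOUNT_TIMEOUT).
    val fetcher = new FileFetcher(conf, new SparkSecurityManager(conf))
    new SparkPodInitContainer(conf, fetcher).run()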

http://git-wip-us.apache.org/repos/asf/spark/blob/95f9659a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainer.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainer.scala
deleted file mode 100644
index 4a4b628..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainer.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.rest.k8s
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.{ExecutionContext, Future}
-
-import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.{ThreadUtils, Utils}
-
-/**
- * Process that fetches files from a resource staging server and/or arbitrary remote locations.
- *
- * The init-container can handle fetching files from any of those sources, but not all of the
- * sources need to be specified. This allows for composing multiple instances of this container
- * with different configurations for different download sources, or using the same container to
- * download everything at once.
- */
-private[spark] class SparkPodInitContainer(
-    sparkConf: SparkConf,
-    fileFetcher: FileFetcher) extends Logging {
-
-  private val maxThreadPoolSize = sparkConf.get(INIT_CONTAINER_MAX_THREAD_POOL_SIZE)
-  private implicit val downloadExecutor = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("download-executor", maxThreadPoolSize))
-
-  private val jarsDownloadDir = new File(sparkConf.get(JARS_DOWNLOAD_LOCATION))
-  private val filesDownloadDir = new File(sparkConf.get(FILES_DOWNLOAD_LOCATION))
-
-  private val remoteJars = sparkConf.get(INIT_CONTAINER_REMOTE_JARS)
-  private val remoteFiles = sparkConf.get(INIT_CONTAINER_REMOTE_FILES)
-
-  private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
-
-  def run(): Unit = {
-    logInfo(s"Downloading remote jars: $remoteJars")
-    downloadFiles(
-      remoteJars,
-      jarsDownloadDir,
-      s"Remote jars download directory specified at $jarsDownloadDir does not exist " +
-        "or is not a directory.")
-
-    logInfo(s"Downloading remote files: $remoteFiles")
-    downloadFiles(
-      remoteFiles,
-      filesDownloadDir,
-      s"Remote files download directory specified at $filesDownloadDir does not exist " +
-        "or is not a directory.")
-
-    downloadExecutor.shutdown()
-    downloadExecutor.awaitTermination(downloadTimeoutMinutes, TimeUnit.MINUTES)
-  }
-
-  private def downloadFiles(
-      filesCommaSeparated: Option[String],
-      downloadDir: File,
-      errMessage: String): Unit = {
-    filesCommaSeparated.foreach { files =>
-      require(downloadDir.isDirectory, errMessage)
-      Utils.stringToSeq(files).foreach { file =>
-        Future[Unit] {
-          fileFetcher.fetchFile(file, downloadDir)
-        }
-      }
-    }
-  }
-}
-
-private class FileFetcher(sparkConf: SparkConf, securityManager: SparkSecurityManager) {
-
-  def fetchFile(uri: String, targetDir: File): Unit = {
-    Utils.fetchFile(
-      url = uri,
-      targetDir = targetDir,
-      conf = sparkConf,
-      securityMgr = securityManager,
-      hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf),
-      timestamp = System.currentTimeMillis(),
-      useCache = false)
-  }
-}
-
-object SparkPodInitContainer extends Logging {
-
-  def main(args: Array[String]): Unit = {
-    logInfo("Starting init-container to download Spark application dependencies.")
-    val sparkConf = new SparkConf(true)
-    if (args.nonEmpty) {
-      Utils.loadDefaultSparkProperties(sparkConf, args(0))
-    }
-
-    val securityManager = new SparkSecurityManager(sparkConf)
-    val fileFetcher = new FileFetcher(sparkConf, securityManager)
-    new SparkPodInitContainer(sparkConf, fileFetcher).run()
-    logInfo("Finished downloading application dependencies.")
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/95f9659a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala
new file mode 100644
index 0000000..e0f29ec
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.io.File
+import java.util.UUID
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import org.mockito.Mockito
+import org.scalatest.BeforeAndAfter
+import org.scalatest.mockito.MockitoSugar._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.util.Utils
+
+class SparkPodInitContainerSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private val DOWNLOAD_JARS_SECRET_LOCATION = createTempFile("txt")
+  private val DOWNLOAD_FILES_SECRET_LOCATION = createTempFile("txt")
+
+  private var downloadJarsDir: File = _
+  private var downloadFilesDir: File = _
+  private var downloadJarsSecretValue: String = _
+  private var downloadFilesSecretValue: String = _
+  private var fileFetcher: FileFetcher = _
+
+  override def beforeAll(): Unit = {
+    downloadJarsSecretValue = Files.toString(
+      new File(DOWNLOAD_JARS_SECRET_LOCATION), Charsets.UTF_8)
+    downloadFilesSecretValue = Files.toString(
+      new File(DOWNLOAD_FILES_SECRET_LOCATION), Charsets.UTF_8)
+  }
+
+  before {
+    downloadJarsDir = Utils.createTempDir()
+    downloadFilesDir = Utils.createTempDir()
+    fileFetcher = mock[FileFetcher]
+  }
+
+  after {
+    downloadJarsDir.delete()
+    downloadFilesDir.delete()
+  }
+
+  test("Downloads from remote server should invoke the file fetcher") {
+    val sparkConf = getSparkConfForRemoteFileDownloads
+    val initContainerUnderTest = new SparkPodInitContainer(sparkConf, fileFetcher)
+    initContainerUnderTest.run()
+    Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/jar1.jar", downloadJarsDir)
+    Mockito.verify(fileFetcher).fetchFile("hdfs://localhost:9000/jar2.jar", downloadJarsDir)
+    Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/file.txt", downloadFilesDir)
+  }
+
+  private def getSparkConfForRemoteFileDownloads: SparkConf = {
+    new SparkConf(true)
+      .set(INIT_CONTAINER_REMOTE_JARS,
+        "http://localhost:9000/jar1.jar,hdfs://localhost:9000/jar2.jar")
+      .set(INIT_CONTAINER_REMOTE_FILES,
+        "http://localhost:9000/file.txt")
+      .set(JARS_DOWNLOAD_LOCATION, downloadJarsDir.getAbsolutePath)
+      .set(FILES_DOWNLOAD_LOCATION, downloadFilesDir.getAbsolutePath)
+  }
+
+  private def createTempFile(extension: String): String = {
+    val dir = Utils.createTempDir()
+    val file = new File(dir, s"${UUID.randomUUID().toString}.$extension")
+    Files.write(UUID.randomUUID().toString, file, Charsets.UTF_8)
+    file.getAbsolutePath
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/95f9659a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainerSuite.scala
deleted file mode 100644
index 6c557ec..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainerSuite.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.rest.k8s
-
-import java.io.File
-import java.util.UUID
-
-import com.google.common.base.Charsets
-import com.google.common.io.Files
-import org.mockito.Mockito
-import org.scalatest.BeforeAndAfter
-import org.scalatest.mockito.MockitoSugar._
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.util.Utils
-
-class SparkPodInitContainerSuite extends SparkFunSuite with BeforeAndAfter {
-
-  private val DOWNLOAD_JARS_SECRET_LOCATION = createTempFile("txt")
-  private val DOWNLOAD_FILES_SECRET_LOCATION = createTempFile("txt")
-
-  private var downloadJarsDir: File = _
-  private var downloadFilesDir: File = _
-  private var downloadJarsSecretValue: String = _
-  private var downloadFilesSecretValue: String = _
-  private var fileFetcher: FileFetcher = _
-
-  override def beforeAll(): Unit = {
-    downloadJarsSecretValue = Files.toString(
-      new File(DOWNLOAD_JARS_SECRET_LOCATION), Charsets.UTF_8)
-    downloadFilesSecretValue = Files.toString(
-      new File(DOWNLOAD_FILES_SECRET_LOCATION), Charsets.UTF_8)
-  }
-
-  before {
-    downloadJarsDir = Utils.createTempDir()
-    downloadFilesDir = Utils.createTempDir()
-    fileFetcher = mock[FileFetcher]
-  }
-
-  after {
-    downloadJarsDir.delete()
-    downloadFilesDir.delete()
-  }
-
-  test("Downloads from remote server should invoke the file fetcher") {
-    val sparkConf = getSparkConfForRemoteFileDownloads
-    val initContainerUnderTest = new SparkPodInitContainer(sparkConf, fileFetcher)
-    initContainerUnderTest.run()
-    Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/jar1.jar", downloadJarsDir)
-    Mockito.verify(fileFetcher).fetchFile("hdfs://localhost:9000/jar2.jar", downloadJarsDir)
-    Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/file.txt", downloadFilesDir)
-  }
-
-  private def getSparkConfForRemoteFileDownloads: SparkConf = {
-    new SparkConf(true)
-      .set(INIT_CONTAINER_REMOTE_JARS,
-        "http://localhost:9000/jar1.jar,hdfs://localhost:9000/jar2.jar")
-      .set(INIT_CONTAINER_REMOTE_FILES,
-        "http://localhost:9000/file.txt")
-      .set(JARS_DOWNLOAD_LOCATION, downloadJarsDir.getAbsolutePath)
-      .set(FILES_DOWNLOAD_LOCATION, downloadFilesDir.getAbsolutePath)
-  }
-
-  private def createTempFile(extension: String): String = {
-    val dir = Utils.createTempDir()
-    val file = new File(dir, s"${UUID.randomUUID().toString}.$extension")
-    Files.write(UUID.randomUUID().toString, file, Charsets.UTF_8)
-    file.getAbsolutePath
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/95f9659a/resource-managers/kubernetes/docker/src/main/dockerfiles/init-container/Dockerfile
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/init-container/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/init-container/Dockerfile
index 0554931..047056a 100644
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/init-container/Dockerfile
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/init-container/Dockerfile
@@ -21,4 +21,4 @@ FROM spark-base
 # command should be invoked from the top level directory of the Spark distribution. E.g.:
 # docker build -t spark-init:latest -f kubernetes/dockerfiles/init-container/Dockerfile .
 
-ENTRYPOINT [ "/opt/entrypoint.sh", "/opt/spark/bin/spark-class", "org.apache.spark.deploy.rest.k8s.SparkPodInitContainer" ]
+ENTRYPOINT [ "/opt/entrypoint.sh", "/opt/spark/bin/spark-class", "org.apache.spark.deploy.k8s.SparkPodInitContainer" ]
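
[Editor's note] With the entrypoint above, the container is roughly equivalent to
invoking spark-class directly with the relocated class name; main() optionally loads
a properties file passed as the first argument. A hypothetical invocation (the
properties path is an assumption):

    /opt/spark/bin/spark-class org.apache.spark.deploy.k8s.SparkPodInitContainer \
      /opt/spark/conf/spark.properties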

