You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2018/01/04 23:00:16 UTC
spark git commit: [SPARK-22948][K8S] Move SparkPodInitContainer to
correct package.
Repository: spark
Updated Branches:
refs/heads/master d2cddc88e -> 95f9659ab
[SPARK-22948][K8S] Move SparkPodInitContainer to correct package.
Author: Marcelo Vanzin <va...@cloudera.com>
Closes #20156 from vanzin/SPARK-22948.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95f9659a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95f9659a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95f9659a
Branch: refs/heads/master
Commit: 95f9659abe8845f9f3f42fd7ababd79e55c52489
Parents: d2cddc8
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Thu Jan 4 15:00:09 2018 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Jan 4 15:00:09 2018 -0800
----------------------------------------------------------------------
dev/sparktestsupport/modules.py | 2 +-
.../deploy/k8s/SparkPodInitContainer.scala | 116 +++++++++++++++++++
.../deploy/rest/k8s/SparkPodInitContainer.scala | 116 -------------------
.../deploy/k8s/SparkPodInitContainerSuite.scala | 86 ++++++++++++++
.../rest/k8s/SparkPodInitContainerSuite.scala | 86 --------------
.../main/dockerfiles/init-container/Dockerfile | 2 +-
6 files changed, 204 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/95f9659a/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index f834563..7164180 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -539,7 +539,7 @@ mesos = Module(
kubernetes = Module(
name="kubernetes",
dependencies=[],
- source_file_regexes=["resource-managers/kubernetes/core"],
+ source_file_regexes=["resource-managers/kubernetes"],
build_profile_flags=["-Pkubernetes"],
sbt_test_goals=["kubernetes/test"]
)
http://git-wip-us.apache.org/repos/asf/spark/blob/95f9659a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala
new file mode 100644
index 0000000..c0f0878
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainer.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * Process that fetches files from a resource staging server and/or arbitrary remote locations.
+ *
+ * The init-container can handle fetching files from any of those sources, but not all of the
+ * sources need to be specified. This allows for composing multiple instances of this container
+ * with different configurations for different download sources, or using the same container to
+ * download everything at once.
+ */
+private[spark] class SparkPodInitContainer(
+ sparkConf: SparkConf,
+ fileFetcher: FileFetcher) extends Logging {
+
+ private val maxThreadPoolSize = sparkConf.get(INIT_CONTAINER_MAX_THREAD_POOL_SIZE)
+ private implicit val downloadExecutor = ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonCachedThreadPool("download-executor", maxThreadPoolSize))
+
+ private val jarsDownloadDir = new File(sparkConf.get(JARS_DOWNLOAD_LOCATION))
+ private val filesDownloadDir = new File(sparkConf.get(FILES_DOWNLOAD_LOCATION))
+
+ private val remoteJars = sparkConf.get(INIT_CONTAINER_REMOTE_JARS)
+ private val remoteFiles = sparkConf.get(INIT_CONTAINER_REMOTE_FILES)
+
+ private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
+
+ def run(): Unit = {
+ logInfo(s"Downloading remote jars: $remoteJars")
+ downloadFiles(
+ remoteJars,
+ jarsDownloadDir,
+ s"Remote jars download directory specified at $jarsDownloadDir does not exist " +
+ "or is not a directory.")
+
+ logInfo(s"Downloading remote files: $remoteFiles")
+ downloadFiles(
+ remoteFiles,
+ filesDownloadDir,
+ s"Remote files download directory specified at $filesDownloadDir does not exist " +
+ "or is not a directory.")
+
+ downloadExecutor.shutdown()
+ downloadExecutor.awaitTermination(downloadTimeoutMinutes, TimeUnit.MINUTES)
+ }
+
+ private def downloadFiles(
+ filesCommaSeparated: Option[String],
+ downloadDir: File,
+ errMessage: String): Unit = {
+ filesCommaSeparated.foreach { files =>
+ require(downloadDir.isDirectory, errMessage)
+ Utils.stringToSeq(files).foreach { file =>
+ Future[Unit] {
+ fileFetcher.fetchFile(file, downloadDir)
+ }
+ }
+ }
+ }
+}
+
+private class FileFetcher(sparkConf: SparkConf, securityManager: SparkSecurityManager) {
+
+ def fetchFile(uri: String, targetDir: File): Unit = {
+ Utils.fetchFile(
+ url = uri,
+ targetDir = targetDir,
+ conf = sparkConf,
+ securityMgr = securityManager,
+ hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf),
+ timestamp = System.currentTimeMillis(),
+ useCache = false)
+ }
+}
+
+object SparkPodInitContainer extends Logging {
+
+ def main(args: Array[String]): Unit = {
+ logInfo("Starting init-container to download Spark application dependencies.")
+ val sparkConf = new SparkConf(true)
+ if (args.nonEmpty) {
+ Utils.loadDefaultSparkProperties(sparkConf, args(0))
+ }
+
+ val securityManager = new SparkSecurityManager(sparkConf)
+ val fileFetcher = new FileFetcher(sparkConf, securityManager)
+ new SparkPodInitContainer(sparkConf, fileFetcher).run()
+ logInfo("Finished downloading application dependencies.")
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/95f9659a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainer.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainer.scala
deleted file mode 100644
index 4a4b628..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainer.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.rest.k8s
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.{ExecutionContext, Future}
-
-import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.{ThreadUtils, Utils}
-
-/**
- * Process that fetches files from a resource staging server and/or arbitrary remote locations.
- *
- * The init-container can handle fetching files from any of those sources, but not all of the
- * sources need to be specified. This allows for composing multiple instances of this container
- * with different configurations for different download sources, or using the same container to
- * download everything at once.
- */
-private[spark] class SparkPodInitContainer(
- sparkConf: SparkConf,
- fileFetcher: FileFetcher) extends Logging {
-
- private val maxThreadPoolSize = sparkConf.get(INIT_CONTAINER_MAX_THREAD_POOL_SIZE)
- private implicit val downloadExecutor = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonCachedThreadPool("download-executor", maxThreadPoolSize))
-
- private val jarsDownloadDir = new File(sparkConf.get(JARS_DOWNLOAD_LOCATION))
- private val filesDownloadDir = new File(sparkConf.get(FILES_DOWNLOAD_LOCATION))
-
- private val remoteJars = sparkConf.get(INIT_CONTAINER_REMOTE_JARS)
- private val remoteFiles = sparkConf.get(INIT_CONTAINER_REMOTE_FILES)
-
- private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
-
- def run(): Unit = {
- logInfo(s"Downloading remote jars: $remoteJars")
- downloadFiles(
- remoteJars,
- jarsDownloadDir,
- s"Remote jars download directory specified at $jarsDownloadDir does not exist " +
- "or is not a directory.")
-
- logInfo(s"Downloading remote files: $remoteFiles")
- downloadFiles(
- remoteFiles,
- filesDownloadDir,
- s"Remote files download directory specified at $filesDownloadDir does not exist " +
- "or is not a directory.")
-
- downloadExecutor.shutdown()
- downloadExecutor.awaitTermination(downloadTimeoutMinutes, TimeUnit.MINUTES)
- }
-
- private def downloadFiles(
- filesCommaSeparated: Option[String],
- downloadDir: File,
- errMessage: String): Unit = {
- filesCommaSeparated.foreach { files =>
- require(downloadDir.isDirectory, errMessage)
- Utils.stringToSeq(files).foreach { file =>
- Future[Unit] {
- fileFetcher.fetchFile(file, downloadDir)
- }
- }
- }
- }
-}
-
-private class FileFetcher(sparkConf: SparkConf, securityManager: SparkSecurityManager) {
-
- def fetchFile(uri: String, targetDir: File): Unit = {
- Utils.fetchFile(
- url = uri,
- targetDir = targetDir,
- conf = sparkConf,
- securityMgr = securityManager,
- hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf),
- timestamp = System.currentTimeMillis(),
- useCache = false)
- }
-}
-
-object SparkPodInitContainer extends Logging {
-
- def main(args: Array[String]): Unit = {
- logInfo("Starting init-container to download Spark application dependencies.")
- val sparkConf = new SparkConf(true)
- if (args.nonEmpty) {
- Utils.loadDefaultSparkProperties(sparkConf, args(0))
- }
-
- val securityManager = new SparkSecurityManager(sparkConf)
- val fileFetcher = new FileFetcher(sparkConf, securityManager)
- new SparkPodInitContainer(sparkConf, fileFetcher).run()
- logInfo("Finished downloading application dependencies.")
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/95f9659a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala
new file mode 100644
index 0000000..e0f29ec
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerSuite.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.io.File
+import java.util.UUID
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import org.mockito.Mockito
+import org.scalatest.BeforeAndAfter
+import org.scalatest.mockito.MockitoSugar._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.util.Utils
+
+class SparkPodInitContainerSuite extends SparkFunSuite with BeforeAndAfter {
+
+ private val DOWNLOAD_JARS_SECRET_LOCATION = createTempFile("txt")
+ private val DOWNLOAD_FILES_SECRET_LOCATION = createTempFile("txt")
+
+ private var downloadJarsDir: File = _
+ private var downloadFilesDir: File = _
+ private var downloadJarsSecretValue: String = _
+ private var downloadFilesSecretValue: String = _
+ private var fileFetcher: FileFetcher = _
+
+ override def beforeAll(): Unit = {
+ downloadJarsSecretValue = Files.toString(
+ new File(DOWNLOAD_JARS_SECRET_LOCATION), Charsets.UTF_8)
+ downloadFilesSecretValue = Files.toString(
+ new File(DOWNLOAD_FILES_SECRET_LOCATION), Charsets.UTF_8)
+ }
+
+ before {
+ downloadJarsDir = Utils.createTempDir()
+ downloadFilesDir = Utils.createTempDir()
+ fileFetcher = mock[FileFetcher]
+ }
+
+ after {
+ downloadJarsDir.delete()
+ downloadFilesDir.delete()
+ }
+
+ test("Downloads from remote server should invoke the file fetcher") {
+ val sparkConf = getSparkConfForRemoteFileDownloads
+ val initContainerUnderTest = new SparkPodInitContainer(sparkConf, fileFetcher)
+ initContainerUnderTest.run()
+ Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/jar1.jar", downloadJarsDir)
+ Mockito.verify(fileFetcher).fetchFile("hdfs://localhost:9000/jar2.jar", downloadJarsDir)
+ Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/file.txt", downloadFilesDir)
+ }
+
+ private def getSparkConfForRemoteFileDownloads: SparkConf = {
+ new SparkConf(true)
+ .set(INIT_CONTAINER_REMOTE_JARS,
+ "http://localhost:9000/jar1.jar,hdfs://localhost:9000/jar2.jar")
+ .set(INIT_CONTAINER_REMOTE_FILES,
+ "http://localhost:9000/file.txt")
+ .set(JARS_DOWNLOAD_LOCATION, downloadJarsDir.getAbsolutePath)
+ .set(FILES_DOWNLOAD_LOCATION, downloadFilesDir.getAbsolutePath)
+ }
+
+ private def createTempFile(extension: String): String = {
+ val dir = Utils.createTempDir()
+ val file = new File(dir, s"${UUID.randomUUID().toString}.$extension")
+ Files.write(UUID.randomUUID().toString, file, Charsets.UTF_8)
+ file.getAbsolutePath
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/95f9659a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainerSuite.scala
deleted file mode 100644
index 6c557ec..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainerSuite.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.rest.k8s
-
-import java.io.File
-import java.util.UUID
-
-import com.google.common.base.Charsets
-import com.google.common.io.Files
-import org.mockito.Mockito
-import org.scalatest.BeforeAndAfter
-import org.scalatest.mockito.MockitoSugar._
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.util.Utils
-
-class SparkPodInitContainerSuite extends SparkFunSuite with BeforeAndAfter {
-
- private val DOWNLOAD_JARS_SECRET_LOCATION = createTempFile("txt")
- private val DOWNLOAD_FILES_SECRET_LOCATION = createTempFile("txt")
-
- private var downloadJarsDir: File = _
- private var downloadFilesDir: File = _
- private var downloadJarsSecretValue: String = _
- private var downloadFilesSecretValue: String = _
- private var fileFetcher: FileFetcher = _
-
- override def beforeAll(): Unit = {
- downloadJarsSecretValue = Files.toString(
- new File(DOWNLOAD_JARS_SECRET_LOCATION), Charsets.UTF_8)
- downloadFilesSecretValue = Files.toString(
- new File(DOWNLOAD_FILES_SECRET_LOCATION), Charsets.UTF_8)
- }
-
- before {
- downloadJarsDir = Utils.createTempDir()
- downloadFilesDir = Utils.createTempDir()
- fileFetcher = mock[FileFetcher]
- }
-
- after {
- downloadJarsDir.delete()
- downloadFilesDir.delete()
- }
-
- test("Downloads from remote server should invoke the file fetcher") {
- val sparkConf = getSparkConfForRemoteFileDownloads
- val initContainerUnderTest = new SparkPodInitContainer(sparkConf, fileFetcher)
- initContainerUnderTest.run()
- Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/jar1.jar", downloadJarsDir)
- Mockito.verify(fileFetcher).fetchFile("hdfs://localhost:9000/jar2.jar", downloadJarsDir)
- Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/file.txt", downloadFilesDir)
- }
-
- private def getSparkConfForRemoteFileDownloads: SparkConf = {
- new SparkConf(true)
- .set(INIT_CONTAINER_REMOTE_JARS,
- "http://localhost:9000/jar1.jar,hdfs://localhost:9000/jar2.jar")
- .set(INIT_CONTAINER_REMOTE_FILES,
- "http://localhost:9000/file.txt")
- .set(JARS_DOWNLOAD_LOCATION, downloadJarsDir.getAbsolutePath)
- .set(FILES_DOWNLOAD_LOCATION, downloadFilesDir.getAbsolutePath)
- }
-
- private def createTempFile(extension: String): String = {
- val dir = Utils.createTempDir()
- val file = new File(dir, s"${UUID.randomUUID().toString}.$extension")
- Files.write(UUID.randomUUID().toString, file, Charsets.UTF_8)
- file.getAbsolutePath
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/95f9659a/resource-managers/kubernetes/docker/src/main/dockerfiles/init-container/Dockerfile
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/init-container/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/init-container/Dockerfile
index 0554931..047056a 100644
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/init-container/Dockerfile
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/init-container/Dockerfile
@@ -21,4 +21,4 @@ FROM spark-base
# command should be invoked from the top level directory of the Spark distribution. E.g.:
# docker build -t spark-init:latest -f kubernetes/dockerfiles/init-container/Dockerfile .
-ENTRYPOINT [ "/opt/entrypoint.sh", "/opt/spark/bin/spark-class", "org.apache.spark.deploy.rest.k8s.SparkPodInitContainer" ]
+ENTRYPOINT [ "/opt/entrypoint.sh", "/opt/spark/bin/spark-class", "org.apache.spark.deploy.k8s.SparkPodInitContainer" ]
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org