You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mc...@apache.org on 2018/07/14 00:19:34 UTC
spark git commit: [SPARK-23984][K8S][TEST] Added Integration Tests
for PySpark on Kubernetes
Repository: spark
Updated Branches:
refs/heads/master a75571b46 -> f1a99ad58
[SPARK-23984][K8S][TEST] Added Integration Tests for PySpark on Kubernetes
## What changes were proposed in this pull request?
I added integration tests for PySpark (plus checks of JVM options and RemoteFileTest) which weren't properly merged in the initial integration test PR.
## How was this patch tested?
I tested this with integration tests using:
`dev/dev-run-integration-tests.sh --spark-tgz spark-2.4.0-SNAPSHOT-bin-2.7.3.tgz`
Author: Ilan Filonenko <if...@cornell.edu>
Closes #21583 from ifilonenko/master.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f1a99ad5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f1a99ad5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f1a99ad5
Branch: refs/heads/master
Commit: f1a99ad5825daf1b4cc275146ba8460cbcdf9701
Parents: a75571b
Author: Ilan Filonenko <if...@cornell.edu>
Authored: Fri Jul 13 17:19:28 2018 -0700
Committer: mcheah <mc...@palantir.com>
Committed: Fri Jul 13 17:19:28 2018 -0700
----------------------------------------------------------------------
.../k8s/integrationtest/KubernetesSuite.scala | 171 ++++++++++++++++---
.../KubernetesTestComponents.scala | 28 ++-
2 files changed, 167 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f1a99ad5/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 6e334c8..774c393 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -22,7 +22,7 @@ import java.util.UUID
import java.util.regex.Pattern
import com.google.common.io.PatternFilenameFilter
-import io.fabric8.kubernetes.api.model.{Container, Pod}
+import io.fabric8.kubernetes.api.model.Pod
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Minutes, Seconds, Span}
@@ -43,6 +43,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite
private var kubernetesTestComponents: KubernetesTestComponents = _
private var sparkAppConf: SparkAppConf = _
private var image: String = _
+ private var pyImage: String = _
private var containerLocalSparkDistroExamplesJar: String = _
private var appLocator: String = _
private var driverPodName: String = _
@@ -65,6 +66,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite
val imageTag = getTestImageTag
val imageRepo = getTestImageRepo
image = s"$imageRepo/spark:$imageTag"
+ pyImage = s"$imageRepo/spark-py:$imageTag"
val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars"))
.toFile
@@ -156,22 +158,77 @@ private[spark] class KubernetesSuite extends SparkFunSuite
})
}
- // TODO(ssuchter): Enable the below after debugging
- // test("Run PageRank using remote data file") {
- // sparkAppConf
- // .set("spark.kubernetes.mountDependencies.filesDownloadDir",
- // CONTAINER_LOCAL_FILE_DOWNLOAD_PATH)
- // .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
- // runSparkPageRankAndVerifyCompletion(
- // appArgs = Array(CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE))
- // }
+ test("Run extraJVMOptions check on driver") {
+ sparkAppConf
+ .set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar")
+ runSparkJVMCheckAndVerifyCompletion(
+ expectedJVMValue = Seq("(spark.test.foo,spark.test.bar)"))
+ }
+
+ test("Run SparkRemoteFileTest using a remote data file") {
+ sparkAppConf
+ .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
+ runSparkRemoteCheckAndVerifyCompletion(
+ appArgs = Array(REMOTE_PAGE_RANK_FILE_NAME))
+ }
+
+ test("Run PySpark on simple pi.py example") {
+ sparkAppConf
+ .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
+ runSparkApplicationAndVerifyCompletion(
+ appResource = PYSPARK_PI,
+ mainClass = "",
+ expectedLogOnCompletion = Seq("Pi is roughly 3"),
+ appArgs = Array("5"),
+ driverPodChecker = doBasicDriverPyPodCheck,
+ executorPodChecker = doBasicExecutorPyPodCheck,
+ appLocator = appLocator,
+ isJVM = false)
+ }
+
+ test("Run PySpark with Python2 to test a pyfiles example") {
+ sparkAppConf
+ .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
+ .set("spark.kubernetes.pyspark.pythonversion", "2")
+ runSparkApplicationAndVerifyCompletion(
+ appResource = PYSPARK_FILES,
+ mainClass = "",
+ expectedLogOnCompletion = Seq(
+ "Python runtime version check is: True",
+ "Python environment version check is: True"),
+ appArgs = Array("python"),
+ driverPodChecker = doBasicDriverPyPodCheck,
+ executorPodChecker = doBasicExecutorPyPodCheck,
+ appLocator = appLocator,
+ isJVM = false,
+ pyFiles = Some(PYSPARK_CONTAINER_TESTS))
+ }
+
+ test("Run PySpark with Python3 to test a pyfiles example") {
+ sparkAppConf
+ .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
+ .set("spark.kubernetes.pyspark.pythonversion", "3")
+ runSparkApplicationAndVerifyCompletion(
+ appResource = PYSPARK_FILES,
+ mainClass = "",
+ expectedLogOnCompletion = Seq(
+ "Python runtime version check is: True",
+ "Python environment version check is: True"),
+ appArgs = Array("python3"),
+ driverPodChecker = doBasicDriverPyPodCheck,
+ executorPodChecker = doBasicExecutorPyPodCheck,
+ appLocator = appLocator,
+ isJVM = false,
+ pyFiles = Some(PYSPARK_CONTAINER_TESTS))
+ }
private def runSparkPiAndVerifyCompletion(
appResource: String = containerLocalSparkDistroExamplesJar,
driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
appArgs: Array[String] = Array.empty[String],
- appLocator: String = appLocator): Unit = {
+ appLocator: String = appLocator,
+ isJVM: Boolean = true ): Unit = {
runSparkApplicationAndVerifyCompletion(
appResource,
SPARK_PI_MAIN_CLASS,
@@ -179,10 +236,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite
appArgs,
driverPodChecker,
executorPodChecker,
- appLocator)
+ appLocator,
+ isJVM)
}
- private def runSparkPageRankAndVerifyCompletion(
+ private def runSparkRemoteCheckAndVerifyCompletion(
appResource: String = containerLocalSparkDistroExamplesJar,
driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
@@ -190,12 +248,50 @@ private[spark] class KubernetesSuite extends SparkFunSuite
appLocator: String = appLocator): Unit = {
runSparkApplicationAndVerifyCompletion(
appResource,
- SPARK_PAGE_RANK_MAIN_CLASS,
- Seq("1 has rank", "2 has rank", "3 has rank", "4 has rank"),
+ SPARK_REMOTE_MAIN_CLASS,
+ Seq(s"Mounting of ${appArgs.head} was true"),
appArgs,
driverPodChecker,
executorPodChecker,
- appLocator)
+ appLocator,
+ true)
+ }
+
+ private def runSparkJVMCheckAndVerifyCompletion(
+ appResource: String = containerLocalSparkDistroExamplesJar,
+ mainClass: String = SPARK_DRIVER_MAIN_CLASS,
+ driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
+ appArgs: Array[String] = Array("5"),
+ expectedJVMValue: Seq[String]): Unit = {
+ val appArguments = SparkAppArguments(
+ mainAppResource = appResource,
+ mainClass = mainClass,
+ appArgs = appArgs)
+ SparkAppLauncher.launch(
+ appArguments,
+ sparkAppConf,
+ TIMEOUT.value.toSeconds.toInt,
+ sparkHomeDir,
+ true)
+
+ val driverPod = kubernetesTestComponents.kubernetesClient
+ .pods()
+ .withLabel("spark-app-locator", appLocator)
+ .withLabel("spark-role", "driver")
+ .list()
+ .getItems
+ .get(0)
+ doBasicDriverPodCheck(driverPod)
+
+ Eventually.eventually(TIMEOUT, INTERVAL) {
+ expectedJVMValue.foreach { e =>
+ assert(kubernetesTestComponents.kubernetesClient
+ .pods()
+ .withName(driverPod.getMetadata.getName)
+ .getLog
+ .contains(e), "The application did not complete.")
+ }
+ }
}
private def runSparkApplicationAndVerifyCompletion(
@@ -205,12 +301,20 @@ private[spark] class KubernetesSuite extends SparkFunSuite
appArgs: Array[String],
driverPodChecker: Pod => Unit,
executorPodChecker: Pod => Unit,
- appLocator: String): Unit = {
+ appLocator: String,
+ isJVM: Boolean,
+ pyFiles: Option[String] = None): Unit = {
val appArguments = SparkAppArguments(
mainAppResource = appResource,
mainClass = mainClass,
appArgs = appArgs)
- SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir)
+ SparkAppLauncher.launch(
+ appArguments,
+ sparkAppConf,
+ TIMEOUT.value.toSeconds.toInt,
+ sparkHomeDir,
+ isJVM,
+ pyFiles)
val driverPod = kubernetesTestComponents.kubernetesClient
.pods()
@@ -248,11 +352,22 @@ private[spark] class KubernetesSuite extends SparkFunSuite
assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
}
+ private def doBasicDriverPyPodCheck(driverPod: Pod): Unit = {
+ assert(driverPod.getMetadata.getName === driverPodName)
+ assert(driverPod.getSpec.getContainers.get(0).getImage === pyImage)
+ assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
+ }
+
private def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
assert(executorPod.getSpec.getContainers.get(0).getImage === image)
assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
}
+ private def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = {
+ assert(executorPod.getSpec.getContainers.get(0).getImage === pyImage)
+ assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+ }
+
private def checkCustomSettings(pod: Pod): Unit = {
assert(pod.getMetadata.getLabels.get("label1") === "label1-value")
assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
@@ -287,14 +402,22 @@ private[spark] object KubernetesSuite {
val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
+ val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest"
+ val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest"
val SPARK_PAGE_RANK_MAIN_CLASS: String = "org.apache.spark.examples.SparkPageRank"
+ val CONTAINER_LOCAL_PYSPARK: String = "local:///opt/spark/examples/src/main/python/"
+ val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py"
+ val PYSPARK_FILES: String = CONTAINER_LOCAL_PYSPARK + "pyfiles.py"
+ val PYSPARK_CONTAINER_TESTS: String = CONTAINER_LOCAL_PYSPARK + "py_container_checks.py"
- // val CONTAINER_LOCAL_FILE_DOWNLOAD_PATH = "/var/spark-data/spark-files"
+ val TEST_SECRET_NAME_PREFIX = "test-secret-"
+ val TEST_SECRET_KEY = "test-key"
+ val TEST_SECRET_VALUE = "test-data"
+ val TEST_SECRET_MOUNT_PATH = "/etc/secrets"
- // val REMOTE_PAGE_RANK_DATA_FILE =
- // "https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt"
- // val CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE =
- // s"$CONTAINER_LOCAL_FILE_DOWNLOAD_PATH/pagerank_data.txt"
+ val REMOTE_PAGE_RANK_DATA_FILE =
+ "https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt"
+ val REMOTE_PAGE_RANK_FILE_NAME = "pagerank_data.txt"
- // case object ShuffleNotReadyException extends Exception
+ case object ShuffleNotReadyException extends Exception
}
http://git-wip-us.apache.org/repos/asf/spark/blob/f1a99ad5/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index b2471e5..a9b49a8 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -97,21 +97,33 @@ private[spark] case class SparkAppArguments(
appArgs: Array[String])
private[spark] object SparkAppLauncher extends Logging {
-
def launch(
appArguments: SparkAppArguments,
appConf: SparkAppConf,
timeoutSecs: Int,
- sparkHomeDir: Path): Unit = {
+ sparkHomeDir: Path,
+ isJVM: Boolean,
+ pyFiles: Option[String] = None): Unit = {
val sparkSubmitExecutable = sparkHomeDir.resolve(Paths.get("bin", "spark-submit"))
logInfo(s"Launching a spark app with arguments $appArguments and conf $appConf")
- val commandLine = (Array(sparkSubmitExecutable.toFile.getAbsolutePath,
+ val preCommandLine = if (isJVM) {
+ mutable.ArrayBuffer(sparkSubmitExecutable.toFile.getAbsolutePath,
"--deploy-mode", "cluster",
"--class", appArguments.mainClass,
- "--master", appConf.get("spark.master")
- ) ++ appConf.toStringArray :+
- appArguments.mainAppResource) ++
- appArguments.appArgs
- ProcessUtils.executeProcess(commandLine, timeoutSecs)
+ "--master", appConf.get("spark.master"))
+ } else {
+ mutable.ArrayBuffer(sparkSubmitExecutable.toFile.getAbsolutePath,
+ "--deploy-mode", "cluster",
+ "--master", appConf.get("spark.master"))
+ }
+ val commandLine =
+ pyFiles.map(s => preCommandLine ++ Array("--py-files", s)).getOrElse(preCommandLine) ++
+ appConf.toStringArray :+ appArguments.mainAppResource
+
+ if (appArguments.appArgs.nonEmpty) {
+ commandLine += appArguments.appArgs.mkString(" ")
+ }
+ logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}")
+ ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org