You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/03/25 19:39:59 UTC
[spark] branch branch-3.0 updated: [SPARK-31244][K8S][TEST] Use
Minio instead of Ceph in K8S DepsTestsSuite
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 53221cd [SPARK-31244][K8S][TEST] Use Minio instead of Ceph in K8S DepsTestsSuite
53221cd is described below
commit 53221cda408e9be5d0d2ff5946c200cb43647dd9
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Wed Mar 25 12:38:15 2020 -0700
[SPARK-31244][K8S][TEST] Use Minio instead of Ceph in K8S DepsTestsSuite
### What changes were proposed in this pull request?
This PR (SPARK-31244) replaces `Ceph` with `Minio` in K8S `DepsTestsSuite`.
### Why are the changes needed?
Currently, `DepsTestsSuite` is using `ceph` for S3 storage. However, the used version and all new releases are broken on new `minikube` releases. We had better use a more robust and smaller one.
```
$ minikube version
minikube version: v1.8.2
$ minikube -p minikube docker-env | source
$ docker run -it --rm -e NETWORK_AUTO_DETECT=4 -e RGW_FRONTEND_PORT=8000 -e SREE_PORT=5001 -e CEPH_DEMO_UID=nano -e CEPH_DAEMON=demo ceph/daemon:v4.0.3-stable-4.0-nautilus-centos-7-x86_64 /bin/sh
2020-03-25 04:26:21 /opt/ceph-container/bin/entrypoint.sh: ERROR- it looks like we have not been able to discover the network settings
$ docker run -it --rm -e NETWORK_AUTO_DETECT=4 -e RGW_FRONTEND_PORT=8000 -e SREE_PORT=5001 -e CEPH_DEMO_UID=nano -e CEPH_DAEMON=demo ceph/daemon:v4.0.11-stable-4.0-nautilus-centos-7 /bin/sh
2020-03-25 04:20:30 /opt/ceph-container/bin/entrypoint.sh: ERROR- it looks like we have not been able to discover the network settings
```
Also, the image size is unnecessarily big (almost `1GB`) and growing while `minio` is `55.8MB` with the same features.
```
$ docker images | grep ceph
ceph/daemon v4.0.3-stable-4.0-nautilus-centos-7-x86_64 a6a05ccdf924 6 months ago 852MB
ceph/daemon v4.0.11-stable-4.0-nautilus-centos-7 87f695550d8e 12 hours ago 901MB
$ docker images | grep minio
minio/minio latest 95c226551ea6 5 days ago 55.8MB
```
### Does this PR introduce any user-facing change?
No. (This is a test case change)
### How was this patch tested?
Pass the existing Jenkins K8s integration test job and test with the latest minikube.
```
$ minikube version
minikube version: v1.8.2
$ kubectl version --short
Client Version: v1.17.4
Server Version: v1.17.4
$ NO_MANUAL=1 ./dev/make-distribution.sh --r --pip --tgz -Pkubernetes
$ resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh --spark-tgz $PWD/spark-*.tgz
...
KubernetesSuite:
- Run SparkPi with no resources
- Run SparkPi with a very long application name.
- Use SparkLauncher.NO_RESOURCE
- Run SparkPi with a master URL without a scheme.
- Run SparkPi with an argument.
- Run SparkPi with custom labels, annotations, and environment variables.
- All pods have the same service account by default
- Run extraJVMOptions check on driver
- Run SparkRemoteFileTest using a remote data file
- Run SparkPi with env and mount secrets.
- Run PySpark on simple pi.py example
- Run PySpark with Python2 to test a pyfiles example
- Run PySpark with Python3 to test a pyfiles example
- Run PySpark with memory customization
- Run in client mode.
- Start pod creation from template
- PVs with local storage *** FAILED *** // This is irrelevant to this PR.
- Launcher client dependencies // This is the fixed test case by this PR.
- Test basic decommissioning
- Run SparkR on simple dataframe.R example
Run completed in 12 minutes, 4 seconds.
...
```
The following is the working snapshot of the `DepsTestsSuite` test.
```
$ kubectl get all -ncf9438dd8a65436686b1196a6b73000f
NAME READY STATUS RESTARTS AGE
pod/minio-0 1/1 Running 0 70s
pod/spark-test-app-8494bddca3754390b9e59a2ef47584eb 1/1 Running 0 55s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/minio-s3 NodePort 10.109.54.180 <none> 9000:30678/TCP 70s
service/spark-test-app-fd916b711061c7b8-driver-svc ClusterIP None <none> 7078/TCP,7079/TCP,4040/TCP 55s
NAME READY AGE
statefulset.apps/minio 1/1 70s
```
Closes #28015 from dongjoon-hyun/SPARK-31244.
Authored-by: Dongjoon Hyun <do...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit f206bbde3a8f64650236013d61680faba492d7a4)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../k8s/integrationtest/DepsTestsSuite.scala | 90 +++++++++-------------
1 file changed, 36 insertions(+), 54 deletions(-)
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
index 4141268..c35aa5c 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
@@ -35,17 +35,16 @@ import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube
private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
import KubernetesSuite.k8sTestTag
- val cName = "ceph-nano"
+ val cName = "minio"
val svcName = s"$cName-s3"
- val bucket = "spark"
-
- private def getCephContainer(): Container = {
- val envVars = Map ( "NETWORK_AUTO_DETECT" -> "4",
- "RGW_FRONTEND_PORT" -> "8000",
- "SREE_PORT" -> "5001",
- "CEPH_DEMO_UID" -> "nano",
- "CEPH_DAEMON" -> "demo",
- "DEBUG" -> "verbose"
+ val BUCKET = "spark"
+ val ACCESS_KEY = "minio"
+ val SECRET_KEY = "miniostorage"
+
+ private def getMinioContainer(): Container = {
+ val envVars = Map (
+ "MINIO_ACCESS_KEY" -> ACCESS_KEY,
+ "MINIO_SECRET_KEY" -> SECRET_KEY
).map( envV =>
new EnvVarBuilder()
.withName(envV._1)
@@ -63,13 +62,14 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
).asJava
new ContainerBuilder()
- .withImage("ceph/daemon:v4.0.3-stable-4.0-nautilus-centos-7-x86_64")
+ .withImage("minio/minio:latest")
.withImagePullPolicy("Always")
.withName(cName)
+ .withArgs("server", "/data")
.withPorts(new ContainerPortBuilder()
.withName(svcName)
.withProtocol("TCP")
- .withContainerPort(8000)
+ .withContainerPort(9000)
.build()
)
.withResources(new ResourceRequirementsBuilder()
@@ -81,10 +81,9 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
.build()
}
- // Based on https://github.com/ceph/cn
- private def setupCephStorage(): Unit = {
- val labels = Map("app" -> "ceph", "daemon" -> "nano").asJava
- val cephService = new ServiceBuilder()
+ private def setupMinioStorage(): Unit = {
+ val labels = Map("app" -> "minio").asJava
+ val minioService = new ServiceBuilder()
.withNewMetadata()
.withName(svcName)
.withLabels(labels)
@@ -92,9 +91,9 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
.withNewSpec()
.withPorts(new ServicePortBuilder()
.withName("https")
- .withPort(8000)
+ .withPort(9000)
.withProtocol("TCP")
- .withTargetPort(new IntOrString(8000))
+ .withTargetPort(new IntOrString(9000))
.build()
)
.withType("NodePort")
@@ -102,7 +101,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
.endSpec()
.build()
- val cephStatefulSet = new StatefulSetBuilder()
+ val minioStatefulSet = new StatefulSetBuilder()
.withNewMetadata()
.withName(cName)
.withLabels(labels)
@@ -110,7 +109,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
.withNewSpec()
.withReplicas(1)
.withNewSelector()
- .withMatchLabels(Map("app" -> "ceph").asJava)
+ .withMatchLabels(Map("app" -> "minio").asJava)
.endSelector()
.withServiceName(cName)
.withNewTemplate()
@@ -119,7 +118,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
.withLabels(labels)
.endMetadata()
.withNewSpec()
- .withContainers(getCephContainer())
+ .withContainers(getMinioContainer())
.endSpec()
.endTemplate()
.endSpec()
@@ -128,16 +127,16 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
kubernetesTestComponents
.kubernetesClient
.services()
- .create(cephService)
+ .create(minioService)
kubernetesTestComponents
.kubernetesClient
.apps()
.statefulSets()
- .create(cephStatefulSet)
+ .create(minioStatefulSet)
}
- private def deleteCephStorage(): Unit = {
+ private def deleteMinioStorage(): Unit = {
kubernetesTestComponents
.kubernetesClient
.apps()
@@ -155,47 +154,30 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
test("Launcher client dependencies", k8sTestTag, MinikubeTag) {
val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
try {
- setupCephStorage()
- val cephUrlStr = getServiceUrl(svcName)
- val cephUrl = new URL(cephUrlStr)
- val cephHost = cephUrl.getHost
- val cephPort = cephUrl.getPort
+ setupMinioStorage()
+ val minioUrlStr = getServiceUrl(svcName)
+ val minioUrl = new URL(minioUrlStr)
+ val minioHost = minioUrl.getHost
+ val minioPort = minioUrl.getPort
val examplesJar = Utils.getExamplesJarAbsolutePath(sparkHomeDir)
- val (accessKey, secretKey) = getCephCredentials()
sparkAppConf
- .set("spark.hadoop.fs.s3a.access.key", accessKey)
- .set("spark.hadoop.fs.s3a.secret.key", secretKey)
+ .set("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)
+ .set("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)
.set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
- .set("spark.hadoop.fs.s3a.endpoint", s"$cephHost:$cephPort")
- .set("spark.kubernetes.file.upload.path", s"s3a://$bucket")
+ .set("spark.hadoop.fs.s3a.endpoint", s"$minioHost:$minioPort")
+ .set("spark.kubernetes.file.upload.path", s"s3a://$BUCKET")
.set("spark.files", s"$HOST_PATH/$fileName")
.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.set("spark.jars.packages", "com.amazonaws:aws-java-sdk:" +
"1.7.4,org.apache.hadoop:hadoop-aws:2.7.6")
.set("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp")
- createS3Bucket(accessKey, secretKey, cephUrlStr)
+ createS3Bucket(ACCESS_KEY, SECRET_KEY, minioUrlStr)
runSparkRemoteCheckAndVerifyCompletion(appResource = examplesJar,
appArgs = Array(fileName),
timeout = Option(DEPS_TIMEOUT))
} finally {
// make sure this always runs
- deleteCephStorage()
- }
- }
-
- // There isn't a cleaner way to get the credentials
- // when ceph-nano runs on k8s
- private def getCephCredentials(): (String, String) = {
- Eventually.eventually(TIMEOUT, INTERVAL) {
- val cephPod = kubernetesTestComponents
- .kubernetesClient
- .pods()
- .withName(s"$cName-0")
- .get()
- implicit val podName: String = cephPod.getMetadata.getName
- implicit val components: KubernetesTestComponents = kubernetesTestComponents
- val contents = Utils.executeCommand("cat", "/nano_user_details")
- (extractS3Key(contents, "access_key"), extractS3Key(contents, "secret_key"))
+ deleteMinioStorage()
}
}
@@ -215,10 +197,10 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
val credentials = new BasicAWSCredentials(accessKey, secretKey)
val s3client = new AmazonS3Client(credentials)
s3client.setEndpoint(endPoint)
- s3client.createBucket(bucket)
+ s3client.createBucket(BUCKET)
} catch {
case e: Exception =>
- throw new SparkException(s"Failed to create bucket $bucket.", e)
+ throw new SparkException(s"Failed to create bucket $BUCKET.", e)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org