You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by ka...@apache.org on 2021/07/06 05:33:17 UTC
[submarine] branch master updated: SUBMARINE-900. Create a
submarine mlflow for single user
This is an automated email from the ASF dual-hosted git repository.
kaihsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 20b76e6 SUBMARINE-900. Create a submarine mlflow for single user
20b76e6 is described below
commit 20b76e65177a86a6fd1a911a4f0f5f768f24b03e
Author: Kenchu123 <k8...@gmail.com>
AuthorDate: Mon Jul 5 19:57:43 2021 +0800
SUBMARINE-900. Create a submarine mlflow for single user
### What is this PR for?
Create a Submarine MLflow instance through the operator for a single user.
### What type of PR is it?
[Feature]
### Todos
### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-900
### How should this be tested?
### Screenshots (if appropriate)
https://user-images.githubusercontent.com/17617373/124424951-72767500-dd9a-11eb-8e4c-db21ea8d56c6.mov
### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need new documentation? No
Author: Kenchu123 <k8...@gmail.com>
Signed-off-by: Kai-Hsun Chen <ka...@apache.org>
Closes #645 from Kenchu123/SUBMARINE-900 and squashes the following commits:
77ddabf0 [Kenchu123] SUBMARINE-900. rerun gofmt on mlflow
203b3908 [Kenchu123] SUBMARINE-900. change tab indent to spaces in controller and fix the format difference
4d1e391c [Kenchu123] SUBMARINE-900. Add type to mlflow service
b1136cd1 [Kenchu123] SUBMARINE-900. Create mlflow on applying a submarine
---
submarine-cloud-v2/go.mod | 4 +-
submarine-cloud-v2/pkg/controller/controller.go | 10 +
.../pkg/controller/submarine_mlflow.go | 342 +++++++++++++++++++++
3 files changed, 354 insertions(+), 2 deletions(-)
diff --git a/submarine-cloud-v2/go.mod b/submarine-cloud-v2/go.mod
index 96634f0..330a66f 100644
--- a/submarine-cloud-v2/go.mod
+++ b/submarine-cloud-v2/go.mod
@@ -3,10 +3,10 @@ module github.com/apache/submarine/submarine-cloud-v2
go 1.16
require (
- github.com/fatih/color v1.7.0
+ github.com/fatih/color v1.7.0 // indirect
github.com/gofrs/flock v0.8.0
github.com/pkg/errors v0.9.1
- github.com/stretchr/testify v1.7.0
+ github.com/stretchr/testify v1.7.0 // indirect
github.com/traefik/traefik/v2 v2.4.8
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
gopkg.in/yaml.v2 v2.4.0
diff --git a/submarine-cloud-v2/pkg/controller/controller.go b/submarine-cloud-v2/pkg/controller/controller.go
index 8d46bef..bc9eaab 100644
--- a/submarine-cloud-v2/pkg/controller/controller.go
+++ b/submarine-cloud-v2/pkg/controller/controller.go
@@ -66,6 +66,7 @@ const (
serverName = "submarine-server"
databaseName = "submarine-database"
tensorboardName = "submarine-tensorboard"
+ mlflowName = "submarine-mlflow"
ingressName = serverName + "-ingress"
databasePvNamePrefix = databaseName + "-pv"
databasePvcName = databaseName + "-pvc"
@@ -73,6 +74,10 @@ const (
tensorboardPvcName = tensorboardName + "-pvc"
tensorboardServiceName = tensorboardName + "-service"
tensorboardIngressRouteName = tensorboardName + "-ingressroute"
+ mlflowPvNamePrefix = mlflowName + "-pv"
+ mlflowPvcName = mlflowName + "-pvc"
+ mlflowServiceName = mlflowName + "-service"
+ mlflowIngressRouteName = mlflowName + "-ingressroute"
)
// PersistentVolumes are not namespaced resources, so we add the namespace as a
@@ -480,6 +485,11 @@ func (c *Controller) syncHandler(workqueueItem WorkQueueItem) error {
return err
}
+ err = c.createSubmarineMlflow(submarine)
+ if err != nil {
+ return err
+ }
+
err = c.updateSubmarineStatus(submarine, serverDeployment, databaseDeployment)
if err != nil {
return err
diff --git a/submarine-cloud-v2/pkg/controller/submarine_mlflow.go b/submarine-cloud-v2/pkg/controller/submarine_mlflow.go
new file mode 100644
index 0000000..3f431e9
--- /dev/null
+++ b/submarine-cloud-v2/pkg/controller/submarine_mlflow.go
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller
+
+import (
+ "context"
+ "fmt"
+
+ v1alpha1 "github.com/apache/submarine/submarine-cloud-v2/pkg/apis/submarine/v1alpha1"
+ traefikv1alpha1 "github.com/traefik/traefik/v2/pkg/provider/kubernetes/crd/traefik/v1alpha1"
+ appsv1 "k8s.io/api/apps/v1"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/api/resource"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/intstr"
+ "k8s.io/klog/v2"
+)
+
+// newSubmarineMlflowPersistentVolume builds the PersistentVolume object used
+// by submarine-mlflow for the given Submarine resource.
+//
+// The volume source follows submarine.Spec.Storage.StorageType: "nfs" yields
+// an NFS source built from NfsIP/NfsPath, "host" yields a HostPath source of
+// type DirectoryOrCreate built from HostPath. Any other value leaves the
+// volume source empty. The capacity is parsed from
+// submarine.Spec.Mlflow.StorageSize with resource.MustParse.
+func newSubmarineMlflowPersistentVolume(submarine *v1alpha1.Submarine) *corev1.PersistentVolume {
+	storage := submarine.Spec.Storage
+
+	var source corev1.PersistentVolumeSource
+	if storage.StorageType == "nfs" {
+		source.NFS = &corev1.NFSVolumeSource{
+			Server: storage.NfsIP,
+			Path:   storage.NfsPath,
+		}
+	} else if storage.StorageType == "host" {
+		dirOrCreate := corev1.HostPathDirectoryOrCreate
+		source.HostPath = &corev1.HostPathVolumeSource{
+			Path: storage.HostPath,
+			Type: &dirOrCreate,
+		}
+	}
+
+	// PersistentVolumes are cluster scoped, so the name is derived from the
+	// namespace through pvName.
+	meta := metav1.ObjectMeta{
+		Name: pvName(mlflowPvNamePrefix, submarine.Namespace),
+		OwnerReferences: []metav1.OwnerReference{
+			*metav1.NewControllerRef(submarine, v1alpha1.SchemeGroupVersion.WithKind("Submarine")),
+		},
+	}
+
+	spec := corev1.PersistentVolumeSpec{
+		AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany},
+		Capacity: corev1.ResourceList{
+			corev1.ResourceStorage: resource.MustParse(submarine.Spec.Mlflow.StorageSize),
+		},
+		PersistentVolumeSource: source,
+	}
+
+	return &corev1.PersistentVolume{ObjectMeta: meta, Spec: spec}
+}
+
+// newSubmarineMlflowPersistentVolumeClaim builds the PersistentVolumeClaim
+// named mlflowPvcName for the given Submarine resource.
+//
+// The claim requests submarine.Spec.Mlflow.StorageSize (parsed with
+// resource.MustParse) in ReadWriteMany mode, names the mlflow
+// PersistentVolume of the Submarine's namespace as its volume, and sets an
+// empty storage class name.
+func newSubmarineMlflowPersistentVolumeClaim(submarine *v1alpha1.Submarine) *corev1.PersistentVolumeClaim {
+	emptyStorageClass := ""
+
+	meta := metav1.ObjectMeta{
+		Name: mlflowPvcName,
+		OwnerReferences: []metav1.OwnerReference{
+			*metav1.NewControllerRef(submarine, v1alpha1.SchemeGroupVersion.WithKind("Submarine")),
+		},
+	}
+
+	requests := corev1.ResourceList{
+		corev1.ResourceStorage: resource.MustParse(submarine.Spec.Mlflow.StorageSize),
+	}
+
+	spec := corev1.PersistentVolumeClaimSpec{
+		AccessModes:      []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany},
+		Resources:        corev1.ResourceRequirements{Requests: requests},
+		VolumeName:       pvName(mlflowPvNamePrefix, submarine.Namespace),
+		StorageClassName: &emptyStorageClass,
+	}
+
+	return &corev1.PersistentVolumeClaim{ObjectMeta: meta, Spec: spec}
+}
+
+func newSubmarineMlflowDeployment(submarine *v1alpha1.Submarine) *appsv1.Deployment {
+ return &appsv1.Deployment{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: mlflowName,
+ OwnerReferences: []metav1.OwnerReference{
+ *metav1.NewControllerRef(submarine, v1alpha1.SchemeGroupVersion.WithKind("Submarine")),
+ },
+ },
+ Spec: appsv1.DeploymentSpec{
+ Selector: &metav1.LabelSelector{
+ MatchLabels: map[string]string{
+ "app": mlflowName + "-pod",
+ },
+ },
+ Template: corev1.PodTemplateSpec{
+ ObjectMeta: metav1.ObjectMeta{
+ Labels: map[string]string{
+ "app": mlflowName + "-pod",
+ },
+ },
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{
+ {
+ Name: mlflowName + "-container",
+ Image: "apache/submarine:mlflow-0.6.0-SNAPSHOT",
+ ImagePullPolicy: "IfNotPresent",
+ Ports: []corev1.ContainerPort{
+ {
+ ContainerPort: 5000,
+ },
+ },
+ VolumeMounts: []corev1.VolumeMount{
+ {
+ MountPath: "/logs",
+ Name: "volume",
+ SubPath: mlflowName,
+ },
+ },
+ ReadinessProbe: &corev1.Probe{
+ Handler: corev1.Handler{
+ TCPSocket: &corev1.TCPSocketAction{
+ Port: intstr.FromInt(5000),
+ },
+ },
+ InitialDelaySeconds: 60,
+ PeriodSeconds: 10,
+ },
+ },
+ },
+ Volumes: []corev1.Volume{
+ {
+ Name: "volume",
+ VolumeSource: corev1.VolumeSource{
+ PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
+ ClaimName: mlflowPvcName,
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ }
+}
+
+func newSubmarineMlflowService(submarine *v1alpha1.Submarine) *corev1.Service {
+ return &corev1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: mlflowServiceName,
+ OwnerReferences: []metav1.OwnerReference{
+ *metav1.NewControllerRef(submarine, v1alpha1.SchemeGroupVersion.WithKind("Submarine")),
+ },
+ },
+ Spec: corev1.ServiceSpec{
+ Type: corev1.ServiceTypeClusterIP,
+ Selector: map[string]string{
+ "app": mlflowName + "-pod",
+ },
+ Ports: []corev1.ServicePort{
+ {
+ Protocol: "TCP",
+ Port: 5000,
+ TargetPort: intstr.FromInt(5000),
+ },
+ },
+ },
+ }
+}
+
+// newSubmarineMlflowIngressRoute builds the Traefik IngressRoute for
+// submarine-mlflow, owned by the given Submarine resource.
+//
+// The route listens on the "web" entry point and sends every request whose
+// path starts with /mlflow to port 5000 of the mlflowServiceName Service.
+//
+// The object is named with the mlflowIngressRouteName constant, the same
+// constant createSubmarineMlflow uses for its lookup, so the created name and
+// the looked-up name cannot drift apart.
+func newSubmarineMlflowIngressRoute(submarine *v1alpha1.Submarine) *traefikv1alpha1.IngressRoute {
+	return &traefikv1alpha1.IngressRoute{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: mlflowIngressRouteName,
+			OwnerReferences: []metav1.OwnerReference{
+				*metav1.NewControllerRef(submarine, v1alpha1.SchemeGroupVersion.WithKind("Submarine")),
+			},
+		},
+		Spec: traefikv1alpha1.IngressRouteSpec{
+			EntryPoints: []string{
+				"web",
+			},
+			Routes: []traefikv1alpha1.Route{
+				{
+					Kind:  "Rule",
+					Match: "PathPrefix(`/mlflow`)",
+					Services: []traefikv1alpha1.Service{
+						{
+							LoadBalancerSpec: traefikv1alpha1.LoadBalancerSpec{
+								Kind: "Service",
+								Name: mlflowServiceName,
+								Port: 5000,
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+}
+
+// createSubmarineMlflow makes sure every resource of submarine-mlflow exists
+// for the given Submarine: PersistentVolume, PersistentVolumeClaim,
+// Deployment, Service and IngressRoute, in that order.
+//
+// Each resource is looked up through its lister and created through the
+// clientset when the lookup reports NotFound. Any lookup or creation error is
+// returned unchanged so the work item can be processed again later. When a
+// resource exists but is not controlled by submarine, a warning event is
+// recorded and an error carrying the same message is returned.
+//
+// Reference: https://github.com/apache/submarine/blob/master/helm-charts/submarine/templates/submarine-mlflow.yaml
+func (c *Controller) createSubmarineMlflow(submarine *v1alpha1.Submarine) error {
+	klog.Info("[createSubmarineMlflow]")
+
+	// Step 1: Create PersistentVolume
+	// PersistentVolumes are not namespaced resources, so we add the namespace
+	// as a suffix to distinguish them
+	pv, err := c.persistentvolumeLister.Get(pvName(mlflowPvNamePrefix, submarine.Namespace))
+	// If the resource doesn't exist, we'll create it
+	if errors.IsNotFound(err) {
+		pv, err = c.kubeclientset.CoreV1().PersistentVolumes().Create(context.TODO(), newSubmarineMlflowPersistentVolume(submarine), metav1.CreateOptions{})
+		if err != nil {
+			// Return right away: the returned object must not be used (or
+			// reported as created) when the call failed.
+			klog.Info(err)
+			return err
+		}
+		klog.Info(" Create PersistentVolume: ", pv.Name)
+	}
+	// If an error occurs during Get, we'll requeue the item so we can attempt
+	// processing again later. This could have been caused by a temporary
+	// network failure, or any other transient reason.
+	if err != nil {
+		return err
+	}
+
+	if !metav1.IsControlledBy(pv, submarine) {
+		msg := fmt.Sprintf(MessageResourceExists, pv.Name)
+		c.recorder.Event(submarine, corev1.EventTypeWarning, ErrResourceExists, msg)
+		// msg is already formatted; never use it as a format string.
+		return fmt.Errorf("%s", msg)
+	}
+
+	// Step 2: Create PersistentVolumeClaim
+	pvc, err := c.persistentvolumeclaimLister.PersistentVolumeClaims(submarine.Namespace).Get(mlflowPvcName)
+	// If the resource doesn't exist, we'll create it
+	if errors.IsNotFound(err) {
+		pvc, err = c.kubeclientset.CoreV1().PersistentVolumeClaims(submarine.Namespace).Create(context.TODO(),
+			newSubmarineMlflowPersistentVolumeClaim(submarine),
+			metav1.CreateOptions{})
+		if err != nil {
+			klog.Info(err)
+			return err
+		}
+		klog.Info(" Create PersistentVolumeClaim: ", pvc.Name)
+	}
+	if err != nil {
+		return err
+	}
+
+	if !metav1.IsControlledBy(pvc, submarine) {
+		msg := fmt.Sprintf(MessageResourceExists, pvc.Name)
+		c.recorder.Event(submarine, corev1.EventTypeWarning, ErrResourceExists, msg)
+		return fmt.Errorf("%s", msg)
+	}
+
+	// Step 3: Create Deployment
+	deployment, err := c.deploymentLister.Deployments(submarine.Namespace).Get(mlflowName)
+	// If the resource doesn't exist, we'll create it
+	if errors.IsNotFound(err) {
+		deployment, err = c.kubeclientset.AppsV1().Deployments(submarine.Namespace).Create(context.TODO(), newSubmarineMlflowDeployment(submarine), metav1.CreateOptions{})
+		if err != nil {
+			klog.Info(err)
+			return err
+		}
+		klog.Info(" Create Deployment: ", deployment.Name)
+	}
+	if err != nil {
+		return err
+	}
+
+	if !metav1.IsControlledBy(deployment, submarine) {
+		msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
+		c.recorder.Event(submarine, corev1.EventTypeWarning, ErrResourceExists, msg)
+		return fmt.Errorf("%s", msg)
+	}
+
+	// Step 4: Create Service
+	service, err := c.serviceLister.Services(submarine.Namespace).Get(mlflowServiceName)
+	// If the resource doesn't exist, we'll create it
+	if errors.IsNotFound(err) {
+		service, err = c.kubeclientset.CoreV1().Services(submarine.Namespace).Create(context.TODO(), newSubmarineMlflowService(submarine), metav1.CreateOptions{})
+		if err != nil {
+			klog.Info(err)
+			return err
+		}
+		klog.Info(" Create Service: ", service.Name)
+	}
+	if err != nil {
+		return err
+	}
+
+	if !metav1.IsControlledBy(service, submarine) {
+		msg := fmt.Sprintf(MessageResourceExists, service.Name)
+		c.recorder.Event(submarine, corev1.EventTypeWarning, ErrResourceExists, msg)
+		return fmt.Errorf("%s", msg)
+	}
+
+	// Step 5: Create IngressRoute
+	ingressroute, err := c.ingressrouteLister.IngressRoutes(submarine.Namespace).Get(mlflowIngressRouteName)
+	// If the resource doesn't exist, we'll create it
+	if errors.IsNotFound(err) {
+		ingressroute, err = c.traefikclientset.TraefikV1alpha1().IngressRoutes(submarine.Namespace).Create(context.TODO(), newSubmarineMlflowIngressRoute(submarine), metav1.CreateOptions{})
+		if err != nil {
+			klog.Info(err)
+			return err
+		}
+		klog.Info(" Create IngressRoute: ", ingressroute.Name)
+	}
+	if err != nil {
+		return err
+	}
+
+	if !metav1.IsControlledBy(ingressroute, submarine) {
+		msg := fmt.Sprintf(MessageResourceExists, ingressroute.Name)
+		c.recorder.Event(submarine, corev1.EventTypeWarning, ErrResourceExists, msg)
+		return fmt.Errorf("%s", msg)
+	}
+
+	return nil
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org