You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by ka...@apache.org on 2021/07/06 05:33:17 UTC

[submarine] branch master updated: SUBMARINE-900. Create a submarine mlflow for single user

This is an automated email from the ASF dual-hosted git repository.

kaihsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 20b76e6  SUBMARINE-900. Create a submarine mlflow for single user
20b76e6 is described below

commit 20b76e65177a86a6fd1a911a4f0f5f768f24b03e
Author: Kenchu123 <k8...@gmail.com>
AuthorDate: Mon Jul 5 19:57:43 2021 +0800

    SUBMARINE-900. Create a submarine mlflow for single user
    
    ### What is this PR for?
    
    Create a submarine mlflow instance through the operator for a single user.
    
    ### What type of PR is it?
    
    [Feature]
    
    ### Todos
    
    ### What is the Jira issue?
    
    https://issues.apache.org/jira/browse/SUBMARINE-900
    
    ### How should this be tested?
    
    ### Screenshots (if appropriate)
    
    https://user-images.githubusercontent.com/17617373/124424951-72767500-dd9a-11eb-8e4c-db21ea8d56c6.mov
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need new documentation? No
    
    Author: Kenchu123 <k8...@gmail.com>
    
    Signed-off-by: Kai-Hsun Chen <ka...@apache.org>
    
    Closes #645 from Kenchu123/SUBMARINE-900 and squashes the following commits:
    
    77ddabf0 [Kenchu123] SUBMARINE-900. rerun gofmt on mlflow
    203b3908 [Kenchu123] SUBMARINE-900. change tab indent to spaces in controller and fix the format difference
    4d1e391c [Kenchu123] SUBMARINE-900. Add type to mlflow service
    b1136cd1 [Kenchu123] SUBMARINE-900. Create mlflow on applying a submarine
---
 submarine-cloud-v2/go.mod                          |   4 +-
 submarine-cloud-v2/pkg/controller/controller.go    |  10 +
 .../pkg/controller/submarine_mlflow.go             | 342 +++++++++++++++++++++
 3 files changed, 354 insertions(+), 2 deletions(-)

diff --git a/submarine-cloud-v2/go.mod b/submarine-cloud-v2/go.mod
index 96634f0..330a66f 100644
--- a/submarine-cloud-v2/go.mod
+++ b/submarine-cloud-v2/go.mod
@@ -3,10 +3,10 @@ module github.com/apache/submarine/submarine-cloud-v2
 go 1.16
 
 require (
-	github.com/fatih/color v1.7.0
+	github.com/fatih/color v1.7.0 // indirect
 	github.com/gofrs/flock v0.8.0
 	github.com/pkg/errors v0.9.1
-	github.com/stretchr/testify v1.7.0
+	github.com/stretchr/testify v1.7.0 // indirect
 	github.com/traefik/traefik/v2 v2.4.8
 	golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
 	gopkg.in/yaml.v2 v2.4.0
diff --git a/submarine-cloud-v2/pkg/controller/controller.go b/submarine-cloud-v2/pkg/controller/controller.go
index 8d46bef..bc9eaab 100644
--- a/submarine-cloud-v2/pkg/controller/controller.go
+++ b/submarine-cloud-v2/pkg/controller/controller.go
@@ -66,6 +66,7 @@ const (
 	serverName                  = "submarine-server"
 	databaseName                = "submarine-database"
 	tensorboardName             = "submarine-tensorboard"
+	mlflowName                  = "submarine-mlflow"
 	ingressName                 = serverName + "-ingress"
 	databasePvNamePrefix        = databaseName + "-pv"
 	databasePvcName             = databaseName + "-pvc"
@@ -73,6 +74,10 @@ const (
 	tensorboardPvcName          = tensorboardName + "-pvc"
 	tensorboardServiceName      = tensorboardName + "-service"
 	tensorboardIngressRouteName = tensorboardName + "-ingressroute"
+	mlflowPvNamePrefix          = mlflowName + "-pv"
+	mlflowPvcName               = mlflowName + "-pvc"
+	mlflowServiceName           = mlflowName + "-service"
+	mlflowIngressRouteName      = mlflowName + "-ingressroute"
 )
 
 // PersistentVolumes are not namespaced resources, so we add the namespace as a
@@ -480,6 +485,11 @@ func (c *Controller) syncHandler(workqueueItem WorkQueueItem) error {
 			return err
 		}
 
+		err = c.createSubmarineMlflow(submarine)
+		if err != nil {
+			return err
+		}
+
 		err = c.updateSubmarineStatus(submarine, serverDeployment, databaseDeployment)
 		if err != nil {
 			return err
diff --git a/submarine-cloud-v2/pkg/controller/submarine_mlflow.go b/submarine-cloud-v2/pkg/controller/submarine_mlflow.go
new file mode 100644
index 0000000..3f431e9
--- /dev/null
+++ b/submarine-cloud-v2/pkg/controller/submarine_mlflow.go
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller
+
+import (
+	"context"
+	"fmt"
+
+	v1alpha1 "github.com/apache/submarine/submarine-cloud-v2/pkg/apis/submarine/v1alpha1"
+	traefikv1alpha1 "github.com/traefik/traefik/v2/pkg/provider/kubernetes/crd/traefik/v1alpha1"
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/klog/v2"
+)
+
+// newSubmarineMlflowPersistentVolume builds the PersistentVolume object used
+// by submarine-mlflow for the given Submarine resource.
+//
+// The volume is named pvName(mlflowPvNamePrefix, submarine.Namespace), is
+// controlled by the Submarine resource, requests ReadWriteMany access and takes
+// its capacity from submarine.Spec.Mlflow.StorageSize (resource.MustParse
+// panics when that string is not a valid quantity).
+//
+// The backing source depends on submarine.Spec.Storage.StorageType:
+//   - "nfs":  an NFS source built from NfsIP and NfsPath
+//   - "host": a HostPath source of type DirectoryOrCreate built from HostPath
+//
+// Any other storage type leaves the volume source empty.
+func newSubmarineMlflowPersistentVolume(submarine *v1alpha1.Submarine) *corev1.PersistentVolume {
+	var source corev1.PersistentVolumeSource
+	switch submarine.Spec.Storage.StorageType {
+	case "nfs":
+		source.NFS = &corev1.NFSVolumeSource{
+			Server: submarine.Spec.Storage.NfsIP,
+			Path:   submarine.Spec.Storage.NfsPath,
+		}
+	case "host":
+		// The field is a pointer, so the constant needs an addressable copy.
+		directoryOrCreate := corev1.HostPathDirectoryOrCreate
+		source.HostPath = &corev1.HostPathVolumeSource{
+			Path: submarine.Spec.Storage.HostPath,
+			Type: &directoryOrCreate,
+		}
+	}
+
+	owner := *metav1.NewControllerRef(submarine, v1alpha1.SchemeGroupVersion.WithKind("Submarine"))
+	capacity := corev1.ResourceList{
+		corev1.ResourceStorage: resource.MustParse(submarine.Spec.Mlflow.StorageSize),
+	}
+
+	volume := &corev1.PersistentVolume{}
+	volume.ObjectMeta = metav1.ObjectMeta{
+		Name:            pvName(mlflowPvNamePrefix, submarine.Namespace),
+		OwnerReferences: []metav1.OwnerReference{owner},
+	}
+	volume.Spec = corev1.PersistentVolumeSpec{
+		AccessModes:            []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany},
+		Capacity:               capacity,
+		PersistentVolumeSource: source,
+	}
+	return volume
+}
+
+// newSubmarineMlflowPersistentVolumeClaim builds the PersistentVolumeClaim
+// named mlflowPvcName for the given Submarine resource.
+//
+// The claim is controlled by the Submarine resource, requests ReadWriteMany
+// access with submarine.Spec.Mlflow.StorageSize of storage (resource.MustParse
+// panics on an invalid quantity), and names the volume
+// pvName(mlflowPvNamePrefix, submarine.Namespace) explicitly in VolumeName.
+func newSubmarineMlflowPersistentVolumeClaim(submarine *v1alpha1.Submarine) *corev1.PersistentVolumeClaim {
+	// StorageClassName is a *string; it is set to a pointer to the empty
+	// string rather than left nil.
+	emptyClass := ""
+
+	owner := *metav1.NewControllerRef(submarine, v1alpha1.SchemeGroupVersion.WithKind("Submarine"))
+	requested := corev1.ResourceList{
+		corev1.ResourceStorage: resource.MustParse(submarine.Spec.Mlflow.StorageSize),
+	}
+
+	claim := &corev1.PersistentVolumeClaim{}
+	claim.ObjectMeta = metav1.ObjectMeta{
+		Name:            mlflowPvcName,
+		OwnerReferences: []metav1.OwnerReference{owner},
+	}
+	claim.Spec = corev1.PersistentVolumeClaimSpec{
+		AccessModes:      []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany},
+		Resources:        corev1.ResourceRequirements{Requests: requested},
+		VolumeName:       pvName(mlflowPvNamePrefix, submarine.Namespace),
+		StorageClassName: &emptyClass,
+	}
+	return claim
+}
+
+func newSubmarineMlflowDeployment(submarine *v1alpha1.Submarine) *appsv1.Deployment {
+	return &appsv1.Deployment{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: mlflowName,
+			OwnerReferences: []metav1.OwnerReference{
+				*metav1.NewControllerRef(submarine, v1alpha1.SchemeGroupVersion.WithKind("Submarine")),
+			},
+		},
+		Spec: appsv1.DeploymentSpec{
+			Selector: &metav1.LabelSelector{
+				MatchLabels: map[string]string{
+					"app": mlflowName + "-pod",
+				},
+			},
+			Template: corev1.PodTemplateSpec{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: map[string]string{
+						"app": mlflowName + "-pod",
+					},
+				},
+				Spec: corev1.PodSpec{
+					Containers: []corev1.Container{
+						{
+							Name:            mlflowName + "-container",
+							Image:           "apache/submarine:mlflow-0.6.0-SNAPSHOT",
+							ImagePullPolicy: "IfNotPresent",
+							Ports: []corev1.ContainerPort{
+								{
+									ContainerPort: 5000,
+								},
+							},
+							VolumeMounts: []corev1.VolumeMount{
+								{
+									MountPath: "/logs",
+									Name:      "volume",
+									SubPath:   mlflowName,
+								},
+							},
+							ReadinessProbe: &corev1.Probe{
+								Handler: corev1.Handler{
+									TCPSocket: &corev1.TCPSocketAction{
+										Port: intstr.FromInt(5000),
+									},
+								},
+								InitialDelaySeconds: 60,
+								PeriodSeconds:       10,
+							},
+						},
+					},
+					Volumes: []corev1.Volume{
+						{
+							Name: "volume",
+							VolumeSource: corev1.VolumeSource{
+								PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
+									ClaimName: mlflowPvcName,
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+}
+
+func newSubmarineMlflowService(submarine *v1alpha1.Submarine) *corev1.Service {
+	return &corev1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: mlflowServiceName,
+			OwnerReferences: []metav1.OwnerReference{
+				*metav1.NewControllerRef(submarine, v1alpha1.SchemeGroupVersion.WithKind("Submarine")),
+			},
+		},
+		Spec: corev1.ServiceSpec{
+			Type: corev1.ServiceTypeClusterIP,
+			Selector: map[string]string{
+				"app": mlflowName + "-pod",
+			},
+			Ports: []corev1.ServicePort{
+				{
+					Protocol:   "TCP",
+					Port:       5000,
+					TargetPort: intstr.FromInt(5000),
+				},
+			},
+		},
+	}
+}
+
+// newSubmarineMlflowIngressRoute builds the Traefik IngressRoute that exposes
+// submarine-mlflow for the given Submarine resource.
+//
+// The route is named mlflowIngressRouteName, the same constant the controller
+// uses to look the object up, so creation and lookup always agree on the name.
+// It is controlled by the Submarine resource, listens on the "web" entry point
+// and sends requests matching PathPrefix(`/mlflow`) to port 5000 of the
+// Service mlflowServiceName.
+func newSubmarineMlflowIngressRoute(submarine *v1alpha1.Submarine) *traefikv1alpha1.IngressRoute {
+	return &traefikv1alpha1.IngressRoute{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: mlflowIngressRouteName,
+			OwnerReferences: []metav1.OwnerReference{
+				*metav1.NewControllerRef(submarine, v1alpha1.SchemeGroupVersion.WithKind("Submarine")),
+			},
+		},
+		Spec: traefikv1alpha1.IngressRouteSpec{
+			EntryPoints: []string{
+				"web",
+			},
+			Routes: []traefikv1alpha1.Route{
+				{
+					Kind:  "Rule",
+					Match: "PathPrefix(`/mlflow`)",
+					Services: []traefikv1alpha1.Service{
+						{
+							LoadBalancerSpec: traefikv1alpha1.LoadBalancerSpec{
+								Kind: "Service",
+								Name: mlflowServiceName,
+								Port: 5000,
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+}
+
+// createSubmarineMlflow makes sure every resource that submarine-mlflow
+// consists of exists for the given Submarine, creating the missing ones in
+// this order: PersistentVolume, PersistentVolumeClaim, Deployment, Service and
+// IngressRoute.
+//
+// Each resource is first looked up through its lister and only created when
+// the lookup reports NotFound. A resource that exists but is not controlled by
+// the Submarine is reported as a warning event (ErrResourceExists) and aborts
+// the function with an error.
+//
+// It returns nil when all five resources exist and are controlled by the
+// Submarine; otherwise it returns the first Get/Create error or the ownership
+// conflict error, so that the caller can retry later.
+//
+// Reference: https://github.com/apache/submarine/blob/master/helm-charts/submarine/templates/submarine-mlflow.yaml
+func (c *Controller) createSubmarineMlflow(submarine *v1alpha1.Submarine) error {
+	klog.Info("[createSubmarineMlflow]")
+
+	// Step 1: Create PersistentVolume
+	// PersistentVolumes are not namespaced resources, so we add the namespace
+	// as a suffix to distinguish them
+	pv, err := c.persistentvolumeLister.Get(pvName(mlflowPvNamePrefix, submarine.Namespace))
+	// If the resource doesn't exist, we'll create it
+	if errors.IsNotFound(err) {
+		pv, err = c.kubeclientset.CoreV1().PersistentVolumes().Create(context.TODO(), newSubmarineMlflowPersistentVolume(submarine), metav1.CreateOptions{})
+		if err != nil {
+			// Return right away: the returned object must not be used (it
+			// may be nil) and nothing was created, so no success log.
+			klog.Info(err)
+			return err
+		}
+		klog.Info(" Create PersistentVolume: ", pv.Name)
+	}
+	// If an error occurs during Get, we'll requeue the item so we can attempt
+	// processing again later. This could have been caused by a temporary
+	// network failure, or any other transient reason.
+	if err != nil {
+		return err
+	}
+
+	if !metav1.IsControlledBy(pv, submarine) {
+		msg := fmt.Sprintf(MessageResourceExists, pv.Name)
+		c.recorder.Event(submarine, corev1.EventTypeWarning, ErrResourceExists, msg)
+		// msg is data, not a format string.
+		return fmt.Errorf("%s", msg)
+	}
+
+	// Step 2: Create PersistentVolumeClaim
+	pvc, err := c.persistentvolumeclaimLister.PersistentVolumeClaims(submarine.Namespace).Get(mlflowPvcName)
+	// If the resource doesn't exist, we'll create it
+	if errors.IsNotFound(err) {
+		pvc, err = c.kubeclientset.CoreV1().PersistentVolumeClaims(submarine.Namespace).Create(context.TODO(),
+			newSubmarineMlflowPersistentVolumeClaim(submarine),
+			metav1.CreateOptions{})
+		if err != nil {
+			klog.Info(err)
+			return err
+		}
+		klog.Info("	Create PersistentVolumeClaim: ", pvc.Name)
+	}
+	// Get failed for a reason other than NotFound: requeue.
+	if err != nil {
+		return err
+	}
+
+	if !metav1.IsControlledBy(pvc, submarine) {
+		msg := fmt.Sprintf(MessageResourceExists, pvc.Name)
+		c.recorder.Event(submarine, corev1.EventTypeWarning, ErrResourceExists, msg)
+		return fmt.Errorf("%s", msg)
+	}
+
+	// Step 3: Create Deployment
+	deployment, err := c.deploymentLister.Deployments(submarine.Namespace).Get(mlflowName)
+	// If the resource doesn't exist, we'll create it
+	if errors.IsNotFound(err) {
+		deployment, err = c.kubeclientset.AppsV1().Deployments(submarine.Namespace).Create(context.TODO(), newSubmarineMlflowDeployment(submarine), metav1.CreateOptions{})
+		if err != nil {
+			klog.Info(err)
+			return err
+		}
+		klog.Info("	Create Deployment: ", deployment.Name)
+	}
+	// Get failed for a reason other than NotFound: requeue.
+	if err != nil {
+		return err
+	}
+
+	if !metav1.IsControlledBy(deployment, submarine) {
+		msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
+		c.recorder.Event(submarine, corev1.EventTypeWarning, ErrResourceExists, msg)
+		return fmt.Errorf("%s", msg)
+	}
+
+	// Step 4: Create Service
+	service, err := c.serviceLister.Services(submarine.Namespace).Get(mlflowServiceName)
+	// If the resource doesn't exist, we'll create it
+	if errors.IsNotFound(err) {
+		service, err = c.kubeclientset.CoreV1().Services(submarine.Namespace).Create(context.TODO(), newSubmarineMlflowService(submarine), metav1.CreateOptions{})
+		if err != nil {
+			klog.Info(err)
+			return err
+		}
+		klog.Info(" Create Service: ", service.Name)
+	}
+	// Get failed for a reason other than NotFound: requeue.
+	if err != nil {
+		return err
+	}
+
+	if !metav1.IsControlledBy(service, submarine) {
+		msg := fmt.Sprintf(MessageResourceExists, service.Name)
+		c.recorder.Event(submarine, corev1.EventTypeWarning, ErrResourceExists, msg)
+		return fmt.Errorf("%s", msg)
+	}
+
+	// Step 5: Create IngressRoute
+	ingressroute, err := c.ingressrouteLister.IngressRoutes(submarine.Namespace).Get(mlflowIngressRouteName)
+	// If the resource doesn't exist, we'll create it
+	if errors.IsNotFound(err) {
+		ingressroute, err = c.traefikclientset.TraefikV1alpha1().IngressRoutes(submarine.Namespace).Create(context.TODO(), newSubmarineMlflowIngressRoute(submarine), metav1.CreateOptions{})
+		if err != nil {
+			klog.Info(err)
+			return err
+		}
+		klog.Info(" Create IngressRoute: ", ingressroute.Name)
+	}
+	// Get failed for a reason other than NotFound: requeue.
+	if err != nil {
+		return err
+	}
+
+	if !metav1.IsControlledBy(ingressroute, submarine) {
+		msg := fmt.Sprintf(MessageResourceExists, ingressroute.Name)
+		c.recorder.Event(submarine, corev1.EventTypeWarning, ErrResourceExists, msg)
+		return fmt.Errorf("%s", msg)
+	}
+
+	return nil
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org