You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@submarine.apache.org by GitBox <gi...@apache.org> on 2021/08/25 10:55:56 UTC

[GitHub] [submarine] MortalHappiness commented on a change in pull request #719: SUBMARINE-950. Create state machine for submarine custom resource

MortalHappiness commented on a change in pull request #719:
URL: https://github.com/apache/submarine/pull/719#discussion_r695507150



##########
File path: submarine-cloud-v2/pkg/controller/controller.go
##########
@@ -380,102 +383,129 @@ func (c *Controller) processNextWorkItem() bool {
 // syncHandler compares the actual state with the desired, and attempts to
 // converge the two. It then updates the Status block of the Submarine resource
 // with the current status of the resource.
+// State Machine for Submarine
+//+-----------------------------------------------------------------+
+//|      +---------+         +----------+          +----------+     |
+//|      |         |         |          |          |          |     |
+//|      |   New   +---------> Creating +----------> Running  |     |
+//|      |         |         |          |          |          |     |
+//|      +----+----+         +-----+----+          +-----+----+     |
+//|           |                    |                     |          |
+//|           |                    |                     |          |
+//|           |                    |                     |          |
+//|           |                    |               +-----v----+     |
+//|           |                    |               |          |     |
+//|           +--------------------+--------------->  Failed  |     |
+//|                                                |          |     |
+//|                                                +----------+     |
+//+-----------------------------------------------------------------+
 func (c *Controller) syncHandler(key string) error {
 	// Convert the namespace/name string into a distinct namespace and name
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("Invalid resource key: %s", key))
+		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))

Review comment:
       Why does this "Invalid" need to be uncapitalized?

##########
File path: submarine-cloud-v2/pkg/controller/controller.go
##########
@@ -531,3 +561,149 @@ func (c *Controller) handleObject(obj interface{}) {
 		return
 	}
 }
+
+func (c *Controller) getSubmarine(namespace, name string) (*v1alpha1.Submarine, error) {
+	submarine, err := c.submarinesLister.Submarines(namespace).Get(name)
+	if err != nil {
+		// The Submarine resource may no longer exist, in which case we stop
+		// processing
+		if errors.IsNotFound(err) {
+			return nil, nil
+		}
+		return nil, err
+	}
+	return submarine, nil
+}
+
+func (c *Controller) getDeployment(namespace, name string) (*appsv1.Deployment, error) {
+	deployment, err := c.deploymentLister.Deployments(namespace).Get(name)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			return nil, nil
+		}
+		return nil, err
+	}
+	return deployment, nil
+}
+
+func (c *Controller) validateSubmarine(submarine *v1alpha1.Submarine) error {
+
+	// Print out the spec of the Submarine resource
+	b, err := json.MarshalIndent(submarine.Spec, "", "  ")
+	fmt.Println(string(b))
+
+	if err != nil {
+		return err
+	}
+
+	// Check storage type
+	storageType := submarine.Spec.Storage.StorageType
+	if storageType != "nfs" && storageType != "host" {
+		utilruntime.HandleError(fmt.Errorf("invalid storageType '%s' found in submarine spec, nothing will be created. Valid storage types are 'nfs' and 'host'", storageType))
+		return nil
+	}
+
+	return nil
+}
+
+func (c *Controller) createSubmarine(submarine *v1alpha1.Submarine) error {
+	var err error
+	err = c.createSubmarineServer(submarine)
+	if err != nil {
+		return err
+	}
+
+	err = c.createSubmarineDatabase(submarine)
+	if err != nil {
+		return err
+	}
+
+	err = c.createIngress(submarine)
+	if err != nil {
+		return err
+	}
+
+	err = c.createSubmarineServerRBAC(submarine)
+	if err != nil {
+		return err
+	}
+
+	err = c.createSubmarineTensorboard(submarine)
+	if err != nil {
+		return err
+	}
+
+	err = c.createSubmarineMlflow(submarine)
+	if err != nil {
+		return err
+	}
+
+	err = c.createSubmarineMinio(submarine)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (c *Controller) checkSubmarineDependentsReady(submarine *v1alpha1.Submarine) (bool, error) {
+	for _, name := range dependents {
+		podList, err := c.kubeclientset.CoreV1().Pods(submarine.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: fmt.Sprintf("app=%s", name)})
+		if err != nil {
+			return false, err
+		}
+		for _, pod := range podList.Items {
+			switch pod.Status.Phase {
+			case corev1.PodPending:
+				return false, nil
+			case corev1.PodFailed, corev1.PodSucceeded:
+				return false, fmt.Errorf("pod completed")
+			case corev1.PodRunning:
+				for _, condition := range pod.Status.Conditions {
+					if condition.Type == corev1.PodReady && condition.Status != corev1.ConditionTrue {
+						return false, nil
+					}
+				}
+			}
+		}
+	}
+
+	return true, nil
+}

Review comment:
       Why do you need to check the running state for all the pods belonging to the deployments instead of simply checking the status of the deployments? Is there any benefit? For example, you could check the [ReadyReplicas in DeploymentStatus](https://pkg.go.dev/k8s.io/api/apps/v1#DeploymentStatus).

##########
File path: submarine-cloud-v2/pkg/controller/controller.go
##########
@@ -531,3 +561,149 @@ func (c *Controller) handleObject(obj interface{}) {
 		return
 	}
 }
+
+func (c *Controller) getSubmarine(namespace, name string) (*v1alpha1.Submarine, error) {
+	submarine, err := c.submarinesLister.Submarines(namespace).Get(name)
+	if err != nil {
+		// The Submarine resource may no longer exist, in which case we stop
+		// processing

Review comment:
       These comments can be removed.

##########
File path: submarine-cloud-v2/pkg/controller/controller.go
##########
@@ -380,102 +383,129 @@ func (c *Controller) processNextWorkItem() bool {
 // syncHandler compares the actual state with the desired, and attempts to
 // converge the two. It then updates the Status block of the Submarine resource
 // with the current status of the resource.
+// State Machine for Submarine
+//+-----------------------------------------------------------------+
+//|      +---------+         +----------+          +----------+     |
+//|      |         |         |          |          |          |     |
+//|      |   New   +---------> Creating +----------> Running  |     |
+//|      |         |         |          |          |          |     |
+//|      +----+----+         +-----+----+          +-----+----+     |
+//|           |                    |                     |          |
+//|           |                    |                     |          |
+//|           |                    |                     |          |
+//|           |                    |               +-----v----+     |
+//|           |                    |               |          |     |
+//|           +--------------------+--------------->  Failed  |     |
+//|                                                |          |     |
+//|                                                +----------+     |
+//+-----------------------------------------------------------------+
 func (c *Controller) syncHandler(key string) error {
 	// Convert the namespace/name string into a distinct namespace and name
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("Invalid resource key: %s", key))
+		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
 		return nil
 	}
 	klog.Info("syncHandler: ", key)
 
 	// Get the Submarine resource with this namespace/name
-	submarine, err := c.submarinesLister.Submarines(namespace).Get(name)
+	submarine, err := c.getSubmarine(namespace, name)
 	if err != nil {
+		return err
+	}
+	if submarine == nil {
 		// The Submarine resource may no longer exist, in which case we stop
 		// processing
-		if errors.IsNotFound(err) {
-			utilruntime.HandleError(fmt.Errorf("submarine '%s' in work queue no longer exists", key))
-			return nil
-		}
-		return err
+		utilruntime.HandleError(fmt.Errorf("submarine '%s' in work queue no longer exists", key))
+		return nil
 	}
 
 	// Submarine is in the terminating process
 	if !submarine.DeletionTimestamp.IsZero() {
 		return nil
 	}
 
-	// Print out the spec of the Submarine resource
-	b, err := json.MarshalIndent(submarine.Spec, "", "  ")
-	fmt.Println(string(b))
-
-	storageType := submarine.Spec.Storage.StorageType
-	if storageType != "nfs" && storageType != "host" {
-		utilruntime.HandleError(fmt.Errorf("Invalid storageType '%s' found in submarine spec, nothing will be created. Valid storage types are 'nfs' and 'host'", storageType))
-		return nil
-	}
-
-	var serverDeployment *appsv1.Deployment
-	var databaseDeployment *appsv1.Deployment
+	submarineCopy := submarine.DeepCopy()
 
-	if err != nil {
-		return err
+	// Take action based on submarine state
+	switch submarineCopy.Status.SubmarineState.State {
+	case v1alpha1.NewState:
+		c.recordSubmarineEvent(submarineCopy)
+		if err := c.validateSubmarine(submarineCopy); err != nil {
+			submarineCopy.Status.SubmarineState.State = v1alpha1.FailedState
+			submarineCopy.Status.SubmarineState.ErrorMessage = err.Error()
+			c.recordSubmarineEvent(submarineCopy)
+		} else {
+			submarineCopy.Status.SubmarineState.State = v1alpha1.CreatingState
+			c.recordSubmarineEvent(submarineCopy)
+		}
+	case v1alpha1.CreatingState:
+		if err := c.createSubmarine(submarineCopy); err != nil {
+			submarineCopy.Status.SubmarineState.State = v1alpha1.FailedState
+			submarineCopy.Status.SubmarineState.ErrorMessage = err.Error()
+			c.recordSubmarineEvent(submarineCopy)
+		}
+		ok, err := c.checkSubmarineDependentsReady(submarineCopy)
+		if err != nil {
+			submarineCopy.Status.SubmarineState.State = v1alpha1.FailedState
+			submarineCopy.Status.SubmarineState.ErrorMessage = err.Error()
+			c.recordSubmarineEvent(submarineCopy)
+		}
+		if ok {
+			submarineCopy.Status.SubmarineState.State = v1alpha1.RunningState
+			c.recordSubmarineEvent(submarineCopy)
+		}
+	case v1alpha1.RunningState:
+		if err := c.createSubmarine(submarineCopy); err != nil {
+			submarineCopy.Status.SubmarineState.State = v1alpha1.FailedState
+			submarineCopy.Status.SubmarineState.ErrorMessage = err.Error()
+			c.recordSubmarineEvent(submarineCopy)
+		}
 	}
 
-	serverDeployment, err = c.createSubmarineServer(submarine)
-	if err != nil {
-		return err
+	// update submarine status
+	if submarineCopy != nil {

Review comment:
       `submarineCopy` will never be `nil`, so this `if` statement is redundant.

##########
File path: submarine-cloud-v2/pkg/controller/controller.go
##########
@@ -380,102 +383,129 @@ func (c *Controller) processNextWorkItem() bool {
 // syncHandler compares the actual state with the desired, and attempts to
 // converge the two. It then updates the Status block of the Submarine resource
 // with the current status of the resource.
+// State Machine for Submarine
+//+-----------------------------------------------------------------+
+//|      +---------+         +----------+          +----------+     |
+//|      |         |         |          |          |          |     |
+//|      |   New   +---------> Creating +----------> Running  |     |
+//|      |         |         |          |          |          |     |
+//|      +----+----+         +-----+----+          +-----+----+     |
+//|           |                    |                     |          |
+//|           |                    |                     |          |
+//|           |                    |                     |          |
+//|           |                    |               +-----v----+     |
+//|           |                    |               |          |     |
+//|           +--------------------+--------------->  Failed  |     |
+//|                                                |          |     |
+//|                                                +----------+     |
+//+-----------------------------------------------------------------+
 func (c *Controller) syncHandler(key string) error {
 	// Convert the namespace/name string into a distinct namespace and name
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("Invalid resource key: %s", key))
+		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
 		return nil
 	}
 	klog.Info("syncHandler: ", key)
 
 	// Get the Submarine resource with this namespace/name
-	submarine, err := c.submarinesLister.Submarines(namespace).Get(name)
+	submarine, err := c.getSubmarine(namespace, name)
 	if err != nil {
+		return err
+	}
+	if submarine == nil {
 		// The Submarine resource may no longer exist, in which case we stop
 		// processing
-		if errors.IsNotFound(err) {
-			utilruntime.HandleError(fmt.Errorf("submarine '%s' in work queue no longer exists", key))
-			return nil
-		}
-		return err

Review comment:
       These lines should be preserved. See the comments on lines 568 to 572.

##########
File path: submarine-cloud-v2/pkg/controller/controller.go
##########
@@ -380,102 +383,129 @@ func (c *Controller) processNextWorkItem() bool {
 // syncHandler compares the actual state with the desired, and attempts to
 // converge the two. It then updates the Status block of the Submarine resource
 // with the current status of the resource.
+// State Machine for Submarine
+//+-----------------------------------------------------------------+
+//|      +---------+         +----------+          +----------+     |
+//|      |         |         |          |          |          |     |
+//|      |   New   +---------> Creating +----------> Running  |     |
+//|      |         |         |          |          |          |     |
+//|      +----+----+         +-----+----+          +-----+----+     |
+//|           |                    |                     |          |
+//|           |                    |                     |          |
+//|           |                    |                     |          |
+//|           |                    |               +-----v----+     |
+//|           |                    |               |          |     |
+//|           +--------------------+--------------->  Failed  |     |
+//|                                                |          |     |
+//|                                                +----------+     |
+//+-----------------------------------------------------------------+
 func (c *Controller) syncHandler(key string) error {
 	// Convert the namespace/name string into a distinct namespace and name
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("Invalid resource key: %s", key))
+		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
 		return nil
 	}
 	klog.Info("syncHandler: ", key)
 
 	// Get the Submarine resource with this namespace/name
-	submarine, err := c.submarinesLister.Submarines(namespace).Get(name)
+	submarine, err := c.getSubmarine(namespace, name)
 	if err != nil {
+		return err
+	}
+	if submarine == nil {
 		// The Submarine resource may no longer exist, in which case we stop
 		// processing
-		if errors.IsNotFound(err) {
-			utilruntime.HandleError(fmt.Errorf("submarine '%s' in work queue no longer exists", key))
-			return nil
-		}
-		return err
+		utilruntime.HandleError(fmt.Errorf("submarine '%s' in work queue no longer exists", key))
+		return nil
 	}
 
 	// Submarine is in the terminating process
 	if !submarine.DeletionTimestamp.IsZero() {
 		return nil
 	}
 
-	// Print out the spec of the Submarine resource
-	b, err := json.MarshalIndent(submarine.Spec, "", "  ")
-	fmt.Println(string(b))
-
-	storageType := submarine.Spec.Storage.StorageType
-	if storageType != "nfs" && storageType != "host" {
-		utilruntime.HandleError(fmt.Errorf("Invalid storageType '%s' found in submarine spec, nothing will be created. Valid storage types are 'nfs' and 'host'", storageType))
-		return nil
-	}
-
-	var serverDeployment *appsv1.Deployment
-	var databaseDeployment *appsv1.Deployment
+	submarineCopy := submarine.DeepCopy()
 
-	if err != nil {
-		return err
+	// Take action based on submarine state
+	switch submarineCopy.Status.SubmarineState.State {
+	case v1alpha1.NewState:
+		c.recordSubmarineEvent(submarineCopy)
+		if err := c.validateSubmarine(submarineCopy); err != nil {
+			submarineCopy.Status.SubmarineState.State = v1alpha1.FailedState
+			submarineCopy.Status.SubmarineState.ErrorMessage = err.Error()
+			c.recordSubmarineEvent(submarineCopy)
+		} else {
+			submarineCopy.Status.SubmarineState.State = v1alpha1.CreatingState
+			c.recordSubmarineEvent(submarineCopy)
+		}
+	case v1alpha1.CreatingState:
+		if err := c.createSubmarine(submarineCopy); err != nil {
+			submarineCopy.Status.SubmarineState.State = v1alpha1.FailedState
+			submarineCopy.Status.SubmarineState.ErrorMessage = err.Error()
+			c.recordSubmarineEvent(submarineCopy)
+		}
+		ok, err := c.checkSubmarineDependentsReady(submarineCopy)
+		if err != nil {
+			submarineCopy.Status.SubmarineState.State = v1alpha1.FailedState
+			submarineCopy.Status.SubmarineState.ErrorMessage = err.Error()
+			c.recordSubmarineEvent(submarineCopy)
+		}
+		if ok {
+			submarineCopy.Status.SubmarineState.State = v1alpha1.RunningState
+			c.recordSubmarineEvent(submarineCopy)
+		}
+	case v1alpha1.RunningState:
+		if err := c.createSubmarine(submarineCopy); err != nil {
+			submarineCopy.Status.SubmarineState.State = v1alpha1.FailedState
+			submarineCopy.Status.SubmarineState.ErrorMessage = err.Error()
+			c.recordSubmarineEvent(submarineCopy)
+		}
 	}
 
-	serverDeployment, err = c.createSubmarineServer(submarine)
-	if err != nil {
-		return err
+	// update submarine status
+	if submarineCopy != nil {
+		err = c.updateSubmarineStatus(submarine, submarineCopy)
+		if err != nil {
+			return err
+		}
 	}
 
-	databaseDeployment, err = c.createSubmarineDatabase(submarine)
-	if err != nil {
-		return err
-	}
+	// c.recorder.Event(submarine, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)

Review comment:
       Why should this line be commented out?

##########
File path: submarine-cloud-v2/pkg/controller/controller.go
##########
@@ -531,3 +561,149 @@ func (c *Controller) handleObject(obj interface{}) {
 		return
 	}
 }
+
+func (c *Controller) getSubmarine(namespace, name string) (*v1alpha1.Submarine, error) {
+	submarine, err := c.submarinesLister.Submarines(namespace).Get(name)
+	if err != nil {
+		// The Submarine resource may no longer exist, in which case we stop
+		// processing
+		if errors.IsNotFound(err) {
+			return nil, nil
+		}
+		return nil, err
+	}
+	return submarine, nil
+}
+
+func (c *Controller) getDeployment(namespace, name string) (*appsv1.Deployment, error) {
+	deployment, err := c.deploymentLister.Deployments(namespace).Get(name)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			return nil, nil
+		}
+		return nil, err
+	}
+	return deployment, nil
+}
+
+func (c *Controller) validateSubmarine(submarine *v1alpha1.Submarine) error {
+
+	// Print out the spec of the Submarine resource
+	b, err := json.MarshalIndent(submarine.Spec, "", "  ")
+	fmt.Println(string(b))
+
+	if err != nil {
+		return err
+	}
+
+	// Check storage type
+	storageType := submarine.Spec.Storage.StorageType
+	if storageType != "nfs" && storageType != "host" {
+		utilruntime.HandleError(fmt.Errorf("invalid storageType '%s' found in submarine spec, nothing will be created. Valid storage types are 'nfs' and 'host'", storageType))
+		return nil
+	}

Review comment:
       This is a fatal error and the submarine should enter the failed state. Change these lines to the following code.
   
   ```go
   if storageType != "nfs" && storageType != "host" {
   	err = fmt.Errorf("invalid storageType '%s' found in submarine spec, nothing will be created. Valid storage types are 'nfs' and 'host'", storageType)
   	return err
   }
   ```

##########
File path: submarine-cloud-v2/pkg/controller/controller.go
##########
@@ -380,102 +383,129 @@ func (c *Controller) processNextWorkItem() bool {
 // syncHandler compares the actual state with the desired, and attempts to
 // converge the two. It then updates the Status block of the Submarine resource
 // with the current status of the resource.
+// State Machine for Submarine
+//+-----------------------------------------------------------------+
+//|      +---------+         +----------+          +----------+     |
+//|      |         |         |          |          |          |     |
+//|      |   New   +---------> Creating +----------> Running  |     |
+//|      |         |         |          |          |          |     |
+//|      +----+----+         +-----+----+          +-----+----+     |
+//|           |                    |                     |          |
+//|           |                    |                     |          |
+//|           |                    |                     |          |
+//|           |                    |               +-----v----+     |
+//|           |                    |               |          |     |
+//|           +--------------------+--------------->  Failed  |     |
+//|                                                |          |     |
+//|                                                +----------+     |
+//+-----------------------------------------------------------------+
 func (c *Controller) syncHandler(key string) error {
 	// Convert the namespace/name string into a distinct namespace and name
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("Invalid resource key: %s", key))
+		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
 		return nil
 	}
 	klog.Info("syncHandler: ", key)
 
 	// Get the Submarine resource with this namespace/name
-	submarine, err := c.submarinesLister.Submarines(namespace).Get(name)
+	submarine, err := c.getSubmarine(namespace, name)
 	if err != nil {
+		return err
+	}
+	if submarine == nil {
 		// The Submarine resource may no longer exist, in which case we stop
 		// processing
-		if errors.IsNotFound(err) {
-			utilruntime.HandleError(fmt.Errorf("submarine '%s' in work queue no longer exists", key))
-			return nil
-		}
-		return err
+		utilruntime.HandleError(fmt.Errorf("submarine '%s' in work queue no longer exists", key))
+		return nil
 	}
 
 	// Submarine is in the terminating process

Review comment:
       Can you add more comments for this line? For example, "Submarine is in the terminating process, only used when cascading-delete=foreground, otherwise the submarine will be recreated"




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@submarine.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org