You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/05/18 04:22:17 UTC

[shardingsphere-on-cloud] branch main updated: refactor(storage-node): refatcor finalize logic

This is an automated email from the ASF dual-hosted git repository.

miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ddafa2  refactor(storage-node): refatcor finalize logic
     new fd24a0d  Merge pull request #370 from Xu-Wentao/storage-node
9ddafa2 is described below

commit 9ddafa211f7e5812f90e4cbd8df4d6f4187c90ae
Author: xuwentao <cu...@yahoo.com>
AuthorDate: Wed May 17 19:28:52 2023 +0800

    refactor(storage-node): refatcor finalize logic
---
 ...ngsphereproxy-storagenode-aws-rds-instance.yaml |   8 +
 .../controllers/storage_ndoe_controller_test.go    |  11 +-
 .../pkg/controllers/storage_node_controller.go     | 169 +++++++++------------
 3 files changed, 83 insertions(+), 105 deletions(-)

diff --git a/examples/operator/shardingsphereproxy-storagenode-aws-rds-instance.yaml b/examples/operator/shardingsphereproxy-storagenode-aws-rds-instance.yaml
index ab30502..97b3b41 100644
--- a/examples/operator/shardingsphereproxy-storagenode-aws-rds-instance.yaml
+++ b/examples/operator/shardingsphereproxy-storagenode-aws-rds-instance.yaml
@@ -38,5 +38,13 @@ metadata:
   name: storage-node-example-1
   annotations:
     databaseclass.database-mesh.io/instance-identifier: "storage-node-example-1"
+    "databaseclass.database-mesh.io/instance-db-name": "test_db"
+
+    # annos about register storage unit
+    shardingsphere.apache.org/register-storage-unit-enabled: "false" # set true if you want to test auto register storage unit.
+    shardingsphere.apache.org/logic-database-name: "sharding_db"
+    "shardingsphere.apache.org/compute-node-namespace": "default"
+    "shardingsphere.apache.org/compute-node-name": "shardingsphere-operator-shardingsphere-proxy"
+
 spec:
   databaseClassName: "aws-rds-instance-mysql-5.7"
diff --git a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
index e97255e..393c00c 100644
--- a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
+++ b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
@@ -254,14 +254,6 @@ var _ = Describe("StorageNode Controller Mock Test", func() {
 					Port:    3306,
 				},
 			}
-			instanceInDeleting = dbmesh_rds.DescInstance{
-				DBInstanceIdentifier: defaultTestInstanceIdentifier,
-				DBInstanceStatus:     dbmesh_rds.DBInstanceStatusDeleting,
-				Endpoint: dbmesh_rds.Endpoint{
-					Address: "127.0.0.1",
-					Port:    3306,
-				},
-			}
 		)
 		It("should be successful when instance is in available status", func() {
 			deletingStorageNode := "test-deleting-storage-node"
@@ -295,10 +287,11 @@ var _ = Describe("StorageNode Controller Mock Test", func() {
 					Namespace: defaultTestNamespace,
 				},
 			})).Should(Succeed())
+
 			// mock aws rds client, delete instance
 			mockAws.EXPECT().GetInstance(gomock.Any(), gomock.Any()).Return(&rdsInstanceAvailable, nil)
 			mockAws.EXPECT().DeleteInstance(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
-			mockAws.EXPECT().GetInstance(gomock.Any(), gomock.Any()).Return(&instanceInDeleting, nil)
+
 			_, err = reconciler.Reconcile(ctx, req)
 			Expect(err).To(BeNil())
 
diff --git a/shardingsphere-operator/pkg/controllers/storage_node_controller.go b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
index eeb0c89..1628e37 100644
--- a/shardingsphere-operator/pkg/controllers/storage_node_controller.go
+++ b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
@@ -19,7 +19,6 @@ package controllers
 
 import (
 	"context"
-	"encoding/json"
 	"fmt"
 	"reflect"
 	"strings"
@@ -68,19 +67,12 @@ type StorageNodeReconciler struct {
 // Reconcile handles main function of this controller
 // nolint:gocognit
 func (r *StorageNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
-	r.Log.WithValues(StorageNodeControllerName, req.NamespacedName)
-
-	r.Log.Info("Reconciling StorageNode")
+	r.Log.Info(fmt.Sprintf("Reconciling StorageNode %s", req.NamespacedName))
 
 	// get storage node
 	node := &v1alpha1.StorageNode{}
 	if err := r.Get(ctx, req.NamespacedName, node); err != nil {
-		if client.IgnoreNotFound(err) == nil {
-			r.Log.Info(fmt.Sprintf("StorageNode %s/%s is not exist", req.Namespace, req.Name))
-			return ctrl.Result{}, nil
-		}
-		r.Log.Error(err, fmt.Sprintf("unable to fetch StorageNode %s/%s", req.Namespace, req.Name))
-		return ctrl.Result{Requeue: true}, err
+		return ctrl.Result{}, client.IgnoreNotFound(err)
 	}
 
 	// Get databaseClass with storageNode.Spec.DatabaseClassName
@@ -91,7 +83,6 @@ func (r *StorageNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 	}
 
 	// finalize storage node
-	// nolint: nestif
 	if node.ObjectMeta.DeletionTimestamp.IsZero() {
 		// The object is not being deleted, so if it does not have our finalizer,
 		// then lets add the finalizer and update the object. This is equivalent to registering our finalizer.
@@ -102,72 +93,73 @@ func (r *StorageNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 			}
 		}
 	} else if slices.Contains(node.ObjectMeta.Finalizers, FinalizerName) {
-		switch node.Status.Phase {
-		case v1alpha1.StorageNodePhaseReady, v1alpha1.StorageNodePhaseNotReady:
-			if err := r.deleteDatabaseCluster(ctx, node, databaseClass); err != nil {
-				return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
-			}
-		case v1alpha1.StorageNodePhaseDeleting:
-			ins, err := aws.NewRdsClient(r.AwsRDS).GetInstance(ctx, node)
-			if err != nil {
-				return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
-			}
-			if ins == nil {
-				// update storage node status to v1alpha1.StorageNodePhaseDeleteComplete
-				node.Status.Phase = v1alpha1.StorageNodePhaseDeleteComplete
-				node.Status.Instances = nil
-				if err := r.Status().Update(ctx, node); err != nil {
-					r.Log.Error(err, "failed to update storage node status")
-				}
-				return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
-			}
-			r.Log.V(2).Info("RDS instance is still deleting")
-			return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
-		case v1alpha1.StorageNodePhaseDeleteComplete:
-			// remove our finalizer from the list and update it.
-			node.ObjectMeta.Finalizers = slices.Filter([]string{}, node.ObjectMeta.Finalizers, func(f string) bool {
-				return f != FinalizerName
-			})
-			if err := r.Update(ctx, node); err != nil {
-				r.Log.Error(err, "failed to remove finalizer")
-			}
-			return ctrl.Result{}, nil
-		default:
-			r.Recorder.Event(node, corev1.EventTypeWarning, fmt.Sprintf("Delete %s/%s Failed", node.GetNamespace(), node.GetName()), "StorageNode is not in a valid phase")
-			return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
-		}
+		return r.finalize(ctx, node, databaseClass)
 	}
 
+	// reconcile storage node
 	return r.reconcile(ctx, databaseClass, node)
 }
 
+func (r *StorageNodeReconciler) finalize(ctx context.Context, node *v1alpha1.StorageNode, databaseClass *dbmeshv1alpha1.DatabaseClass) (ctrl.Result, error) {
+	var err error
+	switch node.Status.Phase {
+	case v1alpha1.StorageNodePhaseReady, v1alpha1.StorageNodePhaseNotReady:
+		// set storage node status to deleting
+		node.Status.Phase = v1alpha1.StorageNodePhaseDeleting
+	case v1alpha1.StorageNodePhaseDeleting:
+		break
+	case v1alpha1.StorageNodePhaseDeleteComplete:
+		node.ObjectMeta.Finalizers = slices.Filter([]string{}, node.ObjectMeta.Finalizers, func(f string) bool {
+			return f != FinalizerName
+		})
+		if err = r.Update(ctx, node); err != nil {
+			r.Log.Error(err, "failed to remove finalizer")
+		}
+		return ctrl.Result{}, nil
+	}
+
+	if err = r.deleteDatabaseCluster(ctx, node, databaseClass); err != nil {
+		r.Log.Error(err, "failed to delete database cluster")
+		return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
+	}
+
+	desiredState := computeDesiredState(node.Status)
+
+	if !reflect.DeepEqual(node.Status, desiredState) {
+		node.Status = desiredState
+		err := r.Status().Update(ctx, node)
+		if err != nil {
+			r.Log.Error(err, fmt.Sprintf("unable to update StorageNode %s/%s", node.GetNamespace(), node.GetName()))
+			return ctrl.Result{Requeue: true}, err
+		}
+	}
+
+	return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
+}
+
 func (r *StorageNodeReconciler) reconcile(ctx context.Context, dbClass *dbmeshv1alpha1.DatabaseClass, node *v1alpha1.StorageNode) (ctrl.Result, error) {
 	// reconcile storage node with databaseClass
 	switch dbClass.Spec.Provisioner {
 	case dbmeshv1alpha1.ProvisionerAWSRDSInstance:
 		if err := r.reconcileAwsRdsInstance(ctx, aws.NewRdsClient(r.AwsRDS), node, dbClass); err != nil {
 			r.Log.Error(err, fmt.Sprintf("unable to reconcile AWS RDS Instance %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
-			r.Recorder.Event(node, corev1.EventTypeWarning, fmt.Sprintf("Reconcile %s/%s Failed", node.GetNamespace(), node.GetName()), err.Error())
+			r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS RDS Instance %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
 		}
 	case dbmeshv1alpha1.ProvisionerAWSAurora:
 		if err := r.reconcileAwsAurora(ctx, aws.NewRdsClient(r.AwsRDS), node, dbClass); err != nil {
-			r.Recorder.Event(node, corev1.EventTypeWarning, fmt.Sprintf("Reconcile %s/%s Failed", node.GetNamespace(), node.GetName()), err.Error())
+			r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS Aurora %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
 		}
 	default:
 		r.Recorder.Event(node, corev1.EventTypeWarning, "UnsupportedDatabaseProvisioner", fmt.Sprintf("unsupported database provisioner %s", dbClass.Spec.Provisioner))
 		r.Log.Error(nil, fmt.Sprintf("unsupported database provisioner %s", dbClass.Spec.Provisioner))
 	}
 
-	d, _ := json.MarshalIndent(node.Status, "", "  ")
-	r.Log.Info(string(d))
-
 	// register storage unit if needed.
 	if err := r.registerStorageUnit(ctx, node); err != nil {
 		r.Recorder.Eventf(node, corev1.EventTypeWarning, "RegisterStorageUnitFailed", "unable to register storage unit %s/%s", node.GetNamespace(), node.GetName())
 		return ctrl.Result{Requeue: true}, err
 	}
 
-	// finally, update status
 	desiredState := computeDesiredState(node.Status)
 
 	if !reflect.DeepEqual(node.Status, desiredState) {
@@ -179,9 +171,6 @@ func (r *StorageNodeReconciler) reconcile(ctx context.Context, dbClass *dbmeshv1
 		}
 	}
 
-	d, _ = json.MarshalIndent(node.Status, "", "  ")
-	r.Log.Info(string(d))
-
 	return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
 }
 
@@ -215,26 +204,22 @@ func (r *StorageNodeReconciler) getDatabaseClass(ctx context.Context, node *v1al
 func computeDesiredState(status v1alpha1.StorageNodeStatus) v1alpha1.StorageNodeStatus {
 	// Initialize a new status object based on the current state
 	desiredState := status
-
 	clusterStatus := status.Cluster.Status
 
-	if (clusterStatus == "" || clusterStatus == "Ready") && allInstancesReady(status.Instances) {
-		desiredState.Phase = v1alpha1.StorageNodePhaseReady
+	if status.Phase == v1alpha1.StorageNodePhaseDeleting {
+		// If the storage node is being deleted, check if all instances are deleted.
+		if len(status.Instances) == 0 {
+			desiredState.Phase = v1alpha1.StorageNodePhaseDeleteComplete
+		}
 	} else {
-		desiredState.Phase = v1alpha1.StorageNodePhaseNotReady
-	}
-
-	for idx := range status.Instances {
-		ins := &status.Instances[idx]
-		if ins.Status == string(rds.DBInstanceStatusDeleting) {
-			desiredState.Phase = v1alpha1.StorageNodePhaseDeleting
+		// If the storage node is not being deleted, check if all instances are ready.
+		if (clusterStatus == "" || clusterStatus == "Ready") && allInstancesReady(status.Instances) {
+			desiredState.Phase = v1alpha1.StorageNodePhaseReady
+		} else {
+			desiredState.Phase = v1alpha1.StorageNodePhaseNotReady
 		}
 	}
 
-	if (status.Phase == v1alpha1.StorageNodePhaseDeleting || status.Phase == v1alpha1.StorageNodePhaseDeleteComplete) && len(status.Instances) == 0 {
-		desiredState.Phase = v1alpha1.StorageNodePhaseDeleteComplete
-	}
-
 	desiredState.Conditions = computeNewConditions(desiredState, status, clusterStatus)
 
 	return desiredState
@@ -318,10 +303,6 @@ func allInstancesReady(instances []v1alpha1.InstanceStatus) bool {
 }
 
 func (r *StorageNodeReconciler) reconcileAwsRdsInstance(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, dbClass *dbmeshv1alpha1.DatabaseClass) error {
-	if node.Status.Phase == v1alpha1.StorageNodePhaseDeleteComplete {
-		return nil
-	}
-
 	instance, err := client.GetInstance(ctx, node)
 	if err != nil {
 		return err
@@ -349,6 +330,11 @@ func (r *StorageNodeReconciler) reconcileAwsRdsInstance(ctx context.Context, cli
 func updateAWSRDSInstanceStatus(node *v1alpha1.StorageNode, instance *rds.DescInstance) error {
 	instances := make([]v1alpha1.InstanceStatus, 0)
 
+	if instance == nil {
+		node.Status.Instances = instances
+		return nil
+	}
+
 	status := instance.DBInstanceStatus
 	if status == rds.DBInstanceStatusAvailable {
 		status = rds.DBInstanceStatusReady
@@ -449,23 +435,14 @@ func (r *StorageNodeReconciler) deleteAWSRDSInstance(ctx context.Context, client
 		return err
 	}
 
-	if instance == nil {
-		r.Log.Info(fmt.Sprintf("instance %s is not found", node.Annotations[dbmeshv1alpha1.AnnotationsInstanceIdentifier]))
-		return nil
-	}
-
-	if instance.DBInstanceStatus == rds.DBInstanceStatusDeleting {
-		r.Log.Info(fmt.Sprintf("instance %s is deleting", node.Annotations[dbmeshv1alpha1.AnnotationsInstanceIdentifier]))
-		return nil
-	}
-
-	if err := client.DeleteInstance(ctx, node, databaseClass); err != nil {
-		r.Recorder.Eventf(node, corev1.EventTypeWarning, "DeleteFailed", "Failed to delete instance %s: %s", node.Annotations[dbmeshv1alpha1.AnnotationsInstanceIdentifier], err.Error())
-		return err
+	if instance != nil && instance.DBInstanceStatus != rds.DBInstanceStatusDeleting {
+		if err := client.DeleteInstance(ctx, node, databaseClass); err != nil {
+			r.Recorder.Eventf(node, corev1.EventTypeWarning, "DeleteFailed", "Failed to delete instance %s: %s", node.Annotations[dbmeshv1alpha1.AnnotationsInstanceIdentifier], err.Error())
+			return err
+		}
+		r.Recorder.Event(node, corev1.EventTypeNormal, "Deleting", fmt.Sprintf("instance %s is deleting", node.Annotations[dbmeshv1alpha1.AnnotationsInstanceIdentifier]))
 	}
 
-	r.Recorder.Event(node, corev1.EventTypeNormal, "Deleting", fmt.Sprintf("instance %s is deleting", node.Annotations[dbmeshv1alpha1.AnnotationsInstanceIdentifier]))
-
 	// update instance status
 	if err := updateAWSRDSInstanceStatus(node, instance); err != nil {
 		return fmt.Errorf("updateAWSRDSInstanceStatus failed: %w", err)
@@ -478,10 +455,13 @@ func (r *StorageNodeReconciler) deleteAWSRDSInstance(ctx context.Context, client
 func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v1alpha1.StorageNode) error {
 	// if register storage unit is not enabled, return
 	if node.Annotations[AnnotationKeyRegisterStorageUnitEnabled] != "true" {
-		r.Log.Info(fmt.Sprintf("register storage unit is not enabled for node %s/%s", node.GetNamespace(), node.GetName()))
 		return nil
 	}
 
+	if err := r.validateComputeNodeAnnotations(node); err != nil {
+		return err
+	}
+
 	// if storage unit is already registered, return
 	if node.Status.Registered {
 		return nil
@@ -489,14 +469,10 @@ func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v
 
 	// if node is not ready, return
 	if node.Status.Phase != v1alpha1.StorageNodePhaseReady {
-		r.Recorder.Eventf(node, corev1.EventTypeWarning, "RegisterCanceled", "Canceled to register storage unit for node %s/%s: node is not ready", node.GetNamespace(), node.GetName())
+		r.Recorder.Eventf(node, corev1.EventTypeWarning, "RegisterWaiting", "Waiting to register storage unit for node %s/%s: node is not ready", node.GetNamespace(), node.GetName())
 		return nil
 	}
 
-	if err := validateComputeNodeAnnotations(node); err != nil {
-		return err
-	}
-
 	logicDBName := node.Annotations[AnnotationKeyLogicDatabaseName]
 	dbName := node.Annotations[dbmeshv1alpha1.AnnotationsInstanceDBName]
 
@@ -517,7 +493,7 @@ func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v
 	ins := node.Status.Instances[0]
 	host := ins.Endpoint.Address
 	port := ins.Endpoint.Port
-	username := strings.Split(node.Annotations[dbmeshv1alpha1.AnnotationsMasterUsername], "@")[0]
+	username := node.Annotations[dbmeshv1alpha1.AnnotationsMasterUsername]
 	password := node.Annotations[dbmeshv1alpha1.AnnotationsMasterUserPassword]
 
 	// TODO how to set ds name?
@@ -530,7 +506,7 @@ func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v
 	return nil
 }
 
-func validateComputeNodeAnnotations(node *v1alpha1.StorageNode) error {
+func (r *StorageNodeReconciler) validateComputeNodeAnnotations(node *v1alpha1.StorageNode) error {
 	requiredAnnos := []string{
 		AnnotationKeyLogicDatabaseName,
 		dbmeshv1alpha1.AnnotationsInstanceDBName,
@@ -540,6 +516,7 @@ func validateComputeNodeAnnotations(node *v1alpha1.StorageNode) error {
 
 	for _, anno := range requiredAnnos {
 		if v, ok := node.Annotations[anno]; !ok || v == "" {
+			r.Recorder.Eventf(node, corev1.EventTypeWarning, "RegisterChecking", "Waiting to register storage unit for node %s/%s: annotation %s is required", node.GetNamespace(), node.GetName(), anno)
 			return fmt.Errorf("annotation %s is required", anno)
 		}
 	}
@@ -574,7 +551,7 @@ func (r *StorageNodeReconciler) getShardingsphereServer(ctx context.Context, nod
 		return nil, fmt.Errorf("no user in compute node %s/%s", cn.Namespace, cn.Name)
 	}
 
-	username = serverConf.Authority.Users[0].User
+	username = strings.Split(serverConf.Authority.Users[0].User, "@")[0]
 	password = serverConf.Authority.Users[0].Password
 
 	// get service of compute node