You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/05/18 04:22:17 UTC
[shardingsphere-on-cloud] branch main updated: refactor(storage-node): refactor finalize logic
This is an automated email from the ASF dual-hosted git repository.
miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new 9ddafa2 refactor(storage-node): refactor finalize logic
new fd24a0d Merge pull request #370 from Xu-Wentao/storage-node
9ddafa2 is described below
commit 9ddafa211f7e5812f90e4cbd8df4d6f4187c90ae
Author: xuwentao <cu...@yahoo.com>
AuthorDate: Wed May 17 19:28:52 2023 +0800
refactor(storage-node): refactor finalize logic
---
...ngsphereproxy-storagenode-aws-rds-instance.yaml | 8 +
.../controllers/storage_ndoe_controller_test.go | 11 +-
.../pkg/controllers/storage_node_controller.go | 169 +++++++++------------
3 files changed, 83 insertions(+), 105 deletions(-)
diff --git a/examples/operator/shardingsphereproxy-storagenode-aws-rds-instance.yaml b/examples/operator/shardingsphereproxy-storagenode-aws-rds-instance.yaml
index ab30502..97b3b41 100644
--- a/examples/operator/shardingsphereproxy-storagenode-aws-rds-instance.yaml
+++ b/examples/operator/shardingsphereproxy-storagenode-aws-rds-instance.yaml
@@ -38,5 +38,13 @@ metadata:
name: storage-node-example-1
annotations:
databaseclass.database-mesh.io/instance-identifier: "storage-node-example-1"
+ "databaseclass.database-mesh.io/instance-db-name": "test_db"
+
+ # annos about register storage unit
+ shardingsphere.apache.org/register-storage-unit-enabled: "false" # set true if you want to test auto register storage unit.
+ shardingsphere.apache.org/logic-database-name: "sharding_db"
+ "shardingsphere.apache.org/compute-node-namespace": "default"
+ "shardingsphere.apache.org/compute-node-name": "shardingsphere-operator-shardingsphere-proxy"
+
spec:
databaseClassName: "aws-rds-instance-mysql-5.7"
diff --git a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
index e97255e..393c00c 100644
--- a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
+++ b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
@@ -254,14 +254,6 @@ var _ = Describe("StorageNode Controller Mock Test", func() {
Port: 3306,
},
}
- instanceInDeleting = dbmesh_rds.DescInstance{
- DBInstanceIdentifier: defaultTestInstanceIdentifier,
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusDeleting,
- Endpoint: dbmesh_rds.Endpoint{
- Address: "127.0.0.1",
- Port: 3306,
- },
- }
)
It("should be successful when instance is in available status", func() {
deletingStorageNode := "test-deleting-storage-node"
@@ -295,10 +287,11 @@ var _ = Describe("StorageNode Controller Mock Test", func() {
Namespace: defaultTestNamespace,
},
})).Should(Succeed())
+
// mock aws rds client, delete instance
mockAws.EXPECT().GetInstance(gomock.Any(), gomock.Any()).Return(&rdsInstanceAvailable, nil)
mockAws.EXPECT().DeleteInstance(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
- mockAws.EXPECT().GetInstance(gomock.Any(), gomock.Any()).Return(&instanceInDeleting, nil)
+
_, err = reconciler.Reconcile(ctx, req)
Expect(err).To(BeNil())
diff --git a/shardingsphere-operator/pkg/controllers/storage_node_controller.go b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
index eeb0c89..1628e37 100644
--- a/shardingsphere-operator/pkg/controllers/storage_node_controller.go
+++ b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
@@ -19,7 +19,6 @@ package controllers
import (
"context"
- "encoding/json"
"fmt"
"reflect"
"strings"
@@ -68,19 +67,12 @@ type StorageNodeReconciler struct {
// Reconcile handles main function of this controller
// nolint:gocognit
func (r *StorageNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
- r.Log.WithValues(StorageNodeControllerName, req.NamespacedName)
-
- r.Log.Info("Reconciling StorageNode")
+ r.Log.Info(fmt.Sprintf("Reconciling StorageNode %s", req.NamespacedName))
// get storage node
node := &v1alpha1.StorageNode{}
if err := r.Get(ctx, req.NamespacedName, node); err != nil {
- if client.IgnoreNotFound(err) == nil {
- r.Log.Info(fmt.Sprintf("StorageNode %s/%s is not exist", req.Namespace, req.Name))
- return ctrl.Result{}, nil
- }
- r.Log.Error(err, fmt.Sprintf("unable to fetch StorageNode %s/%s", req.Namespace, req.Name))
- return ctrl.Result{Requeue: true}, err
+ return ctrl.Result{}, client.IgnoreNotFound(err)
}
// Get databaseClass with storageNode.Spec.DatabaseClassName
@@ -91,7 +83,6 @@ func (r *StorageNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
// finalize storage node
- // nolint: nestif
if node.ObjectMeta.DeletionTimestamp.IsZero() {
// The object is not being deleted, so if it does not have our finalizer,
// then lets add the finalizer and update the object. This is equivalent to registering our finalizer.
@@ -102,72 +93,73 @@ func (r *StorageNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
}
} else if slices.Contains(node.ObjectMeta.Finalizers, FinalizerName) {
- switch node.Status.Phase {
- case v1alpha1.StorageNodePhaseReady, v1alpha1.StorageNodePhaseNotReady:
- if err := r.deleteDatabaseCluster(ctx, node, databaseClass); err != nil {
- return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
- }
- case v1alpha1.StorageNodePhaseDeleting:
- ins, err := aws.NewRdsClient(r.AwsRDS).GetInstance(ctx, node)
- if err != nil {
- return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
- }
- if ins == nil {
- // update storage node status to v1alpha1.StorageNodePhaseDeleteComplete
- node.Status.Phase = v1alpha1.StorageNodePhaseDeleteComplete
- node.Status.Instances = nil
- if err := r.Status().Update(ctx, node); err != nil {
- r.Log.Error(err, "failed to update storage node status")
- }
- return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
- }
- r.Log.V(2).Info("RDS instance is still deleting")
- return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
- case v1alpha1.StorageNodePhaseDeleteComplete:
- // remove our finalizer from the list and update it.
- node.ObjectMeta.Finalizers = slices.Filter([]string{}, node.ObjectMeta.Finalizers, func(f string) bool {
- return f != FinalizerName
- })
- if err := r.Update(ctx, node); err != nil {
- r.Log.Error(err, "failed to remove finalizer")
- }
- return ctrl.Result{}, nil
- default:
- r.Recorder.Event(node, corev1.EventTypeWarning, fmt.Sprintf("Delete %s/%s Failed", node.GetNamespace(), node.GetName()), "StorageNode is not in a valid phase")
- return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
- }
+ return r.finalize(ctx, node, databaseClass)
}
+ // reconcile storage node
return r.reconcile(ctx, databaseClass, node)
}
+func (r *StorageNodeReconciler) finalize(ctx context.Context, node *v1alpha1.StorageNode, databaseClass *dbmeshv1alpha1.DatabaseClass) (ctrl.Result, error) {
+ var err error
+ switch node.Status.Phase {
+ case v1alpha1.StorageNodePhaseReady, v1alpha1.StorageNodePhaseNotReady:
+ // set storage node status to deleting
+ node.Status.Phase = v1alpha1.StorageNodePhaseDeleting
+ case v1alpha1.StorageNodePhaseDeleting:
+ break
+ case v1alpha1.StorageNodePhaseDeleteComplete:
+ node.ObjectMeta.Finalizers = slices.Filter([]string{}, node.ObjectMeta.Finalizers, func(f string) bool {
+ return f != FinalizerName
+ })
+ if err = r.Update(ctx, node); err != nil {
+ r.Log.Error(err, "failed to remove finalizer")
+ }
+ return ctrl.Result{}, nil
+ }
+
+ if err = r.deleteDatabaseCluster(ctx, node, databaseClass); err != nil {
+ r.Log.Error(err, "failed to delete database cluster")
+ return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
+ }
+
+ desiredState := computeDesiredState(node.Status)
+
+ if !reflect.DeepEqual(node.Status, desiredState) {
+ node.Status = desiredState
+ err := r.Status().Update(ctx, node)
+ if err != nil {
+ r.Log.Error(err, fmt.Sprintf("unable to update StorageNode %s/%s", node.GetNamespace(), node.GetName()))
+ return ctrl.Result{Requeue: true}, err
+ }
+ }
+
+ return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
+}
+
func (r *StorageNodeReconciler) reconcile(ctx context.Context, dbClass *dbmeshv1alpha1.DatabaseClass, node *v1alpha1.StorageNode) (ctrl.Result, error) {
// reconcile storage node with databaseClass
switch dbClass.Spec.Provisioner {
case dbmeshv1alpha1.ProvisionerAWSRDSInstance:
if err := r.reconcileAwsRdsInstance(ctx, aws.NewRdsClient(r.AwsRDS), node, dbClass); err != nil {
r.Log.Error(err, fmt.Sprintf("unable to reconcile AWS RDS Instance %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
- r.Recorder.Event(node, corev1.EventTypeWarning, fmt.Sprintf("Reconcile %s/%s Failed", node.GetNamespace(), node.GetName()), err.Error())
+ r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS RDS Instance %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
}
case dbmeshv1alpha1.ProvisionerAWSAurora:
if err := r.reconcileAwsAurora(ctx, aws.NewRdsClient(r.AwsRDS), node, dbClass); err != nil {
- r.Recorder.Event(node, corev1.EventTypeWarning, fmt.Sprintf("Reconcile %s/%s Failed", node.GetNamespace(), node.GetName()), err.Error())
+ r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS Aurora %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
}
default:
r.Recorder.Event(node, corev1.EventTypeWarning, "UnsupportedDatabaseProvisioner", fmt.Sprintf("unsupported database provisioner %s", dbClass.Spec.Provisioner))
r.Log.Error(nil, fmt.Sprintf("unsupported database provisioner %s", dbClass.Spec.Provisioner))
}
- d, _ := json.MarshalIndent(node.Status, "", " ")
- r.Log.Info(string(d))
-
// register storage unit if needed.
if err := r.registerStorageUnit(ctx, node); err != nil {
r.Recorder.Eventf(node, corev1.EventTypeWarning, "RegisterStorageUnitFailed", "unable to register storage unit %s/%s", node.GetNamespace(), node.GetName())
return ctrl.Result{Requeue: true}, err
}
- // finally, update status
desiredState := computeDesiredState(node.Status)
if !reflect.DeepEqual(node.Status, desiredState) {
@@ -179,9 +171,6 @@ func (r *StorageNodeReconciler) reconcile(ctx context.Context, dbClass *dbmeshv1
}
}
- d, _ = json.MarshalIndent(node.Status, "", " ")
- r.Log.Info(string(d))
-
return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
}
@@ -215,26 +204,22 @@ func (r *StorageNodeReconciler) getDatabaseClass(ctx context.Context, node *v1al
func computeDesiredState(status v1alpha1.StorageNodeStatus) v1alpha1.StorageNodeStatus {
// Initialize a new status object based on the current state
desiredState := status
-
clusterStatus := status.Cluster.Status
- if (clusterStatus == "" || clusterStatus == "Ready") && allInstancesReady(status.Instances) {
- desiredState.Phase = v1alpha1.StorageNodePhaseReady
+ if status.Phase == v1alpha1.StorageNodePhaseDeleting {
+ // If the storage node is being deleted, check if all instances are deleted.
+ if len(status.Instances) == 0 {
+ desiredState.Phase = v1alpha1.StorageNodePhaseDeleteComplete
+ }
} else {
- desiredState.Phase = v1alpha1.StorageNodePhaseNotReady
- }
-
- for idx := range status.Instances {
- ins := &status.Instances[idx]
- if ins.Status == string(rds.DBInstanceStatusDeleting) {
- desiredState.Phase = v1alpha1.StorageNodePhaseDeleting
+ // If the storage node is not being deleted, check if all instances are ready.
+ if (clusterStatus == "" || clusterStatus == "Ready") && allInstancesReady(status.Instances) {
+ desiredState.Phase = v1alpha1.StorageNodePhaseReady
+ } else {
+ desiredState.Phase = v1alpha1.StorageNodePhaseNotReady
}
}
- if (status.Phase == v1alpha1.StorageNodePhaseDeleting || status.Phase == v1alpha1.StorageNodePhaseDeleteComplete) && len(status.Instances) == 0 {
- desiredState.Phase = v1alpha1.StorageNodePhaseDeleteComplete
- }
-
desiredState.Conditions = computeNewConditions(desiredState, status, clusterStatus)
return desiredState
@@ -318,10 +303,6 @@ func allInstancesReady(instances []v1alpha1.InstanceStatus) bool {
}
func (r *StorageNodeReconciler) reconcileAwsRdsInstance(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, dbClass *dbmeshv1alpha1.DatabaseClass) error {
- if node.Status.Phase == v1alpha1.StorageNodePhaseDeleteComplete {
- return nil
- }
-
instance, err := client.GetInstance(ctx, node)
if err != nil {
return err
@@ -349,6 +330,11 @@ func (r *StorageNodeReconciler) reconcileAwsRdsInstance(ctx context.Context, cli
func updateAWSRDSInstanceStatus(node *v1alpha1.StorageNode, instance *rds.DescInstance) error {
instances := make([]v1alpha1.InstanceStatus, 0)
+ if instance == nil {
+ node.Status.Instances = instances
+ return nil
+ }
+
status := instance.DBInstanceStatus
if status == rds.DBInstanceStatusAvailable {
status = rds.DBInstanceStatusReady
@@ -449,23 +435,14 @@ func (r *StorageNodeReconciler) deleteAWSRDSInstance(ctx context.Context, client
return err
}
- if instance == nil {
- r.Log.Info(fmt.Sprintf("instance %s is not found", node.Annotations[dbmeshv1alpha1.AnnotationsInstanceIdentifier]))
- return nil
- }
-
- if instance.DBInstanceStatus == rds.DBInstanceStatusDeleting {
- r.Log.Info(fmt.Sprintf("instance %s is deleting", node.Annotations[dbmeshv1alpha1.AnnotationsInstanceIdentifier]))
- return nil
- }
-
- if err := client.DeleteInstance(ctx, node, databaseClass); err != nil {
- r.Recorder.Eventf(node, corev1.EventTypeWarning, "DeleteFailed", "Failed to delete instance %s: %s", node.Annotations[dbmeshv1alpha1.AnnotationsInstanceIdentifier], err.Error())
- return err
+ if instance != nil && instance.DBInstanceStatus != rds.DBInstanceStatusDeleting {
+ if err := client.DeleteInstance(ctx, node, databaseClass); err != nil {
+ r.Recorder.Eventf(node, corev1.EventTypeWarning, "DeleteFailed", "Failed to delete instance %s: %s", node.Annotations[dbmeshv1alpha1.AnnotationsInstanceIdentifier], err.Error())
+ return err
+ }
+ r.Recorder.Event(node, corev1.EventTypeNormal, "Deleting", fmt.Sprintf("instance %s is deleting", node.Annotations[dbmeshv1alpha1.AnnotationsInstanceIdentifier]))
}
- r.Recorder.Event(node, corev1.EventTypeNormal, "Deleting", fmt.Sprintf("instance %s is deleting", node.Annotations[dbmeshv1alpha1.AnnotationsInstanceIdentifier]))
-
// update instance status
if err := updateAWSRDSInstanceStatus(node, instance); err != nil {
return fmt.Errorf("updateAWSRDSInstanceStatus failed: %w", err)
@@ -478,10 +455,13 @@ func (r *StorageNodeReconciler) deleteAWSRDSInstance(ctx context.Context, client
func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v1alpha1.StorageNode) error {
// if register storage unit is not enabled, return
if node.Annotations[AnnotationKeyRegisterStorageUnitEnabled] != "true" {
- r.Log.Info(fmt.Sprintf("register storage unit is not enabled for node %s/%s", node.GetNamespace(), node.GetName()))
return nil
}
+ if err := r.validateComputeNodeAnnotations(node); err != nil {
+ return err
+ }
+
// if storage unit is already registered, return
if node.Status.Registered {
return nil
@@ -489,14 +469,10 @@ func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v
// if node is not ready, return
if node.Status.Phase != v1alpha1.StorageNodePhaseReady {
- r.Recorder.Eventf(node, corev1.EventTypeWarning, "RegisterCanceled", "Canceled to register storage unit for node %s/%s: node is not ready", node.GetNamespace(), node.GetName())
+ r.Recorder.Eventf(node, corev1.EventTypeWarning, "RegisterWaiting", "Waiting to register storage unit for node %s/%s: node is not ready", node.GetNamespace(), node.GetName())
return nil
}
- if err := validateComputeNodeAnnotations(node); err != nil {
- return err
- }
-
logicDBName := node.Annotations[AnnotationKeyLogicDatabaseName]
dbName := node.Annotations[dbmeshv1alpha1.AnnotationsInstanceDBName]
@@ -517,7 +493,7 @@ func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v
ins := node.Status.Instances[0]
host := ins.Endpoint.Address
port := ins.Endpoint.Port
- username := strings.Split(node.Annotations[dbmeshv1alpha1.AnnotationsMasterUsername], "@")[0]
+ username := node.Annotations[dbmeshv1alpha1.AnnotationsMasterUsername]
password := node.Annotations[dbmeshv1alpha1.AnnotationsMasterUserPassword]
// TODO how to set ds name?
@@ -530,7 +506,7 @@ func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v
return nil
}
-func validateComputeNodeAnnotations(node *v1alpha1.StorageNode) error {
+func (r *StorageNodeReconciler) validateComputeNodeAnnotations(node *v1alpha1.StorageNode) error {
requiredAnnos := []string{
AnnotationKeyLogicDatabaseName,
dbmeshv1alpha1.AnnotationsInstanceDBName,
@@ -540,6 +516,7 @@ func validateComputeNodeAnnotations(node *v1alpha1.StorageNode) error {
for _, anno := range requiredAnnos {
if v, ok := node.Annotations[anno]; !ok || v == "" {
+ r.Recorder.Eventf(node, corev1.EventTypeWarning, "RegisterChecking", "Waiting to register storage unit for node %s/%s: annotation %s is required", node.GetNamespace(), node.GetName(), anno)
return fmt.Errorf("annotation %s is required", anno)
}
}
@@ -574,7 +551,7 @@ func (r *StorageNodeReconciler) getShardingsphereServer(ctx context.Context, nod
return nil, fmt.Errorf("no user in compute node %s/%s", cn.Namespace, cn.Name)
}
- username = serverConf.Authority.Users[0].User
+ username = strings.Split(serverConf.Authority.Users[0].User, "@")[0]
password = serverConf.Authority.Users[0].Password
// get service of compute node