You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/05/19 03:36:40 UTC

[shardingsphere-on-cloud] branch main updated: feat(storage-node): unregister storage unit when storage node be deleted

This is an automated email from the ASF dual-hosted git repository.

miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new cd25639  feat(storage-node): unregister storage unit when storage node be deleted
     new c9ba968  Merge pull request #373 from Xu-Wentao/storage-node
cd25639 is described below

commit cd25639cd8c7b29ca9a978b3b18b7ea2026a440f
Author: xuwentao <cu...@yahoo.com>
AuthorDate: Thu May 18 19:41:12 2023 +0800

    feat(storage-node): unregister storage unit when storage node be deleted
---
 .../controllers/storage_ndoe_controller_test.go    | 76 ++++++++++++++++++++++
 .../pkg/controllers/storage_node_controller.go     | 32 +++++++++
 .../pkg/shardingsphere/shardingsphere.go           | 26 +-------
 .../test/e2e/storage_node_controller_test.go       | 42 ++++++++++--
 4 files changed, 145 insertions(+), 31 deletions(-)

diff --git a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
index 393c00c..04f01f1 100644
--- a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
+++ b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
@@ -645,5 +645,81 @@ var _ = Describe("StorageNode Controller Mock Test", func() {
 			Expect(reconciler.registerStorageUnit(ctx, sn)).To(BeNil())
 			Expect(sn.Status.Registered).To(BeTrue())
 		})
+
+		Context("Test unregisterStorageUnit", func() {
+			BeforeEach(func() {
+				mockCtrl = gomock.NewController(GinkgoT())
+				mockSS = mock_shardingsphere.NewMockIServer(mockCtrl)
+				monkey.Patch(shardingsphere.NewServer, func(_, _ string, _ uint, _, _ string) (shardingsphere.IServer, error) {
+					return mockSS, nil
+				})
+			})
+			AfterEach(func() {
+				mockCtrl.Finish()
+				monkey.UnpatchAll()
+			})
+			It("should be successful when unregister storage unit", func() {
+				testName := "test-unregister-storage-unit"
+
+				cn := &v1alpha1.ComputeNode{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      testName,
+						Namespace: defaultTestNamespace,
+					},
+					Spec: v1alpha1.ComputeNodeSpec{
+						Bootstrap: v1alpha1.BootstrapConfig{
+							ServerConfig: v1alpha1.ServerConfig{
+								Authority: v1alpha1.ComputeNodeAuthority{
+									Users: []v1alpha1.ComputeNodeUser{
+										{
+											User:     "root",
+											Password: "root",
+										},
+									},
+								},
+							},
+						},
+					},
+				}
+				svc := &corev1.Service{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      testName,
+						Namespace: defaultTestNamespace,
+					},
+					Spec: corev1.ServiceSpec{
+						Ports: []corev1.ServicePort{
+							{
+								Name:     "http",
+								Protocol: "TCP",
+								Port:     3307,
+							},
+						},
+					},
+				}
+				Expect(fakeClient.Create(ctx, cn)).Should(Succeed())
+				Expect(fakeClient.Create(ctx, svc)).Should(Succeed())
+
+				sn := &v1alpha1.StorageNode{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      testName,
+						Namespace: defaultTestNamespace,
+						Annotations: map[string]string{
+							AnnotationKeyLogicDatabaseName:           testName,
+							dbmeshv1alpha1.AnnotationsInstanceDBName: testName,
+							AnnotationKeyComputeNodeName:             testName,
+							AnnotationKeyComputeNodeNamespace:        defaultTestNamespace,
+						},
+					},
+					Status: v1alpha1.StorageNodeStatus{
+						Registered: true,
+					},
+				}
+				Expect(fakeClient.Create(ctx, sn)).Should(Succeed())
+
+				mockSS.EXPECT().UnRegisterStorageUnit(gomock.Any()).Return(nil)
+				mockSS.EXPECT().Close().Return(nil)
+				Expect(reconciler.unregisterStorageUnit(ctx, sn)).To(BeNil())
+			})
+		})
 	})
 })
diff --git a/shardingsphere-operator/pkg/controllers/storage_node_controller.go b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
index 1628e37..b010b8f 100644
--- a/shardingsphere-operator/pkg/controllers/storage_node_controller.go
+++ b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
@@ -118,6 +118,12 @@ func (r *StorageNodeReconciler) finalize(ctx context.Context, node *v1alpha1.Sto
 		return ctrl.Result{}, nil
 	}
 
+	// Try to unregister storage unit in shardingsphere.
+	if err = r.unregisterStorageUnit(ctx, node); err != nil {
+		r.Log.Error(err, "failed to delete storage unit")
+		return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
+	}
+
 	if err = r.deleteDatabaseCluster(ctx, node, databaseClass); err != nil {
 		r.Log.Error(err, "failed to delete database cluster")
 		return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
@@ -506,6 +512,32 @@ func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v
 	return nil
 }
 
+func (r *StorageNodeReconciler) unregisterStorageUnit(ctx context.Context, node *v1alpha1.StorageNode) error {
+	if !node.Status.Registered {
+		return nil
+	}
+	if err := r.validateComputeNodeAnnotations(node); err != nil {
+		return err
+	}
+
+	ssServer, err := r.getShardingsphereServer(ctx, node)
+	if err != nil {
+		return fmt.Errorf("getShardingsphereServer failed: %w", err)
+	}
+
+	defer ssServer.Close()
+
+	// TODO how to set ds name?
+	if err := ssServer.UnRegisterStorageUnit("ds_0"); err != nil {
+		return fmt.Errorf("unregister storage unit failed: %w", err)
+	}
+
+	r.Recorder.Eventf(node, corev1.EventTypeNormal, "StorageUnitUnRegistered", "StorageUnit of node %s/%s is unregistered", node.GetNamespace(), node.GetName())
+
+	node.Status.Registered = false
+	return nil
+}
+
 func (r *StorageNodeReconciler) validateComputeNodeAnnotations(node *v1alpha1.StorageNode) error {
 	requiredAnnos := []string{
 		AnnotationKeyLogicDatabaseName,
diff --git a/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go b/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
index 746689a..9b2a0d9 100644
--- a/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
+++ b/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
@@ -39,9 +39,7 @@ const (
 	DistSQLDropTable = `DROP TABLE %s;`
 )
 
-var (
-	ruleTypeMap = map[string]string{}
-)
+var ruleTypeMap = map[string]string{}
 
 type Rule struct {
 	Type string
@@ -142,22 +140,11 @@ func (s *server) UnRegisterStorageUnit(dsName string) error {
 		return fmt.Errorf("get rules used error: %w", err)
 	}
 
-	// TODO DISCUSS: should we drop all tables used by storage unit?
-	// clean all rules and tables used by storage unit
-	tables := map[string]struct{}{}
+	// clean all rules used by storage unit
 	for _, rule := range rules {
 		if err := s.dropRule(rule.Type, rule.Name); err != nil {
 			return fmt.Errorf("drop rule error: %w", err)
 		}
-		if _, ok := tables[rule.Name]; !ok {
-			tables[rule.Name] = struct{}{}
-		}
-	}
-
-	for table := range tables {
-		if err := s.dropTable(table); err != nil {
-			return fmt.Errorf("drop table error: %w", err)
-		}
 	}
 
 	distSQL := fmt.Sprintf(DistSQLUnRegisterStorageUnit, dsName)
@@ -181,15 +168,6 @@ func (s *server) dropRule(ruleType, ruleName string) error {
 	return nil
 }
 
-func (s *server) dropTable(tableName string) error {
-	distSQL := fmt.Sprintf(DistSQLDropTable, tableName)
-	_, err := s.db.Exec(distSQL)
-	if err != nil {
-		return fmt.Errorf("drop table fail, err: %s", err)
-	}
-	return nil
-}
-
 func init() {
 	// init rule type map
 	// implement more rule type if needed
diff --git a/shardingsphere-operator/test/e2e/storage_node_controller_test.go b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
index 255ec71..0c1a681 100644
--- a/shardingsphere-operator/test/e2e/storage_node_controller_test.go
+++ b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"database/sql"
 	"github.com/DATA-DOG/go-sqlmock"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"reflect"
 	"regexp"
 	"time"
@@ -145,15 +146,13 @@ var _ = Describe("StorageNode Controller Suite Test", func() {
 			}, 10*time.Second, 1*time.Second).Should(BeTrue())
 		})
 
-		It("should register storage unit success", func() {
+		It("should register and unregister storage unit success", func() {
 			// mock mysql
 			db, dbmock, err := sqlmock.New()
 			Expect(err).Should(Succeed())
 			Expect(dbmock).ShouldNot(BeNil())
-			defer db.Close()
-
 			// mock rds DescribeDBInstances func returns success
-			monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescInstance, error) {
+			g := monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescInstance, error) {
 				return &dbmesh_rds.DescInstance{
 					DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
 					Endpoint: dbmesh_rds.Endpoint{
@@ -162,9 +161,15 @@ var _ = Describe("StorageNode Controller Suite Test", func() {
 					},
 				}, nil
 			})
+			monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "DeleteInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode, _ *dbmeshv1alpha1.DatabaseClass) error {
+				return nil
+			})
 			monkey.Patch(sql.Open, func(_ string, _ string) (*sql.DB, error) {
 				return db, nil
 			})
+			monkey.PatchInstanceMethod(reflect.TypeOf(db), "Close", func(_ *sql.DB) error {
+				return nil
+			})
 
 			cn := &v1alpha1.ComputeNode{
 				ObjectMeta: metav1.ObjectMeta{
@@ -238,20 +243,43 @@ var _ = Describe("StorageNode Controller Suite Test", func() {
 			Expect(k8sClient.Create(ctx, cn)).Should(Succeed())
 			Expect(k8sClient.Create(ctx, node)).Should(Succeed())
 
-			dbmock.ExpectExec(regexp.QuoteMeta("CREATE DATABASE IF NOT EXISTS")).WillReturnResult(sqlmock.NewResult(0, 0))
+			dbmock.ExpectExec(regexp.QuoteMeta("CREATE DATABASE IF NOT EXISTS")).WillReturnResult(sqlmock.NewResult(1, 1))
 			dbmock.ExpectExec(regexp.QuoteMeta("REGISTER STORAGE UNIT IF NOT EXISTS")).WillReturnResult(sqlmock.NewResult(0, 0))
 
 			Eventually(func() v1alpha1.StorageNodePhaseStatus {
 				newSN := &v1alpha1.StorageNode{}
 				Expect(k8sClient.Get(ctx, client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)).Should(Succeed())
 				return newSN.Status.Phase
-			}, 10, 1).Should(Equal(v1alpha1.StorageNodePhaseReady))
+			}, 20, 2).Should(Equal(v1alpha1.StorageNodePhaseReady))
 
 			Eventually(func() bool {
 				newSN := &v1alpha1.StorageNode{}
 				Expect(k8sClient.Get(ctx, client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)).Should(Succeed())
 				return newSN.Status.Registered
-			}, 10, 1).Should(BeTrue())
+			}, 20, 2).Should(BeTrue())
+
+			// delete storage node
+			Expect(k8sClient.Delete(ctx, node)).Should(Succeed())
+
+			dbmock.ExpectQuery(regexp.QuoteMeta("SHOW RULES USED STORAGE UNIT")).WillReturnRows(sqlmock.NewRows([]string{"type", "name"}).AddRow("sharding", "t_order"))
+			dbmock.ExpectExec("DROP SHARDING TABLE RULE").WillReturnResult(sqlmock.NewResult(1, 1))
+			dbmock.ExpectExec(regexp.QuoteMeta("UNREGISTER STORAGE UNIT")).WillReturnResult(sqlmock.NewResult(0, 0))
+			Eventually(func() v1alpha1.StorageNodePhaseStatus {
+				newSN := &v1alpha1.StorageNode{}
+				Expect(k8sClient.Get(ctx, client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)).Should(Succeed())
+				return newSN.Status.Phase
+			}, 20, 2).Should(Equal(v1alpha1.StorageNodePhaseDeleting))
+
+			g.Unpatch()
+			monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescInstance, error) {
+				return nil, nil
+			})
+
+			Eventually(func() bool {
+				newSN := &v1alpha1.StorageNode{}
+				err := k8sClient.Get(ctx, client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)
+				return apierrors.IsNotFound(err)
+			}, 20, 2).Should(BeTrue())
 		})
 	})
 })