You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/05/19 03:36:40 UTC
[shardingsphere-on-cloud] branch main updated: feat(storage-node): unregister storage unit when storage node be deleted
This is an automated email from the ASF dual-hosted git repository.
miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new cd25639 feat(storage-node): unregister storage unit when storage node be deleted
new c9ba968 Merge pull request #373 from Xu-Wentao/storage-node
cd25639 is described below
commit cd25639cd8c7b29ca9a978b3b18b7ea2026a440f
Author: xuwentao <cu...@yahoo.com>
AuthorDate: Thu May 18 19:41:12 2023 +0800
feat(storage-node): unregister storage unit when storage node be deleted
---
.../controllers/storage_ndoe_controller_test.go | 76 ++++++++++++++++++++++
.../pkg/controllers/storage_node_controller.go | 32 +++++++++
.../pkg/shardingsphere/shardingsphere.go | 26 +-------
.../test/e2e/storage_node_controller_test.go | 42 ++++++++++--
4 files changed, 145 insertions(+), 31 deletions(-)
diff --git a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
index 393c00c..04f01f1 100644
--- a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
+++ b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
@@ -645,5 +645,81 @@ var _ = Describe("StorageNode Controller Mock Test", func() {
Expect(reconciler.registerStorageUnit(ctx, sn)).To(BeNil())
Expect(sn.Status.Registered).To(BeTrue())
})
+
+ Context("Test unregisterStorageUnit", func() {
+ BeforeEach(func() {
+ mockCtrl = gomock.NewController(GinkgoT())
+ mockSS = mock_shardingsphere.NewMockIServer(mockCtrl)
+ monkey.Patch(shardingsphere.NewServer, func(_, _ string, _ uint, _, _ string) (shardingsphere.IServer, error) {
+ return mockSS, nil
+ })
+ })
+ AfterEach(func() {
+ mockCtrl.Finish()
+ monkey.UnpatchAll()
+ })
+ It("should be successful when unregister storage unit", func() {
+ testName := "test-unregister-storage-unit"
+
+ cn := &v1alpha1.ComputeNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: testName,
+ Namespace: defaultTestNamespace,
+ },
+ Spec: v1alpha1.ComputeNodeSpec{
+ Bootstrap: v1alpha1.BootstrapConfig{
+ ServerConfig: v1alpha1.ServerConfig{
+ Authority: v1alpha1.ComputeNodeAuthority{
+ Users: []v1alpha1.ComputeNodeUser{
+ {
+ User: "root",
+ Password: "root",
+ },
+ },
+ },
+ },
+ },
+ },
+ }
+ svc := &corev1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: testName,
+ Namespace: defaultTestNamespace,
+ },
+ Spec: corev1.ServiceSpec{
+ Ports: []corev1.ServicePort{
+ {
+ Name: "http",
+ Protocol: "TCP",
+ Port: 3307,
+ },
+ },
+ },
+ }
+ Expect(fakeClient.Create(ctx, cn)).Should(Succeed())
+ Expect(fakeClient.Create(ctx, svc)).Should(Succeed())
+
+ sn := &v1alpha1.StorageNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: testName,
+ Namespace: defaultTestNamespace,
+ Annotations: map[string]string{
+ AnnotationKeyLogicDatabaseName: testName,
+ dbmeshv1alpha1.AnnotationsInstanceDBName: testName,
+ AnnotationKeyComputeNodeName: testName,
+ AnnotationKeyComputeNodeNamespace: defaultTestNamespace,
+ },
+ },
+ Status: v1alpha1.StorageNodeStatus{
+ Registered: true,
+ },
+ }
+ Expect(fakeClient.Create(ctx, sn)).Should(Succeed())
+
+ mockSS.EXPECT().UnRegisterStorageUnit(gomock.Any()).Return(nil)
+ mockSS.EXPECT().Close().Return(nil)
+ Expect(reconciler.unregisterStorageUnit(ctx, sn)).To(BeNil())
+ })
+ })
})
})
diff --git a/shardingsphere-operator/pkg/controllers/storage_node_controller.go b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
index 1628e37..b010b8f 100644
--- a/shardingsphere-operator/pkg/controllers/storage_node_controller.go
+++ b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
@@ -118,6 +118,12 @@ func (r *StorageNodeReconciler) finalize(ctx context.Context, node *v1alpha1.Sto
return ctrl.Result{}, nil
}
+ // Try to unregister storage unit in shardingsphere.
+ if err = r.unregisterStorageUnit(ctx, node); err != nil {
+ r.Log.Error(err, "failed to delete storage unit")
+ return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
+ }
+
if err = r.deleteDatabaseCluster(ctx, node, databaseClass); err != nil {
r.Log.Error(err, "failed to delete database cluster")
return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
@@ -506,6 +512,32 @@ func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v
return nil
}
+func (r *StorageNodeReconciler) unregisterStorageUnit(ctx context.Context, node *v1alpha1.StorageNode) error {
+ if !node.Status.Registered {
+ return nil
+ }
+ if err := r.validateComputeNodeAnnotations(node); err != nil {
+ return err
+ }
+
+ ssServer, err := r.getShardingsphereServer(ctx, node)
+ if err != nil {
+ return fmt.Errorf("getShardingsphereServer failed: %w", err)
+ }
+
+ defer ssServer.Close()
+
+ // TODO how to set ds name?
+ if err := ssServer.UnRegisterStorageUnit("ds_0"); err != nil {
+ return fmt.Errorf("unregister storage unit failed: %w", err)
+ }
+
+ r.Recorder.Eventf(node, corev1.EventTypeNormal, "StorageUnitUnRegistered", "StorageUnit of node %s/%s is unregistered", node.GetNamespace(), node.GetName())
+
+ node.Status.Registered = false
+ return nil
+}
+
func (r *StorageNodeReconciler) validateComputeNodeAnnotations(node *v1alpha1.StorageNode) error {
requiredAnnos := []string{
AnnotationKeyLogicDatabaseName,
diff --git a/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go b/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
index 746689a..9b2a0d9 100644
--- a/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
+++ b/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
@@ -39,9 +39,7 @@ const (
DistSQLDropTable = `DROP TABLE %s;`
)
-var (
- ruleTypeMap = map[string]string{}
-)
+var ruleTypeMap = map[string]string{}
type Rule struct {
Type string
@@ -142,22 +140,11 @@ func (s *server) UnRegisterStorageUnit(dsName string) error {
return fmt.Errorf("get rules used error: %w", err)
}
- // TODO DISCUSS: should we drop all tables used by storage unit?
- // clean all rules and tables used by storage unit
- tables := map[string]struct{}{}
+ // clean all rules used by storage unit
for _, rule := range rules {
if err := s.dropRule(rule.Type, rule.Name); err != nil {
return fmt.Errorf("drop rule error: %w", err)
}
- if _, ok := tables[rule.Name]; !ok {
- tables[rule.Name] = struct{}{}
- }
- }
-
- for table := range tables {
- if err := s.dropTable(table); err != nil {
- return fmt.Errorf("drop table error: %w", err)
- }
}
distSQL := fmt.Sprintf(DistSQLUnRegisterStorageUnit, dsName)
@@ -181,15 +168,6 @@ func (s *server) dropRule(ruleType, ruleName string) error {
return nil
}
-func (s *server) dropTable(tableName string) error {
- distSQL := fmt.Sprintf(DistSQLDropTable, tableName)
- _, err := s.db.Exec(distSQL)
- if err != nil {
- return fmt.Errorf("drop table fail, err: %s", err)
- }
- return nil
-}
-
func init() {
// init rule type map
// implement more rule type if needed
diff --git a/shardingsphere-operator/test/e2e/storage_node_controller_test.go b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
index 255ec71..0c1a681 100644
--- a/shardingsphere-operator/test/e2e/storage_node_controller_test.go
+++ b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
@@ -21,6 +21,7 @@ import (
"context"
"database/sql"
"github.com/DATA-DOG/go-sqlmock"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
"reflect"
"regexp"
"time"
@@ -145,15 +146,13 @@ var _ = Describe("StorageNode Controller Suite Test", func() {
}, 10*time.Second, 1*time.Second).Should(BeTrue())
})
- It("should register storage unit success", func() {
+ It("should register and unregister storage unit success", func() {
// mock mysql
db, dbmock, err := sqlmock.New()
Expect(err).Should(Succeed())
Expect(dbmock).ShouldNot(BeNil())
- defer db.Close()
-
// mock rds DescribeDBInstances func returns success
- monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescInstance, error) {
+ g := monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescInstance, error) {
return &dbmesh_rds.DescInstance{
DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
Endpoint: dbmesh_rds.Endpoint{
@@ -162,9 +161,15 @@ var _ = Describe("StorageNode Controller Suite Test", func() {
},
}, nil
})
+ monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "DeleteInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode, _ *dbmeshv1alpha1.DatabaseClass) error {
+ return nil
+ })
monkey.Patch(sql.Open, func(_ string, _ string) (*sql.DB, error) {
return db, nil
})
+ monkey.PatchInstanceMethod(reflect.TypeOf(db), "Close", func(_ *sql.DB) error {
+ return nil
+ })
cn := &v1alpha1.ComputeNode{
ObjectMeta: metav1.ObjectMeta{
@@ -238,20 +243,43 @@ var _ = Describe("StorageNode Controller Suite Test", func() {
Expect(k8sClient.Create(ctx, cn)).Should(Succeed())
Expect(k8sClient.Create(ctx, node)).Should(Succeed())
- dbmock.ExpectExec(regexp.QuoteMeta("CREATE DATABASE IF NOT EXISTS")).WillReturnResult(sqlmock.NewResult(0, 0))
+ dbmock.ExpectExec(regexp.QuoteMeta("CREATE DATABASE IF NOT EXISTS")).WillReturnResult(sqlmock.NewResult(1, 1))
dbmock.ExpectExec(regexp.QuoteMeta("REGISTER STORAGE UNIT IF NOT EXISTS")).WillReturnResult(sqlmock.NewResult(0, 0))
Eventually(func() v1alpha1.StorageNodePhaseStatus {
newSN := &v1alpha1.StorageNode{}
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)).Should(Succeed())
return newSN.Status.Phase
- }, 10, 1).Should(Equal(v1alpha1.StorageNodePhaseReady))
+ }, 20, 2).Should(Equal(v1alpha1.StorageNodePhaseReady))
Eventually(func() bool {
newSN := &v1alpha1.StorageNode{}
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)).Should(Succeed())
return newSN.Status.Registered
- }, 10, 1).Should(BeTrue())
+ }, 20, 2).Should(BeTrue())
+
+ // delete storage node
+ Expect(k8sClient.Delete(ctx, node)).Should(Succeed())
+
+ dbmock.ExpectQuery(regexp.QuoteMeta("SHOW RULES USED STORAGE UNIT")).WillReturnRows(sqlmock.NewRows([]string{"type", "name"}).AddRow("sharding", "t_order"))
+ dbmock.ExpectExec("DROP SHARDING TABLE RULE").WillReturnResult(sqlmock.NewResult(1, 1))
+ dbmock.ExpectExec(regexp.QuoteMeta("UNREGISTER STORAGE UNIT")).WillReturnResult(sqlmock.NewResult(0, 0))
+ Eventually(func() v1alpha1.StorageNodePhaseStatus {
+ newSN := &v1alpha1.StorageNode{}
+ Expect(k8sClient.Get(ctx, client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)).Should(Succeed())
+ return newSN.Status.Phase
+ }, 20, 2).Should(Equal(v1alpha1.StorageNodePhaseDeleting))
+
+ g.Unpatch()
+ monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescInstance, error) {
+ return nil, nil
+ })
+
+ Eventually(func() bool {
+ newSN := &v1alpha1.StorageNode{}
+ err := k8sClient.Get(ctx, client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)
+ return apierrors.IsNotFound(err)
+ }, 20, 2).Should(BeTrue())
})
})
})