You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/06/08 06:30:45 UTC

[shardingsphere-on-cloud] branch main updated: feat: storage node support aws rds cluster

This is an automated email from the ASF dual-hosted git repository.

miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new bd4c5bf  feat: storage node support aws rds cluster
     new 4656bf7  Merge pull request #402 from Xu-Wentao/sn-aws-rds-cluster
bd4c5bf is described below

commit bd4c5bf83f9274238dcc1c92cc786eb630a22f7a
Author: xuwentao <cu...@yahoo.com>
AuthorDate: Wed Jun 7 20:27:23 2023 +0800

    feat: storage node support aws rds cluster
---
 .../api/v1alpha1/storage_node_types.go             |   7 +-
 .../api/v1alpha1/storageprovider_types.go          |  19 +-
 shardingsphere-operator/go.mod                     |   2 +-
 shardingsphere-operator/go.sum                     |  10 +-
 .../controllers/storage_ndoe_controller_test.go    | 464 ++++++++++++++++++++-
 .../pkg/controllers/storage_node_controller.go     |  73 +++-
 .../pkg/reconcile/storagenode/aws/aurora.go        |   2 +-
 .../pkg/reconcile/storagenode/aws/aws.go           |   4 +
 .../pkg/reconcile/storagenode/aws/mocks/aws.go     |  43 ++
 .../pkg/reconcile/storagenode/aws/rdscluster.go    | 197 +++++++++
 .../test/e2e/storage_node_controller_test.go       |   6 +-
 11 files changed, 792 insertions(+), 35 deletions(-)

diff --git a/shardingsphere-operator/api/v1alpha1/storage_node_types.go b/shardingsphere-operator/api/v1alpha1/storage_node_types.go
index e92d78a..1dc1d97 100644
--- a/shardingsphere-operator/api/v1alpha1/storage_node_types.go
+++ b/shardingsphere-operator/api/v1alpha1/storage_node_types.go
@@ -120,10 +120,9 @@ type StorageNodeSpec struct {
 	// if not set, will NOT create database
 	Schema string `json:"schema"`
 	// +optional
-	// only for cluster provider like AWS RDS Cluster/ AWS Aurora Cluster
-	// The Default value is 1 for cluster provider
-	// will not be effective for single instance, instance will always be 1 for single instance
-	// Example: 2, means 2 instances in the cluster(1 primary + 1 reader)
+	// Only effective for the AWS Aurora storage provider right now; the default value is 1.
+	// An AWS RDS instance always has exactly 1 instance.
+	// An AWS RDS cluster automatically creates 3 instances (1 primary and 2 replicas).
 	// +kubebuilder:default=1
 	Replicas int32 `json:"replicas"`
 }
diff --git a/shardingsphere-operator/api/v1alpha1/storageprovider_types.go b/shardingsphere-operator/api/v1alpha1/storageprovider_types.go
index 8cd579f..02cb15f 100644
--- a/shardingsphere-operator/api/v1alpha1/storageprovider_types.go
+++ b/shardingsphere-operator/api/v1alpha1/storageprovider_types.go
@@ -25,15 +25,16 @@ import (
 // NOTE: json tags are required.  Any new fields you add must have json tags for the fields to be serialized.
 
 const (
-	AnnotationsVPCSecurityGroupIds = "storageproviders.shardingsphere.apache.org/vpc-security-group-ids"
-	AnnotationsSubnetGroupName     = "storageproviders.shardingsphere.apache.org/vpc-subnet-group-name"
-	AnnotationsAvailabilityZones   = "storageproviders.shardingsphere.apache.org/availability-zones"
-	AnnotationsClusterIdentifier   = "storageproviders.shardingsphere.apache.org/cluster-identifier"
-	AnnotationsInstanceIdentifier  = "storageproviders.shardingsphere.apache.org/instance-identifier"
-	AnnotationsInstanceDBName      = "storageproviders.shardingsphere.apache.org/instance-db-name"
-	AnnotationsSnapshotIdentifier  = "storageproviders.shardingsphere.apache.org/snapshot-identifier"
-	AnnotationsMasterUsername      = "storageproviders.shardingsphere.apache.org/master-username"
-	AnnotationsMasterUserPassword  = "storageproviders.shardingsphere.apache.org/master-user-password"
+	AnnotationsVPCSecurityGroupIds     = "storageproviders.shardingsphere.apache.org/vpc-security-group-ids"
+	AnnotationsSubnetGroupName         = "storageproviders.shardingsphere.apache.org/vpc-subnet-group-name"
+	AnnotationsAvailabilityZones       = "storageproviders.shardingsphere.apache.org/availability-zones"
+	AnnotationsClusterIdentifier       = "storageproviders.shardingsphere.apache.org/cluster-identifier"
+	AnnotationsInstanceIdentifier      = "storageproviders.shardingsphere.apache.org/instance-identifier"
+	AnnotationsInstanceDBName          = "storageproviders.shardingsphere.apache.org/instance-db-name"
+	AnnotationsSnapshotIdentifier      = "storageproviders.shardingsphere.apache.org/snapshot-identifier"
+	AnnotationsMasterUsername          = "storageproviders.shardingsphere.apache.org/master-username"
+	AnnotationsMasterUserPassword      = "storageproviders.shardingsphere.apache.org/master-user-password"
+	AnnotationsFinalSnapshotIdentifier = "storageproviders.shardingsphere.apache.org/final-snapshot-identifier"
 
 	ProvisionerAWSRDSInstance = "storageproviders.shardingsphere.apache.org/aws-rds-instance"
 	ProvisionerAWSRDSCluster  = "storageproviders.shardingsphere.apache.org/aws-rds-cluster"
diff --git a/shardingsphere-operator/go.mod b/shardingsphere-operator/go.mod
index e321341..277acdc 100644
--- a/shardingsphere-operator/go.mod
+++ b/shardingsphere-operator/go.mod
@@ -8,7 +8,7 @@ require (
 	github.com/antlr/antlr4 v0.0.0-20181218183524-be58ebffde8e
 	github.com/chaos-mesh/chaos-mesh/api v0.0.0-20230517110555-afab5b4a7813
 	github.com/cloudnative-pg/cloudnative-pg v1.20.0
-	github.com/database-mesh/golang-sdk v0.0.0-20230607082802-81e878c57517
+	github.com/database-mesh/golang-sdk v0.0.0-20230608051131-717115b848ac
 	github.com/go-logr/logr v1.2.4
 	github.com/go-sql-driver/mysql v1.7.1
 	github.com/golang/mock v1.6.0
diff --git a/shardingsphere-operator/go.sum b/shardingsphere-operator/go.sum
index 905c132..06ec0b2 100644
--- a/shardingsphere-operator/go.sum
+++ b/shardingsphere-operator/go.sum
@@ -64,12 +64,10 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX
 github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
 github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
 github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
-github.com/database-mesh/golang-sdk v0.0.0-20230606100535-23037381e4fb h1:p3tpHo24HjA7rW/JMjD9/6klWAVMn4fIefHXKgggVAg=
-github.com/database-mesh/golang-sdk v0.0.0-20230606100535-23037381e4fb/go.mod h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
-github.com/database-mesh/golang-sdk v0.0.0-20230607065347-ac17e15902a6 h1:qm4zHib+RLX02dX+V8jpY5Zti0GBBZxoIBEceu/AuCQ=
-github.com/database-mesh/golang-sdk v0.0.0-20230607065347-ac17e15902a6/go.mod h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
-github.com/database-mesh/golang-sdk v0.0.0-20230607082802-81e878c57517 h1:qS2ZfFpV+Vq3um8WYxWkgTpuoYT9ki63H2tA/Tb9SWw=
-github.com/database-mesh/golang-sdk v0.0.0-20230607082802-81e878c57517/go.mod h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
+github.com/database-mesh/golang-sdk v0.0.0-20230608033816-2b0eafd08bb8 h1:ykLnu04rJx6qLn4QL0XcWpL3AoIGYDYLqRX7qpPB+Rw=
+github.com/database-mesh/golang-sdk v0.0.0-20230608033816-2b0eafd08bb8/go.mod h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
+github.com/database-mesh/golang-sdk v0.0.0-20230608051131-717115b848ac h1:XxVNCAoMMSaDgKrPKV6jhXsknsXnHJ9PphpE/0DQldY=
+github.com/database-mesh/golang-sdk v0.0.0-20230608051131-717115b848ac/go.mod h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
diff --git a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
index fadd47a..f3632a5 100644
--- a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
+++ b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
@@ -799,7 +799,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
 				PrimaryEndpoint:     "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
 				ReaderEndpoint:      "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com",
 				Port:                3306,
-				Status:              dbmesh_rds.DBClusterStatusAvailable,
+				Status:              string(dbmesh_rds.DBClusterStatusAvailable),
 			}
 			descInstance := &dbmesh_rds.DescInstance{
 				DBInstanceIdentifier: "test-aws-aurora-1",
@@ -850,7 +850,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
 				Status: v1alpha1.StorageNodeStatus{
 					Phase: v1alpha1.StorageNodePhaseReady,
 					Cluster: v1alpha1.ClusterStatus{
-						Status:          dbmesh_rds.DBClusterStatusAvailable,
+						Status:          string(dbmesh_rds.DBClusterStatusAvailable),
 						PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
 						ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
 					},
@@ -870,7 +870,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
 				PrimaryEndpoint:     "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
 				ReaderEndpoint:      "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com",
 				Port:                3306,
-				Status:              dbmesh_rds.DBClusterStatusAvailable,
+				Status:              string(dbmesh_rds.DBClusterStatusAvailable),
 			}
 
 			descInstance := &dbmesh_rds.DescInstance{
@@ -924,7 +924,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
 				Status: v1alpha1.StorageNodeStatus{
 					Phase: v1alpha1.StorageNodePhaseDeleting,
 					Cluster: v1alpha1.ClusterStatus{
-						Status:          dbmesh_rds.DBClusterStatusDeleting,
+						Status:          string(dbmesh_rds.DBClusterStatusDeleting),
 						PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
 						ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
 					},
@@ -1007,7 +1007,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
 				Status: v1alpha1.StorageNodeStatus{
 					Phase: v1alpha1.StorageNodePhaseReady,
 					Cluster: v1alpha1.ClusterStatus{
-						Status:          dbmesh_rds.DBClusterStatusAvailable,
+						Status:          string(dbmesh_rds.DBClusterStatusAvailable),
 						PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
 						ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
 					},
@@ -1056,7 +1056,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
 				DBClusterIdentifier: "test-aws-aurora",
 				PrimaryEndpoint:     "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
 				Port:                int32(3306),
-				Status:              dbmesh_rds.DBClusterStatusAvailable,
+				Status:              string(dbmesh_rds.DBClusterStatusAvailable),
 			}, nil).Times(1)
 			// mock get instances of aws aurora are available
 			mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{
@@ -1145,7 +1145,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
 				DBClusterIdentifier: "test-aws-aurora",
 				PrimaryEndpoint:     "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
 				Port:                int32(3306),
-				Status:              dbmesh_rds.DBClusterStatusAvailable,
+				Status:              string(dbmesh_rds.DBClusterStatusAvailable),
 				DBClusterMembers: []dbmesh_rds.ClusterMember{
 					{DBInstanceIdentifier: "test-aws-aurora-instance-0"},
 					{DBInstanceIdentifier: "test-aws-aurora-instance-1"},
@@ -1183,5 +1183,455 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
 			Expect(storageNode.Status.Registered).To(BeFalse())
 		})
 	})
+})
+
+var _ = Describe("StorageNode Controller Mock Test For AWS RDS Cluster", func() {
+	var provider *v1alpha1.StorageProvider
+	var providerName = "aws-rds-cluster"
+	BeforeEach(func() {
+		provider = &v1alpha1.StorageProvider{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: providerName,
+			},
+			Spec: v1alpha1.StorageProviderSpec{
+				Provisioner: v1alpha1.ProvisionerAWSRDSCluster,
+				Parameters: map[string]string{
+					"engine":             "mysql",
+					"engineVersion":      "5.7",
+					"masterUsername":     "root",
+					"masterUserPassword": "root",
+					"allocatedStorage":   "20",
+				},
+			},
+		}
+		Expect(fakeClient.Create(ctx, provider)).Should(Succeed())
+
+		// mock aws client
+		// mock aws rds client
+		mockCtrl = gomock.NewController(GinkgoT())
+		mockAws = mock_aws.NewMockIRdsClient(mockCtrl)
+		monkey.Patch(aws.NewRdsClient, func(rds dbmesh_rds.RDS) aws.IRdsClient {
+			return mockAws
+		})
+		mockSS = mock_shardingsphere.NewMockIServer(mockCtrl)
+		monkey.Patch(shardingsphere.NewServer, func(_, _ string, _ uint, _, _ string) (shardingsphere.IServer, error) {
+			return mockSS, nil
+		})
+	})
+
+	AfterEach(func() {
+		mockCtrl.Finish()
+		monkey.UnpatchAll()
+	})
+
+	Context("reconcile storage node", func() {
+		It("should success when aws rds cluster is not exits", func() {
+			name := "test-aws-rds-cluster-not-exists"
+			namespacedName := types.NamespacedName{
+				Name:      name,
+				Namespace: defaultTestNamespace,
+			}
+			storageNode := v1alpha1.StorageNode{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      name,
+					Namespace: defaultTestNamespace,
+					Annotations: map[string]string{
+						v1alpha1.AnnotationsClusterIdentifier: "test-aws-rds-cluster",
+					},
+				},
+				Spec: v1alpha1.StorageNodeSpec{
+					StorageProviderName: providerName,
+				},
+			}
+			Expect(fakeClient.Create(ctx, &storageNode)).Should(Succeed())
+
+			descCluster := &dbmesh_rds.DescCluster{
+				DBClusterIdentifier: "test-aws-rds-cluster",
+				PrimaryEndpoint:     "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+				ReaderEndpoint:      "test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com",
+				Port:                3306,
+				Status:              string(dbmesh_rds.DBClusterStatusAvailable),
+			}
+			descInstance := &dbmesh_rds.DescInstance{
+				DBInstanceIdentifier: "test-aws-rds-cluster-instance-1",
+				DBClusterIdentifier:  "test-aws-rds-cluster",
+				Endpoint: dbmesh_rds.Endpoint{
+					Address: "test-aws-rds-cluster-instance-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+					Port:    3306,
+				},
+				DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
+			}
+
+			// mock aws rds cluster does not exist
+			mockAws.EXPECT().GetRDSCluster(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
+			// mock create aws rds cluster
+			mockAws.EXPECT().CreateRDSCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
+			// mock aws rds cluster is created
+			mockAws.EXPECT().GetRDSCluster(gomock.Any(), gomock.Any()).Return(descCluster, nil).Times(1)
+			// mock aws instance is created
+			mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{descInstance}, nil).Times(1)
+
+			req := ctrl.Request{NamespacedName: namespacedName}
+			_, err := reconciler.Reconcile(ctx, req)
+			Expect(err).To(BeNil())
+			sn := &v1alpha1.StorageNode{}
+			Expect(fakeClient.Get(ctx, namespacedName, sn)).Should(Succeed())
+			Expect(sn.Status.Phase).To(Equal(v1alpha1.StorageNodePhaseReady))
+		})
+
+		It("should success when storage node been delete", func() {
+			name := "test-aws-rds-cluster-deleted"
+			namespacedName := types.NamespacedName{
+				Name:      name,
+				Namespace: defaultTestNamespace,
+			}
+			req := ctrl.Request{NamespacedName: namespacedName}
+			storageNode := &v1alpha1.StorageNode{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      name,
+					Namespace: defaultTestNamespace,
+					Annotations: map[string]string{
+						v1alpha1.AnnotationsClusterIdentifier: "test-aws-rds-cluster",
+					},
+					Finalizers: []string{FinalizerName},
+				},
+				Spec: v1alpha1.StorageNodeSpec{
+					StorageProviderName: providerName,
+				},
+				Status: v1alpha1.StorageNodeStatus{
+					Phase: v1alpha1.StorageNodePhaseReady,
+					Cluster: v1alpha1.ClusterStatus{
+						Status:          string(dbmesh_rds.DBClusterStatusAvailable),
+						PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+						ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
+					},
+					Instances: []v1alpha1.InstanceStatus{
+						{
+							Status:   string(dbmesh_rds.DBInstanceStatusAvailable),
+							Endpoint: v1alpha1.Endpoint{Address: "test-aws-rds-cluster-instance-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+						},
+					},
+				},
+			}
+
+			Expect(fakeClient.Create(ctx, storageNode)).Should(Succeed())
+
+			descCluster := &dbmesh_rds.DescCluster{
+				DBClusterIdentifier: "test-aws-rds-cluster",
+				PrimaryEndpoint:     "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+				ReaderEndpoint:      "test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com",
+				Port:                3306,
+				Status:              string(dbmesh_rds.DBClusterStatusAvailable),
+			}
+
+			descInstance := &dbmesh_rds.DescInstance{
+				DBInstanceIdentifier: "test-aws-rds-cluster-1",
+				DBClusterIdentifier:  "test-aws-rds-cluster",
+				Endpoint: dbmesh_rds.Endpoint{
+					Address: "test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+					Port:    3306,
+				},
+				DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
+			}
+
+			Expect(fakeClient.Delete(ctx, storageNode)).Should(Succeed())
+
+			// mock aws rds cluster exists
+			mockAws.EXPECT().GetRDSCluster(gomock.Any(), gomock.Any()).Return(descCluster, nil).Times(1)
+			// mock get instances of aws rds cluster
+			mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{descInstance}, nil).Times(1)
+			// mock delete aws rds cluster
+			mockAws.EXPECT().DeleteRDSCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
+
+			_, err := reconciler.Reconcile(ctx, req)
+			Expect(err).To(BeNil())
+
+			Expect(fakeClient.Get(ctx, namespacedName, storageNode)).Should(Succeed())
+			Expect(storageNode.DeletionTimestamp).NotTo(BeNil())
+			Expect(storageNode.Status.Phase).To(Equal(v1alpha1.StorageNodePhaseDeleting))
+		})
+
+		It("should be success when storage node is deleting", func() {
+			name := "test-aws-rds-cluster-deleting"
+			namespacedName := types.NamespacedName{
+				Name:      name,
+				Namespace: defaultTestNamespace,
+			}
+			req := ctrl.Request{NamespacedName: namespacedName}
+			deletionTimestamp := metav1.Now()
+			storageNode := &v1alpha1.StorageNode{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      name,
+					Namespace: defaultTestNamespace,
+					Annotations: map[string]string{
+						v1alpha1.AnnotationsClusterIdentifier: "test-aws-rds-cluster",
+					},
+					Finalizers:        []string{FinalizerName},
+					DeletionTimestamp: &deletionTimestamp,
+				},
+				Spec: v1alpha1.StorageNodeSpec{
+					StorageProviderName: providerName,
+				},
+				Status: v1alpha1.StorageNodeStatus{
+					Phase: v1alpha1.StorageNodePhaseDeleting,
+					Cluster: v1alpha1.ClusterStatus{
+						Status:          string(dbmesh_rds.DBClusterStatusDeleting),
+						PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+						ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
+					},
+					Instances: []v1alpha1.InstanceStatus{
+						{
+							Status:   string(dbmesh_rds.DBInstanceStatusDeleting),
+							Endpoint: v1alpha1.Endpoint{Address: "test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+						},
+					},
+				},
+			}
+			Expect(fakeClient.Create(ctx, storageNode)).Should(Succeed())
+
+			// mock aws rds cluster does not exist
+			mockAws.EXPECT().GetRDSCluster(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
+			// mock get instances of aws rds cluster returns no instances
+			mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
+
+			_, err := reconciler.Reconcile(ctx, req)
+			Expect(err).To(BeNil())
+			Expect(fakeClient.Get(ctx, namespacedName, storageNode)).Should(Succeed())
+			Expect(storageNode.Status.Phase).To(Equal(v1alpha1.StorageNodePhaseDeleteComplete))
+		})
 
+		It("should be success when storage node is delete completed", func() {
+			name := "test-aws-rds-cluster-delete-completed"
+			namespacedName := types.NamespacedName{
+				Name:      name,
+				Namespace: defaultTestNamespace,
+			}
+			req := ctrl.Request{NamespacedName: namespacedName}
+			deletionTimestamp := metav1.Now()
+			storageNode := &v1alpha1.StorageNode{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      name,
+					Namespace: defaultTestNamespace,
+					Annotations: map[string]string{
+						v1alpha1.AnnotationsClusterIdentifier: "test-aws-rds-cluster",
+					},
+					Finalizers:        []string{FinalizerName},
+					DeletionTimestamp: &deletionTimestamp,
+				},
+				Spec: v1alpha1.StorageNodeSpec{
+					StorageProviderName: providerName,
+				},
+				Status: v1alpha1.StorageNodeStatus{
+					Phase: v1alpha1.StorageNodePhaseDeleteComplete,
+				},
+			}
+			Expect(fakeClient.Create(ctx, storageNode)).Should(Succeed())
+
+			_, err := reconciler.Reconcile(ctx, req)
+			Expect(err).To(BeNil())
+			err = fakeClient.Get(ctx, namespacedName, storageNode)
+			Expect(apierrors.IsNotFound(err)).To(BeTrue())
+		})
+
+		It("should be success when storage node is ready for register", func() {
+			name := "test-aws-rds-cluster-ready-for-register"
+			namespacedName := types.NamespacedName{
+				Name:      name,
+				Namespace: defaultTestNamespace,
+			}
+			req := ctrl.Request{NamespacedName: namespacedName}
+			storageNode := &v1alpha1.StorageNode{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      name,
+					Namespace: defaultTestNamespace,
+					Annotations: map[string]string{
+						v1alpha1.AnnotationsClusterIdentifier:   "test-aws-rds-cluster",
+						AnnotationKeyRegisterStorageUnitEnabled: "true",
+						AnnotationKeyLogicDatabaseName:          "test-logic-db",
+						v1alpha1.AnnotationsInstanceDBName:      "test-instance-db",
+						AnnotationKeyComputeNodeName:            "test-compute-node",
+					},
+				},
+				Spec: v1alpha1.StorageNodeSpec{
+					StorageProviderName: providerName,
+				},
+				Status: v1alpha1.StorageNodeStatus{
+					Phase: v1alpha1.StorageNodePhaseReady,
+					Cluster: v1alpha1.ClusterStatus{
+						Status:          string(dbmesh_rds.DBClusterStatusAvailable),
+						PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+						ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
+					},
+					Instances: []v1alpha1.InstanceStatus{
+						{
+							Status:   string(dbmesh_rds.DBInstanceStatusAvailable),
+							Endpoint: v1alpha1.Endpoint{Address: "test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+						},
+					},
+				},
+			}
+			cn := &v1alpha1.ComputeNode{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-compute-node",
+					Namespace: defaultTestNamespace,
+				},
+				Spec: v1alpha1.ComputeNodeSpec{
+					Bootstrap: v1alpha1.BootstrapConfig{
+						ServerConfig: v1alpha1.ServerConfig{
+							Authority: v1alpha1.ComputeNodeAuthority{
+								Users: []v1alpha1.ComputeNodeUser{{User: "test-user", Password: "test-password"}},
+							},
+							Mode:  v1alpha1.ComputeNodeServerMode{},
+							Props: nil,
+						},
+					},
+				},
+			}
+			svc := &corev1.Service{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-compute-node",
+					Namespace: defaultTestNamespace,
+				},
+				Spec: corev1.ServiceSpec{
+					Ports: []corev1.ServicePort{
+						{Name: "http", Protocol: "TCP", Port: 3307},
+					},
+				},
+			}
+			Expect(fakeClient.Create(ctx, storageNode)).Should(Succeed())
+			Expect(fakeClient.Create(ctx, cn)).Should(Succeed())
+			Expect(fakeClient.Create(ctx, svc)).Should(Succeed())
+
+			// mock aws rds cluster is available
+			mockAws.EXPECT().GetRDSCluster(gomock.Any(), gomock.Any()).Return(&dbmesh_rds.DescCluster{
+				DBClusterIdentifier: "test-aws-rds-cluster",
+				PrimaryEndpoint:     "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+				Port:                int32(3306),
+				Status:              string(dbmesh_rds.DBClusterStatusAvailable),
+			}, nil).Times(1)
+			// mock get instances of aws rds cluster are available
+			mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{
+				{
+					DBInstanceIdentifier: "test-aws-rds-cluster-instance-0",
+					DBInstanceStatus:     dbmesh_rds.DBInstanceStatusAvailable,
+					Endpoint:             dbmesh_rds.Endpoint{Address: "test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+				},
+			}, nil).Times(1)
+
+			host, port, username, password := getDatasourceInfoFromCluster(storageNode, provider)
+
+			// mock shardingsphere
+			mockSS.EXPECT().CreateDatabase(gomock.Any()).Return(nil).Times(1)
+			mockSS.EXPECT().RegisterStorageUnit("test-logic-db", getDSName(storageNode), host, uint(port), "test-instance-db", username, password).Return(nil).Times(1)
+			mockSS.EXPECT().Close().Return(nil).Times(1)
+
+			_, err := reconciler.Reconcile(ctx, req)
+			Expect(err).To(BeNil())
+
+			err = fakeClient.Get(ctx, namespacedName, storageNode)
+			Expect(storageNode.Status.Registered).To(BeTrue())
+		})
+
+		It("should be success unregistered when storage node is deleting", func() {
+			snName := "test-aws-rds-cluster-unregistered"
+			namespacedName := types.NamespacedName{
+				Name:      snName,
+				Namespace: defaultTestNamespace,
+			}
+			req := ctrl.Request{NamespacedName: namespacedName}
+
+			storageNode := &v1alpha1.StorageNode{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      snName,
+					Namespace: defaultTestNamespace,
+					Annotations: map[string]string{
+						v1alpha1.AnnotationsClusterIdentifier:   "test-aws-rds-cluster",
+						AnnotationKeyRegisterStorageUnitEnabled: "true",
+						AnnotationKeyLogicDatabaseName:          "test-logic-db",
+						v1alpha1.AnnotationsInstanceDBName:      "test-instance-db",
+						AnnotationKeyComputeNodeName:            "test-compute-node",
+					},
+				},
+				Spec: v1alpha1.StorageNodeSpec{
+					StorageProviderName: providerName,
+					Replicas:            2,
+				},
+				Status: v1alpha1.StorageNodeStatus{
+					Phase:      v1alpha1.StorageNodePhaseReady,
+					Registered: true,
+				},
+			}
+			cn := &v1alpha1.ComputeNode{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-compute-node",
+					Namespace: defaultTestNamespace,
+				},
+				Spec: v1alpha1.ComputeNodeSpec{
+					Bootstrap: v1alpha1.BootstrapConfig{
+						ServerConfig: v1alpha1.ServerConfig{
+							Authority: v1alpha1.ComputeNodeAuthority{
+								Users: []v1alpha1.ComputeNodeUser{{User: "test-user", Password: "test-password"}},
+							},
+						},
+					},
+				},
+			}
+			svc := &corev1.Service{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-compute-node",
+					Namespace: defaultTestNamespace,
+				},
+				Spec: corev1.ServiceSpec{
+					Ports: []corev1.ServicePort{
+						{Name: "http", Protocol: "TCP", Port: 3307},
+					},
+				},
+			}
+			Expect(fakeClient.Create(ctx, storageNode)).Should(Succeed())
+			Expect(fakeClient.Create(ctx, cn)).Should(Succeed())
+			Expect(fakeClient.Create(ctx, svc)).Should(Succeed())
+
+			// mock aws rds cluster is available
+			mockAws.EXPECT().GetRDSCluster(gomock.Any(), gomock.Any()).Return(&dbmesh_rds.DescCluster{
+				DBClusterIdentifier: "test-aws-rds-cluster",
+				PrimaryEndpoint:     "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+				Port:                int32(3306),
+				Status:              string(dbmesh_rds.DBClusterStatusAvailable),
+				DBClusterMembers: []dbmesh_rds.ClusterMember{
+					{DBInstanceIdentifier: "test-aws-rds-cluster-instance-0"},
+					{DBInstanceIdentifier: "test-aws-rds-cluster-instance-1"},
+				},
+			}, nil).Times(2)
+			// mock get instances of aws rds cluster are available
+			mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{
+				{
+					DBInstanceIdentifier: "test-aws-rds-cluster-instance-0",
+					DBInstanceStatus:     dbmesh_rds.DBInstanceStatusAvailable,
+					Endpoint:             dbmesh_rds.Endpoint{Address: "test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+				},
+				{
+					DBInstanceIdentifier: "test-aws-rds-cluster-instance-1",
+					DBInstanceStatus:     dbmesh_rds.DBInstanceStatusAvailable,
+					Endpoint:             dbmesh_rds.Endpoint{Address: "test-aws-rds-cluster-2.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+				},
+			}, nil).Times(2)
+
+			_, err := reconciler.Reconcile(ctx, req)
+			Expect(err).To(BeNil())
+
+			Expect(fakeClient.Delete(ctx, storageNode)).Should(Succeed())
+			// mock shardingsphere
+			mockSS.EXPECT().UnRegisterStorageUnit("test-logic-db", getDSName(storageNode)).Return(nil).Times(1)
+			mockSS.EXPECT().Close().Return(nil).Times(1)
+
+			// mock delete aws rds cluster
+			mockAws.EXPECT().DeleteRDSCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
+
+			_, err = reconciler.Reconcile(ctx, req)
+			Expect(err).To(BeNil())
+
+			err = fakeClient.Get(ctx, namespacedName, storageNode)
+			Expect(storageNode.Status.Registered).To(BeFalse())
+		})
+	})
 })
diff --git a/shardingsphere-operator/pkg/controllers/storage_node_controller.go b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
index 178042c..39aa7d1 100644
--- a/shardingsphere-operator/pkg/controllers/storage_node_controller.go
+++ b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
@@ -164,6 +164,11 @@ func (r *StorageNodeReconciler) reconcile(ctx context.Context, storageProvider *
 			r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS RDS Instance %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
 			return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
 		}
+	case v1alpha1.ProvisionerAWSRDSCluster:
+		if err := r.reconcileAwsRDSCluster(ctx, aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
+			r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS RDS Cluster %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
+			return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
+		}
 	case v1alpha1.ProvisionerAWSAurora:
 		if err := r.reconcileAwsAurora(ctx, aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
 			r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS Aurora %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
@@ -215,7 +220,9 @@ func (r *StorageNodeReconciler) getStorageProvider(ctx context.Context, node *v1
 
 	// check provisioner
 	// aws-like provisioner need aws rds client
-	if storageProvider.Spec.Provisioner == v1alpha1.ProvisionerAWSRDSInstance || storageProvider.Spec.Provisioner == v1alpha1.ProvisionerAWSAurora {
+	if storageProvider.Spec.Provisioner == v1alpha1.ProvisionerAWSRDSInstance ||
+		storageProvider.Spec.Provisioner == v1alpha1.ProvisionerAWSAurora ||
+		storageProvider.Spec.Provisioner == v1alpha1.ProvisionerAWSRDSCluster {
 		if r.AwsRDS == nil {
 			r.Recorder.Event(node, corev1.EventTypeWarning, "AwsRdsClientIsNil", "aws rds client is nil, please check your aws credentials")
 			return nil, fmt.Errorf("aws rds client is nil, please check your aws credentials")
@@ -238,7 +245,7 @@ func computeDesiredState(status v1alpha1.StorageNodeStatus) v1alpha1.StorageNode
 		}
 	} else {
 		// If the storage node is not being deleted, check if all instances are ready.
-		if (clusterStatus == "" || clusterStatus == rds.DBClusterStatusAvailable) && allInstancesReady(status.Instances) {
+		if (clusterStatus == "" || clusterStatus == string(rds.DBClusterStatusAvailable)) && allInstancesReady(status.Instances) {
 			desiredState.Phase = v1alpha1.StorageNodePhaseReady
 		} else {
 			desiredState.Phase = v1alpha1.StorageNodePhaseNotReady
@@ -256,7 +263,7 @@ func computeNewConditions(desiredState, status v1alpha1.StorageNodeStatus, clust
 
 	// Update the cluster ready condition if the cluster status is not empty
 	if clusterStatus != "" {
-		if clusterStatus == rds.DBClusterStatusAvailable {
+		if clusterStatus == string(rds.DBClusterStatusAvailable) {
 			newSNConditions.UpsertCondition(&v1alpha1.StorageNodeCondition{
 				Type:           v1alpha1.StorageNodeConditionTypeClusterReady,
 				Status:         corev1.ConditionTrue,
@@ -372,6 +379,34 @@ func updateAWSRDSInstanceStatus(node *v1alpha1.StorageNode, instance *rds.DescIn
 	return nil
 }
 
+// reconcileAwsRDSCluster creates the rds cluster when GetRDSCluster returns none, then passes the described cluster to updateClusterStatus.
+func (r *StorageNodeReconciler) reconcileAwsRDSCluster(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
+	ac, err := client.GetRDSCluster(ctx, node)
+	if err != nil {
+		return err
+	}
+
+	if ac == nil {
+		// cluster not found: create it, then describe it again so the status update below sees it
+		err = client.CreateRDSCluster(ctx, node, storageProvider.Spec.Parameters)
+		if err != nil {
+			return err
+		}
+		ac, err = client.GetRDSCluster(ctx, node)
+		if err != nil {
+			return err
+		}
+	}
+
+	// TODO: reconcile the instances of the rds cluster
+
+	// update storage node status
+	if err := updateClusterStatus(ctx, client, node, ac); err != nil {
+		return fmt.Errorf("updateClusterStatus failed: %w", err)
+	}
+	return nil
+}
+
 func (r *StorageNodeReconciler) reconcileAwsAurora(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
 	r.Log.Info("reconcileAwsAurora", "node", node.GetName(), "phase", node.Status.Phase)
 	ac, err := client.GetAuroraCluster(ctx, node)
@@ -451,6 +486,10 @@ func (r *StorageNodeReconciler) deleteDatabaseCluster(ctx context.Context, node
 		if err := r.deleteAWSRDSInstance(ctx, aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
 			return fmt.Errorf("delete aws rds instance failed: %w", err)
 		}
+	case v1alpha1.ProvisionerAWSRDSCluster:
+		if err := r.deleteAWSRDSCluster(ctx, aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
+			return fmt.Errorf("delete aws rds cluster failed: %w", err)
+		}
 	case v1alpha1.ProvisionerAWSAurora:
 		if err := r.deleteAWSAurora(ctx, aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
 			return fmt.Errorf("delete aws aurora cluster failed: %w", err)
@@ -487,6 +526,32 @@ func (r *StorageNodeReconciler) deleteAWSRDSInstance(ctx context.Context, client
 	return nil
 }
 
+// nolint:dupl
+// deleteAWSRDSCluster deletes the node's rds cluster unless it is absent or already deleting; it does nothing when the cluster identifier annotation is empty.
+func (r *StorageNodeReconciler) deleteAWSRDSCluster(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
+	if node.Annotations[v1alpha1.AnnotationsClusterIdentifier] == "" {
+		return nil
+	}
+
+	cluster, err := client.GetRDSCluster(ctx, node)
+	if err != nil {
+		return fmt.Errorf("get rds cluster failed: %w", err)
+	}
+	if cluster != nil && cluster.Status != string(rds.DBClusterStatusDeleting) {
+		if err := client.DeleteRDSCluster(ctx, node, storageProvider); err != nil {
+			r.Recorder.Eventf(node, corev1.EventTypeWarning, "DeleteFailed", "Failed to delete rds cluster %s: %s", node.Annotations[v1alpha1.AnnotationsClusterIdentifier], err.Error())
+			return err
+		}
+		r.Recorder.Event(node, corev1.EventTypeNormal, "Deleting", fmt.Sprintf("rds cluster %s is deleting", node.Annotations[v1alpha1.AnnotationsClusterIdentifier]))
+	}
+	// update storage node status with the cluster state that was read before the delete call
+	if err := updateClusterStatus(ctx, client, node, cluster); err != nil {
+		return fmt.Errorf("updateClusterStatus failed: %w", err)
+	}
+	return nil
+}
+
+// nolint:dupl
 func (r *StorageNodeReconciler) deleteAWSAurora(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
 	if node.Annotations[v1alpha1.AnnotationsClusterIdentifier] == "" {
 		return nil
@@ -496,7 +561,7 @@ func (r *StorageNodeReconciler) deleteAWSAurora(ctx context.Context, client aws.
 	if err != nil {
 		return fmt.Errorf("get aurora cluster failed: %w", err)
 	}
-	if auroraCluster != nil && auroraCluster.Status != rds.DBClusterStatusDeleting {
+	if auroraCluster != nil && auroraCluster.Status != string(rds.DBClusterStatusDeleting) {
 		if err := client.DeleteAuroraCluster(ctx, node, storageProvider); err != nil {
 			r.Recorder.Eventf(node, corev1.EventTypeWarning, "DeleteFailed", "Failed to delete aurora cluster %s: %s", node.Annotations[v1alpha1.AnnotationsClusterIdentifier], err.Error())
 			return err
diff --git a/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go
index 90df91e..42e3cbe 100644
--- a/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go
@@ -120,8 +120,8 @@ func (c *RdsClient) DeleteAuroraCluster(ctx context.Context, node *v1alpha1.Stor
 	case v1alpha1.StorageReclaimPolicyRetain:
 		aurora.SetDeleteAutomateBackups(false).SetSkipFinalSnapshot(true)
 	case v1alpha1.StorageReclaimPolicyDeleteWithFinalSnapshot:
+		// TODO set final snapshot name
 		aurora.SetDeleteAutomateBackups(true).SetSkipFinalSnapshot(false)
 	}
-
 	return aurora.Delete(ctx)
 }
diff --git a/shardingsphere-operator/pkg/reconcile/storagenode/aws/aws.go b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aws.go
index cf90112..0fb62d4 100644
--- a/shardingsphere-operator/pkg/reconcile/storagenode/aws/aws.go
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aws.go
@@ -39,6 +39,10 @@ type IRdsClient interface {
 	GetInstancesByFilters(ctx context.Context, filters map[string][]string) (instances []*rds.DescInstance, err error)
 	DeleteInstance(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error
 
+	CreateRDSCluster(ctx context.Context, node *v1alpha1.StorageNode, params map[string]string) error
+	GetRDSCluster(ctx context.Context, node *v1alpha1.StorageNode) (cluster *rds.DescCluster, err error)
+	DeleteRDSCluster(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error
+
 	CreateAuroraCluster(ctx context.Context, node *v1alpha1.StorageNode, params map[string]string) error
 	GetAuroraCluster(ctx context.Context, node *v1alpha1.StorageNode) (cluster *rds.DescCluster, err error)
 	DeleteAuroraCluster(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error
diff --git a/shardingsphere-operator/pkg/reconcile/storagenode/aws/mocks/aws.go b/shardingsphere-operator/pkg/reconcile/storagenode/aws/mocks/aws.go
index 060125d..30fc045 100644
--- a/shardingsphere-operator/pkg/reconcile/storagenode/aws/mocks/aws.go
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/mocks/aws.go
@@ -92,6 +92,20 @@ func (mr *MockIRdsClientMockRecorder) CreateInstance(ctx, node, params interface
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateInstance", reflect.TypeOf((*MockIRdsClient)(nil).CreateInstance), ctx, node, params)
 }
 
+// CreateRDSCluster mocks base method.
+func (m *MockIRdsClient) CreateRDSCluster(ctx context.Context, node *v1alpha1.StorageNode, params map[string]string) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "CreateRDSCluster", ctx, node, params)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// CreateRDSCluster indicates an expected call of CreateRDSCluster.
+func (mr *MockIRdsClientMockRecorder) CreateRDSCluster(ctx, node, params interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateRDSCluster", reflect.TypeOf((*MockIRdsClient)(nil).CreateRDSCluster), ctx, node, params)
+}
+
 // DeleteAuroraCluster mocks base method.
 func (m *MockIRdsClient) DeleteAuroraCluster(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
 	m.ctrl.T.Helper()
@@ -120,6 +134,20 @@ func (mr *MockIRdsClientMockRecorder) DeleteInstance(ctx, node, storageProvider
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteInstance", reflect.TypeOf((*MockIRdsClient)(nil).DeleteInstance), ctx, node, storageProvider)
 }
 
+// DeleteRDSCluster mocks base method.
+func (m *MockIRdsClient) DeleteRDSCluster(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "DeleteRDSCluster", ctx, node, storageProvider)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// DeleteRDSCluster indicates an expected call of DeleteRDSCluster.
+func (mr *MockIRdsClientMockRecorder) DeleteRDSCluster(ctx, node, storageProvider interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteRDSCluster", reflect.TypeOf((*MockIRdsClient)(nil).DeleteRDSCluster), ctx, node, storageProvider)
+}
+
 // GetAuroraCluster mocks base method.
 func (m *MockIRdsClient) GetAuroraCluster(ctx context.Context, node *v1alpha1.StorageNode) (*rds.DescCluster, error) {
 	m.ctrl.T.Helper()
@@ -180,6 +208,21 @@ func (mr *MockIRdsClientMockRecorder) GetInstancesByFilters(ctx, filters interfa
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetInstancesByFilters", reflect.TypeOf((*MockIRdsClient)(nil).GetInstancesByFilters), ctx, filters)
 }
 
+// GetRDSCluster mocks base method.
+func (m *MockIRdsClient) GetRDSCluster(ctx context.Context, node *v1alpha1.StorageNode) (*rds.DescCluster, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "GetRDSCluster", ctx, node)
+	ret0, _ := ret[0].(*rds.DescCluster)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// GetRDSCluster indicates an expected call of GetRDSCluster.
+func (mr *MockIRdsClientMockRecorder) GetRDSCluster(ctx, node interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRDSCluster", reflect.TypeOf((*MockIRdsClient)(nil).GetRDSCluster), ctx, node)
+}
+
 // Instance mocks base method.
 func (m *MockIRdsClient) Instance() rds.Instance {
 	m.ctrl.T.Helper()
diff --git a/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdscluster.go b/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdscluster.go
new file mode 100644
index 0000000..ae278ff
--- /dev/null
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdscluster.go
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package aws
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strconv"
+	"strings"
+
+	"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+	"github.com/database-mesh/golang-sdk/aws/client/rds"
+)
+
+// validateCreateRDSClusterParams copies the node's cluster identifier annotation into params["clusterIdentifier"]
+// (allocating the map behind paramsPtr if it is nil) and returns the first failed check; required params are tested in slice order.
+func validateCreateRDSClusterParams(node *v1alpha1.StorageNode, paramsPtr *map[string]string) error {
+	requiredParams := []struct{ key, msg string }{
+		{"instanceClass", "instance class is empty"},
+		{"engine", "engine is empty"},
+		{"engineVersion", "engine version is empty"},
+		{"clusterIdentifier", "cluster identifier is empty"},
+		{"masterUsername", "master username is empty"},
+		{"masterUserPassword", "master user password is empty"},
+		{"allocatedStorage", "allocated storage is empty"},
+		{"iops", "iops is empty"},
+		{"storageType", "storage type is empty"},
+	}
+
+	id := node.Annotations[v1alpha1.AnnotationsClusterIdentifier]
+	if id == "" {
+		return errors.New("cluster identifier is empty")
+	}
+	// writing to a nil map panics, so allocate one for the caller before storing the identifier
+	if *paramsPtr == nil {
+		*paramsPtr = map[string]string{}
+	}
+	params := *paramsPtr
+	params["clusterIdentifier"] = id
+	if len(id) > 50 {
+		return errors.New("cluster identifier is too long, max length is 50")
+	}
+
+	for _, p := range requiredParams {
+		if params[p.key] == "" {
+			return errors.New(p.msg)
+		}
+	}
+
+	// when the engine is mysql, only major version 8 is accepted
+	if params["engine"] == "mysql" && strings.Split(params["engineVersion"], ".")[0] != "8" {
+		return errors.New("mysql engine version is not supported, only support 8.x")
+	}
+	if params["storageType"] != "io1" {
+		return errors.New("storage type is not supported, only support io1")
+	}
+	return nil
+}
+
+// getAllocatedStorage parses allocatedStorageStr as an int in [100, 65536]; a parse failure is wrapped so callers can unwrap it.
+func getAllocatedStorage(allocatedStorageStr string) (allocatedStorage int, err error) {
+	if allocatedStorage, err = strconv.Atoi(allocatedStorageStr); err != nil {
+		return 0, fmt.Errorf("allocated storage is not a number, %w", err)
+	}
+	if allocatedStorage < 100 || allocatedStorage > 65536 {
+		return 0, fmt.Errorf("allocated storage is out of range, min is 100, max is 65536")
+	}
+	return allocatedStorage, nil
+}
+
+// getIOPS parses iopsStr as an int in [1000, 256000]; a parse failure is wrapped so callers can unwrap it.
+func getIOPS(iopsStr string) (iops int, err error) {
+	if iops, err = strconv.Atoi(iopsStr); err != nil {
+		return 0, fmt.Errorf("iops is not a number, %w", err)
+	}
+	if iops < 1000 || iops > 256000 {
+		return 0, fmt.Errorf("iops is out of range, min is 1000, max is 256000")
+	}
+	return iops, nil
+}
+
+// CreateRDSCluster validates params and creates the rds cluster described by them.
+// The cluster identifier comes from the node's cluster identifier annotation and, when set,
+// the database name from the instance db name annotation. The cluster is publicly accessible
+// unless params["publicAccessible"] is "false".
+// ref: https://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_CreateDBCluster.html
+func (c *RdsClient) CreateRDSCluster(ctx context.Context, node *v1alpha1.StorageNode, params map[string]string) error {
+	if err := validateCreateRDSClusterParams(node, &params); err != nil {
+		return err
+	}
+
+	cc := c.Cluster()
+
+	allocatedStorage, err := getAllocatedStorage(params["allocatedStorage"])
+	if err != nil {
+		return err
+	}
+
+	iops, err := getIOPS(params["iops"])
+	if err != nil {
+		return err
+	}
+
+	if iops > allocatedStorage*50 || iops*2 < allocatedStorage {
+		return fmt.Errorf("the IOPS to GiB ratio must be between 0.5 and 50, current iops is %d, allocated storage is %d", iops, allocatedStorage)
+	}
+
+	// getAllocatedStorage and getIOPS range-check their results, so the int32 conversions cannot overflow
+	cc.SetEngine(params["engine"]).
+		SetEngineVersion(params["engineVersion"]).
+		SetDBClusterIdentifier(params["clusterIdentifier"]).
+		SetMasterUsername(params["masterUsername"]).
+		SetMasterUserPassword(params["masterUserPassword"]).
+		SetAllocatedStorage(int32(allocatedStorage)).
+		SetDBClusterInstanceClass(params["instanceClass"]).
+		SetIOPS(int32(iops)).
+		SetStorageType(params["storageType"])
+
+	if v, ok := node.Annotations[v1alpha1.AnnotationsInstanceDBName]; ok && v != "" {
+		cc.SetDatabaseName(v)
+	}
+
+	cc.SetPublicAccessible(params["publicAccessible"] != "false")
+
+	if params["vpcSecurityGroupIds"] != "" {
+		cc.SetVpcSecurityGroupIds(strings.Split(params["vpcSecurityGroupIds"], ","))
+	}
+
+	if err := cc.Create(ctx); err != nil {
+		return fmt.Errorf("create rds cluster failed, %w", err)
+	}
+
+	return nil
+}
+
+// GetRDSCluster describes the rds cluster named by the node's cluster identifier annotation; a missing or empty annotation is an error.
+func (c *RdsClient) GetRDSCluster(ctx context.Context, node *v1alpha1.StorageNode) (cluster *rds.DescCluster, err error) {
+	identifier := node.Annotations[v1alpha1.AnnotationsClusterIdentifier]
+	if identifier == "" {
+		return nil, errors.New("cluster identifier is empty")
+	}
+	cc := c.Cluster()
+	cc.SetDBClusterIdentifier(identifier)
+	return cc.Describe(ctx)
+}
+
+// DeleteRDSCluster deletes the node's rds cluster per the provider's reclaim policy; it returns nil without deleting when the cluster is not found or already deleting.
+func (c *RdsClient) DeleteRDSCluster(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
+	identifier := node.Annotations[v1alpha1.AnnotationsClusterIdentifier]
+	if identifier == "" {
+		return fmt.Errorf("cluster identifier is empty")
+	}
+
+	cc := c.Cluster()
+	cc.SetDBClusterIdentifier(identifier)
+	cluster, err := cc.Describe(ctx)
+	if err != nil {
+		return fmt.Errorf("describe rds cluster failed, %w", err)
+	}
+	if cluster == nil || cluster.Status == string(rds.DBClusterStatusDeleting) {
+		return nil
+	}
+
+	switch storageProvider.Spec.ReclaimPolicy {
+	case v1alpha1.StorageReclaimPolicyDelete:
+		cc.SetSkipFinalSnapshot(true)
+	case v1alpha1.StorageReclaimPolicyDeleteWithFinalSnapshot:
+		if v, ok := node.Annotations[v1alpha1.AnnotationsFinalSnapshotIdentifier]; !ok || v == "" {
+			return fmt.Errorf("final snapshot identifier is empty")
+		}
+		if cluster.Status != string(rds.DBClusterStatusAvailable) {
+			return fmt.Errorf("rds cluster is not available, can not delete with final snapshot")
+		}
+		cc.SetFinalDBSnapshotIdentifier(node.Annotations[v1alpha1.AnnotationsFinalSnapshotIdentifier])
+		cc.SetSkipFinalSnapshot(false)
+	case v1alpha1.StorageReclaimPolicyRetain:
+		return fmt.Errorf("rds cluster does not support retain policy")
+	}
+
+	return cc.Delete(ctx)
+}
diff --git a/shardingsphere-operator/test/e2e/storage_node_controller_test.go b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
index f6744b7..47508ff 100644
--- a/shardingsphere-operator/test/e2e/storage_node_controller_test.go
+++ b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
@@ -324,7 +324,7 @@ var _ = Describe("StorageNode Controller Suite Test For AWS Aurora Cluster", fun
 			monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetAuroraCluster", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescCluster, error) {
 				return &dbmesh_rds.DescCluster{
 					DBClusterIdentifier: clusterIdentifier,
-					Status:              dbmesh_rds.DBClusterStatusCreating,
+					Status:              string(dbmesh_rds.DBClusterStatusCreating),
 					PrimaryEndpoint:     "test-primary-endpoint",
 					ReaderEndpoint:      "test-reader-endpoint",
 					Port:                3306,
@@ -359,7 +359,7 @@ var _ = Describe("StorageNode Controller Suite Test For AWS Aurora Cluster", fun
 				newSN := &v1alpha1.StorageNode{}
 				Expect(k8sClient.Get(ctx, client.ObjectKey{Name: snName, Namespace: "default"}, newSN)).Should(Succeed())
 				return newSN.Status.Cluster.Status
-			}, time.Second*10, time.Millisecond*250).Should(Equal(dbmesh_rds.DBClusterStatusCreating))
+			}, time.Second*10, time.Millisecond*250).Should(Equal(string(dbmesh_rds.DBClusterStatusCreating)))
 		})
 
 		It("should success when cluster is available", func() {
@@ -367,7 +367,7 @@ var _ = Describe("StorageNode Controller Suite Test For AWS Aurora Cluster", fun
 			monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetAuroraCluster", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescCluster, error) {
 				return &dbmesh_rds.DescCluster{
 					DBClusterIdentifier: clusterIdentifier,
-					Status:              dbmesh_rds.DBClusterStatusAvailable,
+					Status:              string(dbmesh_rds.DBClusterStatusAvailable),
 					PrimaryEndpoint:     "test-primary-endpoint",
 					ReaderEndpoint:      "test-reader-endpoint",
 					Port:                3306,