You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/06/08 06:30:45 UTC
[shardingsphere-on-cloud] branch main updated: feat: storage node support aws rds cluster
This is an automated email from the ASF dual-hosted git repository.
miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new bd4c5bf feat: storage node support aws rds cluster
new 4656bf7 Merge pull request #402 from Xu-Wentao/sn-aws-rds-cluster
bd4c5bf is described below
commit bd4c5bf83f9274238dcc1c92cc786eb630a22f7a
Author: xuwentao <cu...@yahoo.com>
AuthorDate: Wed Jun 7 20:27:23 2023 +0800
feat: storage node support aws rds cluster
---
.../api/v1alpha1/storage_node_types.go | 7 +-
.../api/v1alpha1/storageprovider_types.go | 19 +-
shardingsphere-operator/go.mod | 2 +-
shardingsphere-operator/go.sum | 10 +-
.../controllers/storage_ndoe_controller_test.go | 464 ++++++++++++++++++++-
.../pkg/controllers/storage_node_controller.go | 73 +++-
.../pkg/reconcile/storagenode/aws/aurora.go | 2 +-
.../pkg/reconcile/storagenode/aws/aws.go | 4 +
.../pkg/reconcile/storagenode/aws/mocks/aws.go | 43 ++
.../pkg/reconcile/storagenode/aws/rdscluster.go | 197 +++++++++
.../test/e2e/storage_node_controller_test.go | 6 +-
11 files changed, 792 insertions(+), 35 deletions(-)
diff --git a/shardingsphere-operator/api/v1alpha1/storage_node_types.go b/shardingsphere-operator/api/v1alpha1/storage_node_types.go
index e92d78a..1dc1d97 100644
--- a/shardingsphere-operator/api/v1alpha1/storage_node_types.go
+++ b/shardingsphere-operator/api/v1alpha1/storage_node_types.go
@@ -120,10 +120,9 @@ type StorageNodeSpec struct {
// if not set, will NOT create database
Schema string `json:"schema"`
// +optional
- // only for cluster provider like AWS RDS Cluster/ AWS Aurora Cluster
- // The Default value is 1 for cluster provider
- // will not be effective for single instance, instance will always be 1 for single instance
- // Example: 2, means 2 instances in the cluster(1 primary + 1 reader)
+ // Only effective for the AWS Aurora storage provider right now; the default value is 1.
+ // An AWS RDS instance always has exactly 1 instance.
+ // An AWS RDS cluster automatically creates 3 instances (1 primary and 2 replicas).
// +kubebuilder:default=1
Replicas int32 `json:"replicas"`
}
diff --git a/shardingsphere-operator/api/v1alpha1/storageprovider_types.go b/shardingsphere-operator/api/v1alpha1/storageprovider_types.go
index 8cd579f..02cb15f 100644
--- a/shardingsphere-operator/api/v1alpha1/storageprovider_types.go
+++ b/shardingsphere-operator/api/v1alpha1/storageprovider_types.go
@@ -25,15 +25,16 @@ import (
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
const (
- AnnotationsVPCSecurityGroupIds = "storageproviders.shardingsphere.apache.org/vpc-security-group-ids"
- AnnotationsSubnetGroupName = "storageproviders.shardingsphere.apache.org/vpc-subnet-group-name"
- AnnotationsAvailabilityZones = "storageproviders.shardingsphere.apache.org/availability-zones"
- AnnotationsClusterIdentifier = "storageproviders.shardingsphere.apache.org/cluster-identifier"
- AnnotationsInstanceIdentifier = "storageproviders.shardingsphere.apache.org/instance-identifier"
- AnnotationsInstanceDBName = "storageproviders.shardingsphere.apache.org/instance-db-name"
- AnnotationsSnapshotIdentifier = "storageproviders.shardingsphere.apache.org/snapshot-identifier"
- AnnotationsMasterUsername = "storageproviders.shardingsphere.apache.org/master-username"
- AnnotationsMasterUserPassword = "storageproviders.shardingsphere.apache.org/master-user-password"
+ AnnotationsVPCSecurityGroupIds = "storageproviders.shardingsphere.apache.org/vpc-security-group-ids"
+ AnnotationsSubnetGroupName = "storageproviders.shardingsphere.apache.org/vpc-subnet-group-name"
+ AnnotationsAvailabilityZones = "storageproviders.shardingsphere.apache.org/availability-zones"
+ AnnotationsClusterIdentifier = "storageproviders.shardingsphere.apache.org/cluster-identifier"
+ AnnotationsInstanceIdentifier = "storageproviders.shardingsphere.apache.org/instance-identifier"
+ AnnotationsInstanceDBName = "storageproviders.shardingsphere.apache.org/instance-db-name"
+ AnnotationsSnapshotIdentifier = "storageproviders.shardingsphere.apache.org/snapshot-identifier"
+ AnnotationsMasterUsername = "storageproviders.shardingsphere.apache.org/master-username"
+ AnnotationsMasterUserPassword = "storageproviders.shardingsphere.apache.org/master-user-password"
+ AnnotationsFinalSnapshotIdentifier = "storageproviders.shardingsphere.apache.org/final-snapshot-identifier"
ProvisionerAWSRDSInstance = "storageproviders.shardingsphere.apache.org/aws-rds-instance"
ProvisionerAWSRDSCluster = "storageproviders.shardingsphere.apache.org/aws-rds-cluster"
diff --git a/shardingsphere-operator/go.mod b/shardingsphere-operator/go.mod
index e321341..277acdc 100644
--- a/shardingsphere-operator/go.mod
+++ b/shardingsphere-operator/go.mod
@@ -8,7 +8,7 @@ require (
github.com/antlr/antlr4 v0.0.0-20181218183524-be58ebffde8e
github.com/chaos-mesh/chaos-mesh/api v0.0.0-20230517110555-afab5b4a7813
github.com/cloudnative-pg/cloudnative-pg v1.20.0
- github.com/database-mesh/golang-sdk v0.0.0-20230607082802-81e878c57517
+ github.com/database-mesh/golang-sdk v0.0.0-20230608051131-717115b848ac
github.com/go-logr/logr v1.2.4
github.com/go-sql-driver/mysql v1.7.1
github.com/golang/mock v1.6.0
diff --git a/shardingsphere-operator/go.sum b/shardingsphere-operator/go.sum
index 905c132..06ec0b2 100644
--- a/shardingsphere-operator/go.sum
+++ b/shardingsphere-operator/go.sum
@@ -64,12 +64,10 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
-github.com/database-mesh/golang-sdk v0.0.0-20230606100535-23037381e4fb h1:p3tpHo24HjA7rW/JMjD9/6klWAVMn4fIefHXKgggVAg=
-github.com/database-mesh/golang-sdk v0.0.0-20230606100535-23037381e4fb/go.mod h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
-github.com/database-mesh/golang-sdk v0.0.0-20230607065347-ac17e15902a6 h1:qm4zHib+RLX02dX+V8jpY5Zti0GBBZxoIBEceu/AuCQ=
-github.com/database-mesh/golang-sdk v0.0.0-20230607065347-ac17e15902a6/go.mod h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
-github.com/database-mesh/golang-sdk v0.0.0-20230607082802-81e878c57517 h1:qS2ZfFpV+Vq3um8WYxWkgTpuoYT9ki63H2tA/Tb9SWw=
-github.com/database-mesh/golang-sdk v0.0.0-20230607082802-81e878c57517/go.mod h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
+github.com/database-mesh/golang-sdk v0.0.0-20230608033816-2b0eafd08bb8 h1:ykLnu04rJx6qLn4QL0XcWpL3AoIGYDYLqRX7qpPB+Rw=
+github.com/database-mesh/golang-sdk v0.0.0-20230608033816-2b0eafd08bb8/go.mod h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
+github.com/database-mesh/golang-sdk v0.0.0-20230608051131-717115b848ac h1:XxVNCAoMMSaDgKrPKV6jhXsknsXnHJ9PphpE/0DQldY=
+github.com/database-mesh/golang-sdk v0.0.0-20230608051131-717115b848ac/go.mod h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
diff --git a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
index fadd47a..f3632a5 100644
--- a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
+++ b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
@@ -799,7 +799,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
PrimaryEndpoint: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
ReaderEndpoint: "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com",
Port: 3306,
- Status: dbmesh_rds.DBClusterStatusAvailable,
+ Status: string(dbmesh_rds.DBClusterStatusAvailable),
}
descInstance := &dbmesh_rds.DescInstance{
DBInstanceIdentifier: "test-aws-aurora-1",
@@ -850,7 +850,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
Status: v1alpha1.StorageNodeStatus{
Phase: v1alpha1.StorageNodePhaseReady,
Cluster: v1alpha1.ClusterStatus{
- Status: dbmesh_rds.DBClusterStatusAvailable,
+ Status: string(dbmesh_rds.DBClusterStatusAvailable),
PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
},
@@ -870,7 +870,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
PrimaryEndpoint: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
ReaderEndpoint: "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com",
Port: 3306,
- Status: dbmesh_rds.DBClusterStatusAvailable,
+ Status: string(dbmesh_rds.DBClusterStatusAvailable),
}
descInstance := &dbmesh_rds.DescInstance{
@@ -924,7 +924,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
Status: v1alpha1.StorageNodeStatus{
Phase: v1alpha1.StorageNodePhaseDeleting,
Cluster: v1alpha1.ClusterStatus{
- Status: dbmesh_rds.DBClusterStatusDeleting,
+ Status: string(dbmesh_rds.DBClusterStatusDeleting),
PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
},
@@ -1007,7 +1007,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
Status: v1alpha1.StorageNodeStatus{
Phase: v1alpha1.StorageNodePhaseReady,
Cluster: v1alpha1.ClusterStatus{
- Status: dbmesh_rds.DBClusterStatusAvailable,
+ Status: string(dbmesh_rds.DBClusterStatusAvailable),
PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
},
@@ -1056,7 +1056,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
DBClusterIdentifier: "test-aws-aurora",
PrimaryEndpoint: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
Port: int32(3306),
- Status: dbmesh_rds.DBClusterStatusAvailable,
+ Status: string(dbmesh_rds.DBClusterStatusAvailable),
}, nil).Times(1)
// mock get instances of aws aurora are available
mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{
@@ -1145,7 +1145,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
DBClusterIdentifier: "test-aws-aurora",
PrimaryEndpoint: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
Port: int32(3306),
- Status: dbmesh_rds.DBClusterStatusAvailable,
+ Status: string(dbmesh_rds.DBClusterStatusAvailable),
DBClusterMembers: []dbmesh_rds.ClusterMember{
{DBInstanceIdentifier: "test-aws-aurora-instance-0"},
{DBInstanceIdentifier: "test-aws-aurora-instance-1"},
@@ -1183,5 +1183,455 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
Expect(storageNode.Status.Registered).To(BeFalse())
})
})
+})
+
+var _ = Describe("StorageNode Controller Mock Test For AWS RDS Cluster", func() {
+ var provider *v1alpha1.StorageProvider
+ var providerName = "aws-rds-cluster"
+ BeforeEach(func() {
+ provider = &v1alpha1.StorageProvider{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: providerName,
+ },
+ Spec: v1alpha1.StorageProviderSpec{
+ Provisioner: v1alpha1.ProvisionerAWSRDSCluster,
+ Parameters: map[string]string{
+ "engine": "mysql",
+ "engineVersion": "5.7",
+ "masterUsername": "root",
+ "masterUserPassword": "root",
+ "allocatedStorage": "20",
+ },
+ },
+ }
+ Expect(fakeClient.Create(ctx, provider)).Should(Succeed())
+
+ // mock aws client
+ // mock aws rds client
+ mockCtrl = gomock.NewController(GinkgoT())
+ mockAws = mock_aws.NewMockIRdsClient(mockCtrl)
+ monkey.Patch(aws.NewRdsClient, func(rds dbmesh_rds.RDS) aws.IRdsClient {
+ return mockAws
+ })
+ mockSS = mock_shardingsphere.NewMockIServer(mockCtrl)
+ monkey.Patch(shardingsphere.NewServer, func(_, _ string, _ uint, _, _ string) (shardingsphere.IServer, error) {
+ return mockSS, nil
+ })
+ })
+
+ AfterEach(func() {
+ mockCtrl.Finish()
+ monkey.UnpatchAll()
+ })
+
+ Context("reconcile storage node", func() {
+ It("should success when aws rds cluster is not exits", func() {
+ name := "test-aws-rds-cluster-not-exists"
+ namespacedName := types.NamespacedName{
+ Name: name,
+ Namespace: defaultTestNamespace,
+ }
+ storageNode := v1alpha1.StorageNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultTestNamespace,
+ Annotations: map[string]string{
+ v1alpha1.AnnotationsClusterIdentifier: "test-aws-rds-cluster",
+ },
+ },
+ Spec: v1alpha1.StorageNodeSpec{
+ StorageProviderName: providerName,
+ },
+ }
+ Expect(fakeClient.Create(ctx, &storageNode)).Should(Succeed())
+
+ descCluster := &dbmesh_rds.DescCluster{
+ DBClusterIdentifier: "test-aws-rds-cluster",
+ PrimaryEndpoint: "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+ ReaderEndpoint: "test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com",
+ Port: 3306,
+ Status: string(dbmesh_rds.DBClusterStatusAvailable),
+ }
+ descInstance := &dbmesh_rds.DescInstance{
+ DBInstanceIdentifier: "test-aws-rds-cluster-instance-1",
+ DBClusterIdentifier: "test-aws-rds-cluster",
+ Endpoint: dbmesh_rds.Endpoint{
+ Address: "test-aws-rds-cluster-instance-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+ Port: 3306,
+ },
+ DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
+ }
+
+ // mock aws rds cluster is not exist
+ mockAws.EXPECT().GetRDSCluster(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
+ // mock create aws rds cluster
+ mockAws.EXPECT().CreateRDSCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
+ // mock aws rds cluster is created
+ mockAws.EXPECT().GetRDSCluster(gomock.Any(), gomock.Any()).Return(descCluster, nil).Times(1)
+ // mock aws instance is created
+ mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{descInstance}, nil).Times(1)
+
+ req := ctrl.Request{NamespacedName: namespacedName}
+ _, err := reconciler.Reconcile(ctx, req)
+ Expect(err).To(BeNil())
+ sn := &v1alpha1.StorageNode{}
+ Expect(fakeClient.Get(ctx, namespacedName, sn)).Should(Succeed())
+ Expect(sn.Status.Phase).To(Equal(v1alpha1.StorageNodePhaseReady))
+ })
+
+ It("should success when storage node been delete", func() {
+ name := "test-aws-rds-cluster-deleted"
+ namespacedName := types.NamespacedName{
+ Name: name,
+ Namespace: defaultTestNamespace,
+ }
+ req := ctrl.Request{NamespacedName: namespacedName}
+ storageNode := &v1alpha1.StorageNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultTestNamespace,
+ Annotations: map[string]string{
+ v1alpha1.AnnotationsClusterIdentifier: "test-aws-rds-cluster",
+ },
+ Finalizers: []string{FinalizerName},
+ },
+ Spec: v1alpha1.StorageNodeSpec{
+ StorageProviderName: providerName,
+ },
+ Status: v1alpha1.StorageNodeStatus{
+ Phase: v1alpha1.StorageNodePhaseReady,
+ Cluster: v1alpha1.ClusterStatus{
+ Status: string(dbmesh_rds.DBClusterStatusAvailable),
+ PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+ ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
+ },
+ Instances: []v1alpha1.InstanceStatus{
+ {
+ Status: string(dbmesh_rds.DBInstanceStatusAvailable),
+ Endpoint: v1alpha1.Endpoint{Address: "test-aws-rds-cluster-instance-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+ },
+ },
+ },
+ }
+
+ Expect(fakeClient.Create(ctx, storageNode)).Should(Succeed())
+
+ descCluster := &dbmesh_rds.DescCluster{
+ DBClusterIdentifier: "test-aws-rds-cluster",
+ PrimaryEndpoint: "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+ ReaderEndpoint: "test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com",
+ Port: 3306,
+ Status: string(dbmesh_rds.DBClusterStatusAvailable),
+ }
+
+ descInstance := &dbmesh_rds.DescInstance{
+ DBInstanceIdentifier: "test-aws-rds-cluster-1",
+ DBClusterIdentifier: "test-aws-rds-cluster",
+ Endpoint: dbmesh_rds.Endpoint{
+ Address: "test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+ Port: 3306,
+ },
+ DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
+ }
+
+ Expect(fakeClient.Delete(ctx, storageNode)).Should(Succeed())
+
+ // mock aws rds cluster is exists
+ mockAws.EXPECT().GetRDSCluster(gomock.Any(), gomock.Any()).Return(descCluster, nil).Times(1)
+ // mock get instances of aws rds cluster
+ mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{descInstance}, nil).Times(1)
+ // mock delete aws rds cluster
+ mockAws.EXPECT().DeleteRDSCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
+
+ _, err := reconciler.Reconcile(ctx, req)
+ Expect(err).To(BeNil())
+
+ Expect(fakeClient.Get(ctx, namespacedName, storageNode)).Should(Succeed())
+ Expect(storageNode.DeletionTimestamp).NotTo(BeNil())
+ Expect(storageNode.Status.Phase).To(Equal(v1alpha1.StorageNodePhaseDeleting))
+ })
+
+ It("should be success when storage node is deleting", func() {
+ name := "test-aws-rds-cluster-deleting"
+ namespacedName := types.NamespacedName{
+ Name: name,
+ Namespace: defaultTestNamespace,
+ }
+ req := ctrl.Request{NamespacedName: namespacedName}
+ deletionTimestamp := metav1.Now()
+ storageNode := &v1alpha1.StorageNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultTestNamespace,
+ Annotations: map[string]string{
+ v1alpha1.AnnotationsClusterIdentifier: "test-aws-rds-cluster",
+ },
+ Finalizers: []string{FinalizerName},
+ DeletionTimestamp: &deletionTimestamp,
+ },
+ Spec: v1alpha1.StorageNodeSpec{
+ StorageProviderName: providerName,
+ },
+ Status: v1alpha1.StorageNodeStatus{
+ Phase: v1alpha1.StorageNodePhaseDeleting,
+ Cluster: v1alpha1.ClusterStatus{
+ Status: string(dbmesh_rds.DBClusterStatusDeleting),
+ PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+ ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
+ },
+ Instances: []v1alpha1.InstanceStatus{
+ {
+ Status: string(dbmesh_rds.DBInstanceStatusDeleting),
+ Endpoint: v1alpha1.Endpoint{Address: "test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+ },
+ },
+ },
+ }
+ Expect(fakeClient.Create(ctx, storageNode)).Should(Succeed())
+
+ // mock aws rds cluster is not exists
+ mockAws.EXPECT().GetRDSCluster(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
+ // mock get instances of aws rds cluster is not exists
+ mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
+
+ _, err := reconciler.Reconcile(ctx, req)
+ Expect(err).To(BeNil())
+ Expect(fakeClient.Get(ctx, namespacedName, storageNode)).Should(Succeed())
+ Expect(storageNode.Status.Phase).To(Equal(v1alpha1.StorageNodePhaseDeleteComplete))
+ })
+ It("should be success when storage node is delete completed", func() {
+ name := "test-aws-rds-cluster-delete-completed"
+ namespacedName := types.NamespacedName{
+ Name: name,
+ Namespace: defaultTestNamespace,
+ }
+ req := ctrl.Request{NamespacedName: namespacedName}
+ deletionTimestamp := metav1.Now()
+ storageNode := &v1alpha1.StorageNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultTestNamespace,
+ Annotations: map[string]string{
+ v1alpha1.AnnotationsClusterIdentifier: "test-aws-rds-cluster",
+ },
+ Finalizers: []string{FinalizerName},
+ DeletionTimestamp: &deletionTimestamp,
+ },
+ Spec: v1alpha1.StorageNodeSpec{
+ StorageProviderName: providerName,
+ },
+ Status: v1alpha1.StorageNodeStatus{
+ Phase: v1alpha1.StorageNodePhaseDeleteComplete,
+ },
+ }
+ Expect(fakeClient.Create(ctx, storageNode)).Should(Succeed())
+
+ _, err := reconciler.Reconcile(ctx, req)
+ Expect(err).To(BeNil())
+ err = fakeClient.Get(ctx, namespacedName, storageNode)
+ Expect(apierrors.IsNotFound(err)).To(BeTrue())
+ })
+
+ It("should be success when storage node is ready for register", func() {
+ name := "test-aws-rds-cluster-ready-for-register"
+ namespacedName := types.NamespacedName{
+ Name: name,
+ Namespace: defaultTestNamespace,
+ }
+ req := ctrl.Request{NamespacedName: namespacedName}
+ storageNode := &v1alpha1.StorageNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultTestNamespace,
+ Annotations: map[string]string{
+ v1alpha1.AnnotationsClusterIdentifier: "test-aws-rds-cluster",
+ AnnotationKeyRegisterStorageUnitEnabled: "true",
+ AnnotationKeyLogicDatabaseName: "test-logic-db",
+ v1alpha1.AnnotationsInstanceDBName: "test-instance-db",
+ AnnotationKeyComputeNodeName: "test-compute-node",
+ },
+ },
+ Spec: v1alpha1.StorageNodeSpec{
+ StorageProviderName: providerName,
+ },
+ Status: v1alpha1.StorageNodeStatus{
+ Phase: v1alpha1.StorageNodePhaseReady,
+ Cluster: v1alpha1.ClusterStatus{
+ Status: string(dbmesh_rds.DBClusterStatusAvailable),
+ PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+ ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
+ },
+ Instances: []v1alpha1.InstanceStatus{
+ {
+ Status: string(dbmesh_rds.DBInstanceStatusAvailable),
+ Endpoint: v1alpha1.Endpoint{Address: "test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+ },
+ },
+ },
+ }
+ cn := &v1alpha1.ComputeNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-compute-node",
+ Namespace: defaultTestNamespace,
+ },
+ Spec: v1alpha1.ComputeNodeSpec{
+ Bootstrap: v1alpha1.BootstrapConfig{
+ ServerConfig: v1alpha1.ServerConfig{
+ Authority: v1alpha1.ComputeNodeAuthority{
+ Users: []v1alpha1.ComputeNodeUser{{User: "test-user", Password: "test-password"}},
+ },
+ Mode: v1alpha1.ComputeNodeServerMode{},
+ Props: nil,
+ },
+ },
+ },
+ }
+ svc := &corev1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-compute-node",
+ Namespace: defaultTestNamespace,
+ },
+ Spec: corev1.ServiceSpec{
+ Ports: []corev1.ServicePort{
+ {Name: "http", Protocol: "TCP", Port: 3307},
+ },
+ },
+ }
+ Expect(fakeClient.Create(ctx, storageNode)).Should(Succeed())
+ Expect(fakeClient.Create(ctx, cn)).Should(Succeed())
+ Expect(fakeClient.Create(ctx, svc)).Should(Succeed())
+
+ // mock aws rds cluster is available
+ mockAws.EXPECT().GetRDSCluster(gomock.Any(), gomock.Any()).Return(&dbmesh_rds.DescCluster{
+ DBClusterIdentifier: "test-aws-rds-cluster",
+ PrimaryEndpoint: "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+ Port: int32(3306),
+ Status: string(dbmesh_rds.DBClusterStatusAvailable),
+ }, nil).Times(1)
+ // mock instances of aws rds cluster are available
+ mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{
+ {
+ DBInstanceIdentifier: "test-aws-rds-cluster-instance-0",
+ DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
+ Endpoint: dbmesh_rds.Endpoint{Address: "test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+ },
+ }, nil).Times(1)
+
+ host, port, username, password := getDatasourceInfoFromCluster(storageNode, provider)
+
+ // mock shardingsphere
+ mockSS.EXPECT().CreateDatabase(gomock.Any()).Return(nil).Times(1)
+ mockSS.EXPECT().RegisterStorageUnit("test-logic-db", getDSName(storageNode), host, uint(port), "test-instance-db", username, password).Return(nil).Times(1)
+ mockSS.EXPECT().Close().Return(nil).Times(1)
+
+ _, err := reconciler.Reconcile(ctx, req)
+ Expect(err).To(BeNil())
+
+ err = fakeClient.Get(ctx, namespacedName, storageNode)
+ Expect(storageNode.Status.Registered).To(BeTrue())
+ })
+
+ It("should be success unregistered when storage node is deleting", func() {
+ snName := "test-aws-rds-cluster-unregistered"
+ namespacedName := types.NamespacedName{
+ Name: snName,
+ Namespace: defaultTestNamespace,
+ }
+ req := ctrl.Request{NamespacedName: namespacedName}
+
+ storageNode := &v1alpha1.StorageNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: snName,
+ Namespace: defaultTestNamespace,
+ Annotations: map[string]string{
+ v1alpha1.AnnotationsClusterIdentifier: "test-aws-rds-cluster",
+ AnnotationKeyRegisterStorageUnitEnabled: "true",
+ AnnotationKeyLogicDatabaseName: "test-logic-db",
+ v1alpha1.AnnotationsInstanceDBName: "test-instance-db",
+ AnnotationKeyComputeNodeName: "test-compute-node",
+ },
+ },
+ Spec: v1alpha1.StorageNodeSpec{
+ StorageProviderName: providerName,
+ Replicas: 2,
+ },
+ Status: v1alpha1.StorageNodeStatus{
+ Phase: v1alpha1.StorageNodePhaseReady,
+ Registered: true,
+ },
+ }
+ cn := &v1alpha1.ComputeNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-compute-node",
+ Namespace: defaultTestNamespace,
+ },
+ Spec: v1alpha1.ComputeNodeSpec{
+ Bootstrap: v1alpha1.BootstrapConfig{
+ ServerConfig: v1alpha1.ServerConfig{
+ Authority: v1alpha1.ComputeNodeAuthority{
+ Users: []v1alpha1.ComputeNodeUser{{User: "test-user", Password: "test-password"}},
+ },
+ },
+ },
+ },
+ }
+ svc := &corev1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-compute-node",
+ Namespace: defaultTestNamespace,
+ },
+ Spec: corev1.ServiceSpec{
+ Ports: []corev1.ServicePort{
+ {Name: "http", Protocol: "TCP", Port: 3307},
+ },
+ },
+ }
+ Expect(fakeClient.Create(ctx, storageNode)).Should(Succeed())
+ Expect(fakeClient.Create(ctx, cn)).Should(Succeed())
+ Expect(fakeClient.Create(ctx, svc)).Should(Succeed())
+
+ // mock aws rds cluster is available
+ mockAws.EXPECT().GetRDSCluster(gomock.Any(), gomock.Any()).Return(&dbmesh_rds.DescCluster{
+ DBClusterIdentifier: "test-aws-rds-cluster",
+ PrimaryEndpoint: "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+ Port: int32(3306),
+ Status: string(dbmesh_rds.DBClusterStatusAvailable),
+ DBClusterMembers: []dbmesh_rds.ClusterMember{
+ {DBInstanceIdentifier: "test-aws-rds-cluster-instance-0"},
+ {DBInstanceIdentifier: "test-aws-rds-cluster-instance-1"},
+ },
+ }, nil).Times(2)
+ // mock instances of aws rds cluster are available
+ mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{
+ {
+ DBInstanceIdentifier: "test-aws-rds-cluster-instance-0",
+ DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
+ Endpoint: dbmesh_rds.Endpoint{Address: "test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+ },
+ {
+ DBInstanceIdentifier: "test-aws-rds-cluster-instance-1",
+ DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
+ Endpoint: dbmesh_rds.Endpoint{Address: "test-aws-rds-cluster-2.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+ },
+ }, nil).Times(2)
+
+ _, err := reconciler.Reconcile(ctx, req)
+ Expect(err).To(BeNil())
+
+ Expect(fakeClient.Delete(ctx, storageNode)).Should(Succeed())
+ // mock shardingsphere
+ mockSS.EXPECT().UnRegisterStorageUnit("test-logic-db", getDSName(storageNode)).Return(nil).Times(1)
+ mockSS.EXPECT().Close().Return(nil).Times(1)
+
+ // mock delete aws rds cluster
+ mockAws.EXPECT().DeleteRDSCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
+
+ _, err = reconciler.Reconcile(ctx, req)
+ Expect(err).To(BeNil())
+
+ err = fakeClient.Get(ctx, namespacedName, storageNode)
+ Expect(storageNode.Status.Registered).To(BeFalse())
+ })
+ })
})
diff --git a/shardingsphere-operator/pkg/controllers/storage_node_controller.go b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
index 178042c..39aa7d1 100644
--- a/shardingsphere-operator/pkg/controllers/storage_node_controller.go
+++ b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
@@ -164,6 +164,11 @@ func (r *StorageNodeReconciler) reconcile(ctx context.Context, storageProvider *
r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS RDS Instance %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
}
+ case v1alpha1.ProvisionerAWSRDSCluster:
+ if err := r.reconcileAwsRDSCluster(ctx, aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
+ r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS RDS Cluster %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
+ return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
+ }
case v1alpha1.ProvisionerAWSAurora:
if err := r.reconcileAwsAurora(ctx, aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS Aurora %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
@@ -215,7 +220,9 @@ func (r *StorageNodeReconciler) getStorageProvider(ctx context.Context, node *v1
// check provisioner
// aws-like provisioner need aws rds client
- if storageProvider.Spec.Provisioner == v1alpha1.ProvisionerAWSRDSInstance || storageProvider.Spec.Provisioner == v1alpha1.ProvisionerAWSAurora {
+ if storageProvider.Spec.Provisioner == v1alpha1.ProvisionerAWSRDSInstance ||
+ storageProvider.Spec.Provisioner == v1alpha1.ProvisionerAWSAurora ||
+ storageProvider.Spec.Provisioner == v1alpha1.ProvisionerAWSRDSCluster {
if r.AwsRDS == nil {
r.Recorder.Event(node, corev1.EventTypeWarning, "AwsRdsClientIsNil", "aws rds client is nil, please check your aws credentials")
return nil, fmt.Errorf("aws rds client is nil, please check your aws credentials")
@@ -238,7 +245,7 @@ func computeDesiredState(status v1alpha1.StorageNodeStatus) v1alpha1.StorageNode
}
} else {
// If the storage node is not being deleted, check if all instances are ready.
- if (clusterStatus == "" || clusterStatus == rds.DBClusterStatusAvailable) && allInstancesReady(status.Instances) {
+ if (clusterStatus == "" || clusterStatus == string(rds.DBClusterStatusAvailable)) && allInstancesReady(status.Instances) {
desiredState.Phase = v1alpha1.StorageNodePhaseReady
} else {
desiredState.Phase = v1alpha1.StorageNodePhaseNotReady
@@ -256,7 +263,7 @@ func computeNewConditions(desiredState, status v1alpha1.StorageNodeStatus, clust
// Update the cluster ready condition if the cluster status is not empty
if clusterStatus != "" {
- if clusterStatus == rds.DBClusterStatusAvailable {
+ if clusterStatus == string(rds.DBClusterStatusAvailable) {
newSNConditions.UpsertCondition(&v1alpha1.StorageNodeCondition{
Type: v1alpha1.StorageNodeConditionTypeClusterReady,
Status: corev1.ConditionTrue,
@@ -372,6 +379,34 @@ func updateAWSRDSInstanceStatus(node *v1alpha1.StorageNode, instance *rds.DescIn
return nil
}
+func (r *StorageNodeReconciler) reconcileAwsRDSCluster(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
+ ac, err := client.GetRDSCluster(ctx, node)
+ if err != nil {
+ return err
+ }
+
+ if ac == nil {
+ // create the rds cluster because it does not exist yet
+ err = client.CreateRDSCluster(ctx, node, storageProvider.Spec.Parameters)
+ if err != nil {
+ return err
+ }
+ ac, err = client.GetRDSCluster(ctx, node)
+ if err != nil {
+ return err
+ }
+ }
+
+ // TODO: reconcile instances of the rds cluster
+
+ // update storage node status
+ if err := updateClusterStatus(ctx, client, node, ac); err != nil {
+ return fmt.Errorf("updateClusterStatus failed: %w", err)
+ }
+
+ return nil
+}
+
func (r *StorageNodeReconciler) reconcileAwsAurora(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
r.Log.Info("reconcileAwsAurora", "node", node.GetName(), "phase", node.Status.Phase)
ac, err := client.GetAuroraCluster(ctx, node)
@@ -451,6 +486,10 @@ func (r *StorageNodeReconciler) deleteDatabaseCluster(ctx context.Context, node
if err := r.deleteAWSRDSInstance(ctx, aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
return fmt.Errorf("delete aws rds instance failed: %w", err)
}
+ case v1alpha1.ProvisionerAWSRDSCluster:
+ if err := r.deleteAWSRDSCluster(ctx, aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
+ return fmt.Errorf("delete aws rds cluster failed: %w", err)
+ }
case v1alpha1.ProvisionerAWSAurora:
if err := r.deleteAWSAurora(ctx, aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
return fmt.Errorf("delete aws aurora cluster failed: %w", err)
@@ -487,6 +526,32 @@ func (r *StorageNodeReconciler) deleteAWSRDSInstance(ctx context.Context, client
return nil
}
+// nolint:dupl
+func (r *StorageNodeReconciler) deleteAWSRDSCluster(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
+ if node.Annotations[v1alpha1.AnnotationsClusterIdentifier] == "" {
+ return nil
+ }
+
+ cluster, err := client.GetRDSCluster(ctx, node)
+ if err != nil {
+ return fmt.Errorf("get rds cluster failed: %w", err)
+ }
+ if cluster != nil && cluster.Status != string(rds.DBClusterStatusDeleting) {
+ if err := client.DeleteRDSCluster(ctx, node, storageProvider); err != nil {
+ r.Recorder.Eventf(node, corev1.EventTypeWarning, "DeleteFailed", "Failed to delete rds cluster %s: %s", node.Annotations[v1alpha1.AnnotationsClusterIdentifier], err.Error())
+ return err
+ }
+ r.Recorder.Event(node, corev1.EventTypeNormal, "Deleting", fmt.Sprintf("rds cluster %s is deleting", node.Annotations[v1alpha1.AnnotationsClusterIdentifier]))
+ }
+
+ // update storage node status
+ if err := updateClusterStatus(ctx, client, node, cluster); err != nil {
+ return fmt.Errorf("updateClusterStatus failed: %w", err)
+ }
+ return nil
+}
+
+// nolint:dupl
func (r *StorageNodeReconciler) deleteAWSAurora(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
if node.Annotations[v1alpha1.AnnotationsClusterIdentifier] == "" {
return nil
@@ -496,7 +561,7 @@ func (r *StorageNodeReconciler) deleteAWSAurora(ctx context.Context, client aws.
if err != nil {
return fmt.Errorf("get aurora cluster failed: %w", err)
}
- if auroraCluster != nil && auroraCluster.Status != rds.DBClusterStatusDeleting {
+ if auroraCluster != nil && auroraCluster.Status != string(rds.DBClusterStatusDeleting) {
if err := client.DeleteAuroraCluster(ctx, node, storageProvider); err != nil {
r.Recorder.Eventf(node, corev1.EventTypeWarning, "DeleteFailed", "Failed to delete aurora cluster %s: %s", node.Annotations[v1alpha1.AnnotationsClusterIdentifier], err.Error())
return err
diff --git a/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go
index 90df91e..42e3cbe 100644
--- a/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go
@@ -120,8 +120,8 @@ func (c *RdsClient) DeleteAuroraCluster(ctx context.Context, node *v1alpha1.Stor
case v1alpha1.StorageReclaimPolicyRetain:
aurora.SetDeleteAutomateBackups(false).SetSkipFinalSnapshot(true)
case v1alpha1.StorageReclaimPolicyDeleteWithFinalSnapshot:
+ // TODO set final snapshot name
aurora.SetDeleteAutomateBackups(true).SetSkipFinalSnapshot(false)
}
-
return aurora.Delete(ctx)
}
diff --git a/shardingsphere-operator/pkg/reconcile/storagenode/aws/aws.go b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aws.go
index cf90112..0fb62d4 100644
--- a/shardingsphere-operator/pkg/reconcile/storagenode/aws/aws.go
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aws.go
@@ -39,6 +39,10 @@ type IRdsClient interface {
GetInstancesByFilters(ctx context.Context, filters map[string][]string) (instances []*rds.DescInstance, err error)
DeleteInstance(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error
+ CreateRDSCluster(ctx context.Context, node *v1alpha1.StorageNode, params map[string]string) error
+ GetRDSCluster(ctx context.Context, node *v1alpha1.StorageNode) (cluster *rds.DescCluster, err error)
+ DeleteRDSCluster(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error
+
CreateAuroraCluster(ctx context.Context, node *v1alpha1.StorageNode, params map[string]string) error
GetAuroraCluster(ctx context.Context, node *v1alpha1.StorageNode) (cluster *rds.DescCluster, err error)
DeleteAuroraCluster(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error
diff --git a/shardingsphere-operator/pkg/reconcile/storagenode/aws/mocks/aws.go b/shardingsphere-operator/pkg/reconcile/storagenode/aws/mocks/aws.go
index 060125d..30fc045 100644
--- a/shardingsphere-operator/pkg/reconcile/storagenode/aws/mocks/aws.go
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/mocks/aws.go
@@ -92,6 +92,20 @@ func (mr *MockIRdsClientMockRecorder) CreateInstance(ctx, node, params interface
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateInstance", reflect.TypeOf((*MockIRdsClient)(nil).CreateInstance), ctx, node, params)
}
+// CreateRDSCluster mocks base method.
+func (m *MockIRdsClient) CreateRDSCluster(ctx context.Context, node *v1alpha1.StorageNode, params map[string]string) error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "CreateRDSCluster", ctx, node, params)
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// CreateRDSCluster indicates an expected call of CreateRDSCluster.
+func (mr *MockIRdsClientMockRecorder) CreateRDSCluster(ctx, node, params interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateRDSCluster", reflect.TypeOf((*MockIRdsClient)(nil).CreateRDSCluster), ctx, node, params)
+}
+
// DeleteAuroraCluster mocks base method.
func (m *MockIRdsClient) DeleteAuroraCluster(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
m.ctrl.T.Helper()
@@ -120,6 +134,20 @@ func (mr *MockIRdsClientMockRecorder) DeleteInstance(ctx, node, storageProvider
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteInstance", reflect.TypeOf((*MockIRdsClient)(nil).DeleteInstance), ctx, node, storageProvider)
}
+// DeleteRDSCluster mocks base method.
+func (m *MockIRdsClient) DeleteRDSCluster(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "DeleteRDSCluster", ctx, node, storageProvider)
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// DeleteRDSCluster indicates an expected call of DeleteRDSCluster.
+func (mr *MockIRdsClientMockRecorder) DeleteRDSCluster(ctx, node, storageProvider interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteRDSCluster", reflect.TypeOf((*MockIRdsClient)(nil).DeleteRDSCluster), ctx, node, storageProvider)
+}
+
// GetAuroraCluster mocks base method.
func (m *MockIRdsClient) GetAuroraCluster(ctx context.Context, node *v1alpha1.StorageNode) (*rds.DescCluster, error) {
m.ctrl.T.Helper()
@@ -180,6 +208,21 @@ func (mr *MockIRdsClientMockRecorder) GetInstancesByFilters(ctx, filters interfa
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetInstancesByFilters", reflect.TypeOf((*MockIRdsClient)(nil).GetInstancesByFilters), ctx, filters)
}
+// GetRDSCluster mocks base method.
+func (m *MockIRdsClient) GetRDSCluster(ctx context.Context, node *v1alpha1.StorageNode) (*rds.DescCluster, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "GetRDSCluster", ctx, node)
+ ret0, _ := ret[0].(*rds.DescCluster)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// GetRDSCluster indicates an expected call of GetRDSCluster.
+func (mr *MockIRdsClientMockRecorder) GetRDSCluster(ctx, node interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRDSCluster", reflect.TypeOf((*MockIRdsClient)(nil).GetRDSCluster), ctx, node)
+}
+
// Instance mocks base method.
func (m *MockIRdsClient) Instance() rds.Instance {
m.ctrl.T.Helper()
diff --git a/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdscluster.go b/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdscluster.go
new file mode 100644
index 0000000..ae278ff
--- /dev/null
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdscluster.go
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package aws
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "strconv"
+ "strings"
+
+ "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+ "github.com/database-mesh/golang-sdk/aws/client/rds"
+)
+
+func validateCreateRDSClusterParams(node *v1alpha1.StorageNode, paramsPtr *map[string]string) error {
+ requiredParams := map[string]string{
+ "instanceClass": "instance class is empty",
+ "engine": "engine is empty",
+ "engineVersion": "engine version is empty",
+ "clusterIdentifier": "cluster identifier is empty",
+ "masterUsername": "master username is empty",
+ "masterUserPassword": "master user password is empty",
+ "allocatedStorage": "allocated storage is empty",
+ "iops": "iops is empty",
+ "storageType": "storage type is empty",
+ }
+
+ params := *paramsPtr
+ if v, ok := node.Annotations[v1alpha1.AnnotationsClusterIdentifier]; !ok || v == "" {
+ return errors.New("cluster identifier is empty")
+ } else {
+ params["clusterIdentifier"] = v
+ }
+
+ if len(params["clusterIdentifier"]) > 50 {
+ return errors.New("cluster identifier is too long, max length is 50")
+ }
+
+ for k, v := range requiredParams {
+ if val, ok := params[k]; !ok || val == "" {
+ return fmt.Errorf(v)
+ }
+ }
+
+ // valid mysql engine version
+ if params["engine"] == "mysql" {
+ version := strings.Split(params["engineVersion"], ".")[0]
+ if version != "8" {
+ return fmt.Errorf("mysql engine version is not supported, only support 8.x")
+ }
+ }
+
+ if params["storageType"] != "io1" {
+ return fmt.Errorf("storage type is not supported, only support io1")
+ }
+
+ return nil
+}
+
+func getAllocatedStorage(allocatedStorageStr string) (allocatedStorage int, err error) {
+ allocatedStorage, err = strconv.Atoi(allocatedStorageStr)
+ if err != nil {
+ return 0, fmt.Errorf("allocated storage is not a number, %v", err)
+ }
+ if allocatedStorage < 100 || allocatedStorage > 65536 {
+ return 0, fmt.Errorf("allocated storage is out of range, min is 100, max is 65536")
+ }
+ return allocatedStorage, nil
+}
+
+func getIOPS(iopsStr string) (iops int, err error) {
+ iops, err = strconv.Atoi(iopsStr)
+ if err != nil {
+ return 0, fmt.Errorf("iops is not a number, %v", err)
+ }
+ if iops < 1000 || iops > 256000 {
+ return 0, fmt.Errorf("iops is out of range, min is 1000, max is 256000")
+ }
+ return iops, nil
+}
+
+// CreateRDSCluster creates rds cluster
+// ref: https://docs.aws.amazon.com/zh_cn/AmazonRDS/latest/APIReference/API_CreateDBInstance.html
+func (c *RdsClient) CreateRDSCluster(ctx context.Context, node *v1alpha1.StorageNode, params map[string]string) error {
+ if err := validateCreateRDSClusterParams(node, &params); err != nil {
+ return err
+ }
+
+ cc := c.Cluster()
+
+ allocatedStorage, err := getAllocatedStorage(params["allocatedStorage"])
+ if err != nil {
+ return err
+ }
+
+ iops, err := getIOPS(params["iops"])
+ if err != nil {
+ return err
+ }
+
+ if iops > allocatedStorage*50 || iops*2 < allocatedStorage {
+ return fmt.Errorf("the IOPS to GiB ratio must be between 0.5 and 50, current iops is %d, allocated storage is %d", iops, allocatedStorage)
+ }
+
+ cc.SetEngine(params["engine"]).
+ SetEngineVersion(params["engineVersion"]).
+ SetDBClusterIdentifier(params["clusterIdentifier"]).
+ SetMasterUsername(params["masterUsername"]).
+ SetMasterUserPassword(params["masterUserPassword"]).
+ SetAllocatedStorage(int32(allocatedStorage)).
+ SetDBClusterInstanceClass(params["instanceClass"]).
+ SetIOPS(int32(iops)).
+ SetStorageType(params["storageType"])
+
+ if v, ok := node.Annotations[v1alpha1.AnnotationsInstanceDBName]; ok && v != "" {
+ cc.SetDatabaseName(v)
+ }
+
+ if v, ok := params["publicAccessible"]; ok && v == "false" {
+ cc.SetPublicAccessible(false)
+ } else {
+ cc.SetPublicAccessible(true)
+ }
+
+ if params["vpcSecurityGroupIds"] != "" {
+ cc.SetVpcSecurityGroupIds(strings.Split(params["vpcSecurityGroupIds"], ","))
+ }
+
+ if err := cc.Create(ctx); err != nil {
+ return fmt.Errorf("create rds cluster failed, %v", err)
+ }
+
+ return nil
+}
+
+func (c *RdsClient) GetRDSCluster(ctx context.Context, node *v1alpha1.StorageNode) (cluster *rds.DescCluster, err error) {
+ identifier, ok := node.Annotations[v1alpha1.AnnotationsClusterIdentifier]
+ if !ok {
+ return nil, errors.New("cluster identifier is empty")
+ }
+
+ cc := c.Cluster()
+ cc.SetDBClusterIdentifier(identifier)
+ return cc.Describe(ctx)
+}
+
+func (c *RdsClient) DeleteRDSCluster(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
+ identifier, ok := node.Annotations[v1alpha1.AnnotationsClusterIdentifier]
+ if !ok {
+ return fmt.Errorf("cluster identifier is empty")
+ }
+
+ cc := c.Cluster()
+ cc.SetDBClusterIdentifier(identifier)
+
+ cluster, err := cc.Describe(ctx)
+ if err != nil {
+ return fmt.Errorf("describe rds cluster failed, %v", err)
+ }
+ if cluster == nil || cluster.Status == string(rds.DBClusterStatusDeleting) {
+ return nil
+ }
+
+ switch storageProvider.Spec.ReclaimPolicy {
+ case v1alpha1.StorageReclaimPolicyDelete:
+ cc.SetSkipFinalSnapshot(true)
+ case v1alpha1.StorageReclaimPolicyDeleteWithFinalSnapshot:
+ if v, ok := node.Annotations[v1alpha1.AnnotationsFinalSnapshotIdentifier]; !ok || v == "" {
+ return fmt.Errorf("final snapshot identifier is empty")
+ }
+ if cluster.Status != string(rds.DBClusterStatusAvailable) {
+ return fmt.Errorf("rds cluster is not available, can not delete with final snapshot")
+ }
+ cc.SetFinalDBSnapshotIdentifier(node.Annotations[v1alpha1.AnnotationsFinalSnapshotIdentifier])
+ cc.SetSkipFinalSnapshot(false)
+ case v1alpha1.StorageReclaimPolicyRetain:
+ return fmt.Errorf("rds cluster does not support retain policy")
+ }
+
+ return cc.Delete(ctx)
+}
diff --git a/shardingsphere-operator/test/e2e/storage_node_controller_test.go b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
index f6744b7..47508ff 100644
--- a/shardingsphere-operator/test/e2e/storage_node_controller_test.go
+++ b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
@@ -324,7 +324,7 @@ var _ = Describe("StorageNode Controller Suite Test For AWS Aurora Cluster", fun
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetAuroraCluster", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescCluster, error) {
return &dbmesh_rds.DescCluster{
DBClusterIdentifier: clusterIdentifier,
- Status: dbmesh_rds.DBClusterStatusCreating,
+ Status: string(dbmesh_rds.DBClusterStatusCreating),
PrimaryEndpoint: "test-primary-endpoint",
ReaderEndpoint: "test-reader-endpoint",
Port: 3306,
@@ -359,7 +359,7 @@ var _ = Describe("StorageNode Controller Suite Test For AWS Aurora Cluster", fun
newSN := &v1alpha1.StorageNode{}
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: snName, Namespace: "default"}, newSN)).Should(Succeed())
return newSN.Status.Cluster.Status
- }, time.Second*10, time.Millisecond*250).Should(Equal(dbmesh_rds.DBClusterStatusCreating))
+ }, time.Second*10, time.Millisecond*250).Should(Equal(string(dbmesh_rds.DBClusterStatusCreating)))
})
It("should success when cluster is available", func() {
@@ -367,7 +367,7 @@ var _ = Describe("StorageNode Controller Suite Test For AWS Aurora Cluster", fun
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetAuroraCluster", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescCluster, error) {
return &dbmesh_rds.DescCluster{
DBClusterIdentifier: clusterIdentifier,
- Status: dbmesh_rds.DBClusterStatusAvailable,
+ Status: string(dbmesh_rds.DBClusterStatusAvailable),
PrimaryEndpoint: "test-primary-endpoint",
ReaderEndpoint: "test-reader-endpoint",
Port: 3306,