You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/06/07 02:21:33 UTC
[shardingsphere-on-cloud] branch main updated: feat: storage node support register and unregsiter for aws aurora cluster
This is an automated email from the ASF dual-hosted git repository.
miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new 8c5509b feat: storage node support register and unregsiter for aws aurora cluster
new b5fe12e Merge pull request #398 from Xu-Wentao/sn-aws-aurora
8c5509b is described below
commit 8c5509b03c07aca9f680a8bc056c266c1f6c25d9
Author: xuwentao <cu...@yahoo.com>
AuthorDate: Tue Jun 6 20:57:09 2023 +0800
feat: storage node support register and unregsiter for aws aurora cluster
---
.../api/v1alpha1/storage_node_types.go | 10 +-
shardingsphere-operator/go.mod | 2 +-
shardingsphere-operator/go.sum | 2 +
.../controllers/storage_ndoe_controller_test.go | 220 +++++++++++++++++++--
.../pkg/controllers/storage_node_controller.go | 93 ++++++---
.../pkg/reconcile/storagenode/aws/aurora.go | 82 +++++---
.../pkg/reconcile/storagenode/aws/aurora_test.go | 160 +++++++++++++++
.../pkg/reconcile/storagenode/aws/rdsinstance.go | 9 -
.../test/e2e/storage_node_controller_test.go | 133 ++++++++++++-
9 files changed, 625 insertions(+), 86 deletions(-)
diff --git a/shardingsphere-operator/api/v1alpha1/storage_node_types.go b/shardingsphere-operator/api/v1alpha1/storage_node_types.go
index 165f7a7..e92d78a 100644
--- a/shardingsphere-operator/api/v1alpha1/storage_node_types.go
+++ b/shardingsphere-operator/api/v1alpha1/storage_node_types.go
@@ -116,8 +116,16 @@ type StorageNode struct {
type StorageNodeSpec struct {
// +kubebuilder:validation:Required
StorageProviderName string `json:"storageProviderName"`
- // +optional
+ // +optional the default database name of the storage node.
+ // if not set, will NOT create database
Schema string `json:"schema"`
+ // +optional
+ // only for cluster provider like AWS RDS Cluster/ AWS Aurora Cluster
+ // The Default value is 1 for cluster provider
+ // will not be effective for single instance, instance will always be 1 for single instance
+ // Example: 2, means 2 instances in the cluster(1 primary + 1 reader)
+ // +kubebuilder:default=1
+ Replicas int32 `json:"replicas"`
}
// StorageNodeStatus defines the actual state of a set of storage units
diff --git a/shardingsphere-operator/go.mod b/shardingsphere-operator/go.mod
index 60807e4..49ffba9 100644
--- a/shardingsphere-operator/go.mod
+++ b/shardingsphere-operator/go.mod
@@ -8,7 +8,7 @@ require (
github.com/antlr/antlr4 v0.0.0-20181218183524-be58ebffde8e
github.com/chaos-mesh/chaos-mesh/api v0.0.0-20230517110555-afab5b4a7813
github.com/cloudnative-pg/cloudnative-pg v1.20.0
- github.com/database-mesh/golang-sdk v0.0.0-20230605093335-916ac7abc788
+ github.com/database-mesh/golang-sdk v0.0.0-20230606100535-23037381e4fb
github.com/go-logr/logr v1.2.4
github.com/go-sql-driver/mysql v1.7.1
github.com/golang/mock v1.6.0
diff --git a/shardingsphere-operator/go.sum b/shardingsphere-operator/go.sum
index e03b58d..fec74ac 100644
--- a/shardingsphere-operator/go.sum
+++ b/shardingsphere-operator/go.sum
@@ -68,6 +68,8 @@ github.com/database-mesh/golang-sdk v0.0.0-20230605075457-a525bc484e78 h1:442d1d
github.com/database-mesh/golang-sdk v0.0.0-20230605075457-a525bc484e78/go.mod h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
github.com/database-mesh/golang-sdk v0.0.0-20230605093335-916ac7abc788 h1:YEF8BDXHnEiek/EnDVbTCOrVDP7OT3v/R3a8mGM6+vc=
github.com/database-mesh/golang-sdk v0.0.0-20230605093335-916ac7abc788/go.mod h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
+github.com/database-mesh/golang-sdk v0.0.0-20230606100535-23037381e4fb h1:p3tpHo24HjA7rW/JMjD9/6klWAVMn4fIefHXKgggVAg=
+github.com/database-mesh/golang-sdk v0.0.0-20230606100535-23037381e4fb/go.mod h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
diff --git a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
index cff4499..24dc250 100644
--- a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
+++ b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
@@ -19,6 +19,7 @@ package controllers
import (
"context"
+ "fmt"
"time"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
@@ -405,7 +406,6 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Rds Instance", func()
Annotations: map[string]string{
AnnotationKeyRegisterStorageUnitEnabled: "true",
v1alpha1.AnnotationsInstanceDBName: "test_db",
- AnnotationKeyComputeNodeNamespace: defaultTestNamespace,
AnnotationKeyComputeNodeName: cnName,
AnnotationKeyLogicDatabaseName: "sharding_db",
},
@@ -548,8 +548,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Rds Instance", func()
Name: "test-get-shardingsphere-server",
Namespace: defaultTestNamespace,
Annotations: map[string]string{
- AnnotationKeyComputeNodeName: "test-get-shardingsphere-server",
- AnnotationKeyComputeNodeNamespace: defaultTestNamespace,
+ AnnotationKeyComputeNodeName: "test-get-shardingsphere-server",
},
},
}
@@ -618,7 +617,6 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Rds Instance", func()
Namespace: defaultTestNamespace,
Annotations: map[string]string{
AnnotationKeyComputeNodeName: testName,
- AnnotationKeyComputeNodeNamespace: defaultTestNamespace,
AnnotationKeyRegisterStorageUnitEnabled: "true",
AnnotationKeyLogicDatabaseName: testName,
v1alpha1.AnnotationsInstanceDBName: testName,
@@ -638,7 +636,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Rds Instance", func()
},
}
- dbClass := &v1alpha1.StorageProvider{
+ storageProvider := &v1alpha1.StorageProvider{
ObjectMeta: metav1.ObjectMeta{
Name: defaultTestStorageProvider,
},
@@ -655,7 +653,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Rds Instance", func()
mockSS.EXPECT().Close().Return(nil)
mockSS.EXPECT().RegisterStorageUnit(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
- Expect(reconciler.registerStorageUnit(ctx, sn, dbClass)).To(BeNil())
+ Expect(reconciler.registerStorageUnit(ctx, sn, storageProvider)).To(BeNil())
Expect(sn.Status.Registered).To(BeTrue())
})
@@ -720,7 +718,6 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Rds Instance", func()
AnnotationKeyLogicDatabaseName: testName,
v1alpha1.AnnotationsInstanceDBName: testName,
AnnotationKeyComputeNodeName: testName,
- AnnotationKeyComputeNodeNamespace: defaultTestNamespace,
},
},
Status: v1alpha1.StorageNodeStatus{
@@ -738,26 +735,35 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Rds Instance", func()
})
var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
+ var provider *v1alpha1.StorageProvider
BeforeEach(func() {
- provider := v1alpha1.StorageProvider{
+ provider = &v1alpha1.StorageProvider{
ObjectMeta: metav1.ObjectMeta{
Name: "aws-aurora",
},
Spec: v1alpha1.StorageProviderSpec{
Provisioner: v1alpha1.ProvisionerAWSAurora,
- Parameters: map[string]string{},
+ Parameters: map[string]string{
+ "engine": "aurora-mysql",
+ "engineVersion": "5.7",
+ "masterUsername": "root",
+ "masterUserPassword": "root",
+ },
},
}
- Expect(fakeClient.Create(ctx, &provider)).Should(Succeed())
+ Expect(fakeClient.Create(ctx, provider)).Should(Succeed())
// mock aws client
// mock aws rds client
mockCtrl = gomock.NewController(GinkgoT())
mockAws = mock_aws.NewMockIRdsClient(mockCtrl)
-
monkey.Patch(aws.NewRdsClient, func(rds dbmesh_rds.RDS) aws.IRdsClient {
return mockAws
})
+ mockSS = mock_shardingsphere.NewMockIServer(mockCtrl)
+ monkey.Patch(shardingsphere.NewServer, func(_, _ string, _ uint, _, _ string) (shardingsphere.IServer, error) {
+ return mockSS, nil
+ })
})
AfterEach(func() {
@@ -973,5 +979,197 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
err = fakeClient.Get(ctx, namespacedName, storageNode)
Expect(apierrors.IsNotFound(err)).To(BeTrue())
})
+
+ It("should be success when storage node is ready for register", func() {
+ name := "test-aws-aurora-ready-for-register"
+ namespacedName := types.NamespacedName{
+ Name: name,
+ Namespace: defaultTestNamespace,
+ }
+ req := ctrl.Request{NamespacedName: namespacedName}
+ storageNode := &v1alpha1.StorageNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: defaultTestNamespace,
+ Annotations: map[string]string{
+ v1alpha1.AnnotationsClusterIdentifier: "test-aws-aurora",
+ AnnotationKeyRegisterStorageUnitEnabled: "true",
+ AnnotationKeyLogicDatabaseName: "test-logic-db",
+ v1alpha1.AnnotationsInstanceDBName: "test-instance-db",
+ AnnotationKeyComputeNodeName: "test-compute-node",
+ },
+ },
+ Spec: v1alpha1.StorageNodeSpec{
+ StorageProviderName: "aws-aurora",
+ },
+ Status: v1alpha1.StorageNodeStatus{
+ Phase: v1alpha1.StorageNodePhaseReady,
+ Cluster: v1alpha1.ClusterStatus{
+ Status: dbmesh_rds.DBClusterStatusAvailable,
+ PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+ ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
+ },
+ Instances: []v1alpha1.InstanceStatus{
+ {
+ Status: string(dbmesh_rds.DBInstanceStatusAvailable),
+ Endpoint: v1alpha1.Endpoint{Address: "test-aws-aurora-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+ },
+ },
+ },
+ }
+ cn := &v1alpha1.ComputeNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-compute-node",
+ Namespace: defaultTestNamespace,
+ },
+ Spec: v1alpha1.ComputeNodeSpec{
+ Bootstrap: v1alpha1.BootstrapConfig{
+ ServerConfig: v1alpha1.ServerConfig{
+ Authority: v1alpha1.ComputeNodeAuthority{
+ Users: []v1alpha1.ComputeNodeUser{{User: "test-user", Password: "test-password"}},
+ },
+ Mode: v1alpha1.ComputeNodeServerMode{},
+ Props: nil,
+ },
+ },
+ },
+ }
+ svc := &corev1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-compute-node",
+ Namespace: defaultTestNamespace,
+ },
+ Spec: corev1.ServiceSpec{
+ Ports: []corev1.ServicePort{
+ {Name: "http", Protocol: "TCP", Port: 3307},
+ },
+ },
+ }
+ Expect(fakeClient.Create(ctx, storageNode)).Should(Succeed())
+ Expect(fakeClient.Create(ctx, cn)).Should(Succeed())
+ Expect(fakeClient.Create(ctx, svc)).Should(Succeed())
+
+ // mock aws aurora is available
+ mockAws.EXPECT().GetAuroraCluster(gomock.Any(), gomock.Any()).Return(&dbmesh_rds.DescCluster{
+ DBClusterIdentifier: "test-aws-aurora",
+ PrimaryEndpoint: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+ Port: int32(3306),
+ Status: dbmesh_rds.DBClusterStatusAvailable,
+ }, nil).Times(1)
+ // mock get instances of aws aurora are available
+ mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{
+ {
+ DBInstanceIdentifier: "test-aws-aurora-instance-0",
+ DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
+ Endpoint: dbmesh_rds.Endpoint{Address: "test-aws-aurora-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+ },
+ }, nil).Times(1)
+
+ dsName, host, port, username, password := getDatasourceInfoFromCluster(storageNode, provider)
+
+ // mock shardingsphere
+ mockSS.EXPECT().CreateDatabase(gomock.Any()).Return(nil).Times(1)
+ mockSS.EXPECT().RegisterStorageUnit("test-logic-db", dsName, host, uint(port), "test-instance-db", username, password).Return(nil).Times(1)
+ mockSS.EXPECT().Close().Return(nil).Times(1)
+
+ _, err := reconciler.Reconcile(ctx, req)
+ Expect(err).To(BeNil())
+
+ err = fakeClient.Get(ctx, namespacedName, storageNode)
+ Expect(storageNode.Status.Registered).To(BeTrue())
+ })
+
+ It("should be success unregistered when storage node is deleting", func() {
+ snName := "test-aws-aurora-unregistered"
+ namespacedName := types.NamespacedName{
+ Name: snName,
+ Namespace: defaultTestNamespace,
+ }
+ req := ctrl.Request{NamespacedName: namespacedName}
+
+ storageNode := &v1alpha1.StorageNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: snName,
+ Namespace: defaultTestNamespace,
+ Annotations: map[string]string{
+ AnnotationKeyRegisterStorageUnitEnabled: "true",
+ AnnotationKeyLogicDatabaseName: "test-logic-db",
+ v1alpha1.AnnotationsInstanceDBName: "test-instance-db",
+ AnnotationKeyComputeNodeName: "test-compute-node",
+ },
+ },
+ Spec: v1alpha1.StorageNodeSpec{
+ StorageProviderName: "aws-aurora",
+ Replicas: 2,
+ },
+ Status: v1alpha1.StorageNodeStatus{
+ Phase: v1alpha1.StorageNodePhaseReady,
+ Registered: true,
+ },
+ }
+ cn := &v1alpha1.ComputeNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-compute-node",
+ Namespace: defaultTestNamespace,
+ },
+ Spec: v1alpha1.ComputeNodeSpec{
+ Bootstrap: v1alpha1.BootstrapConfig{
+ ServerConfig: v1alpha1.ServerConfig{
+ Authority: v1alpha1.ComputeNodeAuthority{
+ Users: []v1alpha1.ComputeNodeUser{{User: "test-user", Password: "test-password"}},
+ },
+ },
+ },
+ },
+ }
+ svc := &corev1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-compute-node",
+ Namespace: defaultTestNamespace,
+ },
+ Spec: corev1.ServiceSpec{
+ Ports: []corev1.ServicePort{
+ {Name: "http", Protocol: "TCP", Port: 3307},
+ },
+ },
+ }
+ Expect(fakeClient.Create(ctx, storageNode)).Should(Succeed())
+ Expect(fakeClient.Create(ctx, cn)).Should(Succeed())
+ Expect(fakeClient.Create(ctx, svc)).Should(Succeed())
+
+ // mock aws aurora is available
+ mockAws.EXPECT().GetAuroraCluster(gomock.Any(), gomock.Any()).Return(&dbmesh_rds.DescCluster{
+ DBClusterIdentifier: "test-aws-aurora",
+ PrimaryEndpoint: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+ Port: int32(3306),
+ Status: dbmesh_rds.DBClusterStatusAvailable,
+ }, nil).Times(2)
+ // mock get instances of aws aurora are available
+ mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{
+ {
+ DBInstanceIdentifier: "test-aws-aurora-instance-0",
+ DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
+ Endpoint: dbmesh_rds.Endpoint{Address: "test-aws-aurora-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+ },
+ }, nil).Times(2)
+
+ _, err := reconciler.Reconcile(ctx, req)
+ Expect(err).To(BeNil())
+
+ Expect(fakeClient.Delete(ctx, storageNode)).Should(Succeed())
+ // mock shardingsphere
+ mockSS.EXPECT().UnRegisterStorageUnit("test-logic-db", fmt.Sprintf("ds_%s", storageNode.GetName())).Return(nil).Times(1)
+ mockSS.EXPECT().Close().Return(nil).Times(1)
+
+ // mock delete aws aurora
+ mockAws.EXPECT().DeleteAuroraCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
+
+ _, err = reconciler.Reconcile(ctx, req)
+ Expect(err).To(BeNil())
+
+ err = fakeClient.Get(ctx, namespacedName, storageNode)
+ Expect(storageNode.Status.Registered).To(BeFalse())
+ })
})
+
})
diff --git a/shardingsphere-operator/pkg/controllers/storage_node_controller.go b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
index e26c63d..f4fb541 100644
--- a/shardingsphere-operator/pkg/controllers/storage_node_controller.go
+++ b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
@@ -48,7 +48,6 @@ const (
FinalizerName = "shardingsphere.apache.org/finalizer"
AnnotationKeyRegisterStorageUnitEnabled = "shardingsphere.apache.org/register-storage-unit-enabled"
- AnnotationKeyComputeNodeNamespace = "shardingsphere.apache.org/compute-node-namespace"
AnnotationKeyComputeNodeName = "shardingsphere.apache.org/compute-node-name"
AnnotationKeyLogicDatabaseName = "shardingsphere.apache.org/logic-database-name"
@@ -110,6 +109,8 @@ func (r *StorageNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request)
func (r *StorageNodeReconciler) finalize(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) (ctrl.Result, error) {
var err error
+ var oldStatus = node.Status.DeepCopy()
+
switch node.Status.Phase {
case v1alpha1.StorageNodePhaseReady, v1alpha1.StorageNodePhaseNotReady:
// set storage node status to deleting
@@ -139,7 +140,7 @@ func (r *StorageNodeReconciler) finalize(ctx context.Context, node *v1alpha1.Sto
desiredState := computeDesiredState(node.Status)
- if !reflect.DeepEqual(node.Status, desiredState) {
+ if !reflect.DeepEqual(oldStatus, desiredState) {
node.Status = desiredState
err := r.Status().Update(ctx, node)
if err != nil {
@@ -151,39 +152,41 @@ func (r *StorageNodeReconciler) finalize(ctx context.Context, node *v1alpha1.Sto
return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
}
-func (r *StorageNodeReconciler) reconcile(ctx context.Context, dbClass *v1alpha1.StorageProvider, node *v1alpha1.StorageNode) (ctrl.Result, error) {
+func (r *StorageNodeReconciler) reconcile(ctx context.Context, storageProvider *v1alpha1.StorageProvider, node *v1alpha1.StorageNode) (ctrl.Result, error) {
var err error
+ var oldStatus = node.Status.DeepCopy()
+
// reconcile storage node with storageProvider
- switch dbClass.Spec.Provisioner {
+ switch storageProvider.Spec.Provisioner {
case v1alpha1.ProvisionerAWSRDSInstance:
- if err := r.reconcileAwsRdsInstance(ctx, aws.NewRdsClient(r.AwsRDS), node, dbClass); err != nil {
+ if err := r.reconcileAwsRdsInstance(ctx, aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS RDS Instance %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
}
case v1alpha1.ProvisionerAWSAurora:
- if err := r.reconcileAwsAurora(ctx, aws.NewRdsClient(r.AwsRDS), node, dbClass); err != nil {
+ if err := r.reconcileAwsAurora(ctx, aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS Aurora %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
}
case v1alpha1.ProvisionerCloudNativePG:
- if err := r.reconcileCloudNativePG(ctx, node, dbClass); err != nil {
+ if err := r.reconcileCloudNativePG(ctx, node, storageProvider); err != nil {
r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile CloudNative PG %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
}
default:
- r.Recorder.Event(node, corev1.EventTypeWarning, "UnsupportedDatabaseProvisioner", fmt.Sprintf("unsupported database provisioner %s", dbClass.Spec.Provisioner))
+ r.Recorder.Event(node, corev1.EventTypeWarning, "UnsupportedDatabaseProvisioner", fmt.Sprintf("unsupported database provisioner %s", storageProvider.Spec.Provisioner))
return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
}
// register storage unit if needed.
- if err := r.registerStorageUnit(ctx, node, dbClass); err != nil {
+ if err := r.registerStorageUnit(ctx, node, storageProvider); err != nil {
r.Recorder.Eventf(node, corev1.EventTypeWarning, "RegisterStorageUnitFailed", "unable to register storage unit %s/%s", node.GetNamespace(), node.GetName())
return ctrl.Result{Requeue: true}, err
}
desiredState := computeDesiredState(node.Status)
- if !reflect.DeepEqual(node.Status, desiredState) {
+ if !reflect.DeepEqual(oldStatus, desiredState) {
node.Status = desiredState
err := r.Status().Update(ctx, node)
if err != nil {
@@ -323,14 +326,14 @@ func allInstancesReady(instances []v1alpha1.InstanceStatus) bool {
return true
}
-func (r *StorageNodeReconciler) reconcileAwsRdsInstance(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, dbClass *v1alpha1.StorageProvider) error {
+func (r *StorageNodeReconciler) reconcileAwsRdsInstance(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
instance, err := client.GetInstance(ctx, node)
if err != nil {
return err
}
if instance == nil && node.Status.Phase != v1alpha1.StorageNodePhaseDeleting {
- err = client.CreateInstance(ctx, node, dbClass.Spec.Parameters)
+ err = client.CreateInstance(ctx, node, storageProvider.Spec.Parameters)
if err != nil {
return err
}
@@ -498,7 +501,7 @@ func (r *StorageNodeReconciler) deleteAWSAurora(ctx context.Context, client aws.
}
// registerStorageUnit
-func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v1alpha1.StorageNode, dbClass *v1alpha1.StorageProvider) error {
+func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
// if register storage unit is not enabled, return
if node.Annotations[AnnotationKeyRegisterStorageUnitEnabled] != "true" {
return nil
@@ -534,22 +537,17 @@ func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v
}
r.Recorder.Eventf(node, corev1.EventTypeNormal, "LogicDatabaseCreated", "LogicDatabase %s is created", logicDBName)
- // TODO add cluster
-
- ins := node.Status.Instances[0]
- host := ins.Endpoint.Address
- port := ins.Endpoint.Port
- username := node.Annotations[v1alpha1.AnnotationsMasterUsername]
- if username == "" {
- username = dbClass.Spec.Parameters["masterUsername"]
- }
- password := node.Annotations[v1alpha1.AnnotationsMasterUserPassword]
- if password == "" {
- password = dbClass.Spec.Parameters["masterUserPassword"]
+ var dsName, host string
+ var port int32
+ var username, password string
+ // get storage unit info from instance
+ if node.Status.Cluster.Status == "" {
+ dsName, host, port, username, password = getDatasourceInfoFromInstance(node, storageProvider)
+ } else {
+ dsName, host, port, username, password = getDatasourceInfoFromCluster(node, storageProvider)
}
- // TODO how to set ds name?
- if err := ssServer.RegisterStorageUnit(logicDBName, "ds_0", host, uint(port), dbName, username, password); err != nil {
+ if err := ssServer.RegisterStorageUnit(logicDBName, dsName, host, uint(port), dbName, username, password); err != nil {
return fmt.Errorf("register storage node failed: %w", err)
}
r.Recorder.Eventf(node, corev1.EventTypeNormal, "StorageUnitRegistered", "StorageUnit %s:%d/%s is registered", host, port, dbName)
@@ -558,6 +556,38 @@ func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v
return nil
}
+func getDatasourceInfoFromInstance(node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) (dsName, host string, port int32, username, password string) {
+ dsName = fmt.Sprintf("ds_%s", node.GetName())
+ ins := node.Status.Instances[0]
+ host = ins.Endpoint.Address
+ port = ins.Endpoint.Port
+ username = node.Annotations[v1alpha1.AnnotationsMasterUsername]
+ if username == "" {
+ username = storageProvider.Spec.Parameters["masterUsername"]
+ }
+ password = node.Annotations[v1alpha1.AnnotationsMasterUserPassword]
+ if password == "" {
+ password = storageProvider.Spec.Parameters["masterUserPassword"]
+ }
+ return
+}
+
+func getDatasourceInfoFromCluster(node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) (dsName, host string, port int32, username, password string) {
+ dsName = fmt.Sprintf("ds_%s", node.GetName())
+ cluster := node.Status.Cluster
+ host = cluster.PrimaryEndpoint.Address
+ port = cluster.PrimaryEndpoint.Port
+ username = node.Annotations[v1alpha1.AnnotationsMasterUsername]
+ if username == "" {
+ username = storageProvider.Spec.Parameters["masterUsername"]
+ }
+ password = node.Annotations[v1alpha1.AnnotationsMasterUserPassword]
+ if password == "" {
+ password = storageProvider.Spec.Parameters["masterUserPassword"]
+ }
+ return
+}
+
func (r *StorageNodeReconciler) unregisterStorageUnit(ctx context.Context, node *v1alpha1.StorageNode) error {
if !node.Status.Registered {
return nil
@@ -575,8 +605,8 @@ func (r *StorageNodeReconciler) unregisterStorageUnit(ctx context.Context, node
defer ssServer.Close()
- // TODO how to set ds name?
- if err := ssServer.UnRegisterStorageUnit(logicDBName, "ds_0"); err != nil {
+ dsName := fmt.Sprintf("ds_%s", node.GetName())
+ if err := ssServer.UnRegisterStorageUnit(logicDBName, dsName); err != nil {
return fmt.Errorf("unregister storage unit failed: %w", err)
}
@@ -590,7 +620,6 @@ func (r *StorageNodeReconciler) validateComputeNodeAnnotations(node *v1alpha1.St
requiredAnnos := []string{
AnnotationKeyLogicDatabaseName,
v1alpha1.AnnotationsInstanceDBName,
- AnnotationKeyComputeNodeNamespace,
AnnotationKeyComputeNodeName,
}
@@ -614,7 +643,7 @@ func (r *StorageNodeReconciler) getShardingsphereServer(ctx context.Context, nod
cn := &v1alpha1.ComputeNode{}
if err := r.Client.Get(ctx, types.NamespacedName{
Name: node.Annotations[AnnotationKeyComputeNodeName],
- Namespace: node.Annotations[AnnotationKeyComputeNodeNamespace],
+ Namespace: node.Namespace,
}, cn); err != nil {
return nil, fmt.Errorf("get compute node failed: %w", err)
}
@@ -637,7 +666,7 @@ func (r *StorageNodeReconciler) getShardingsphereServer(ctx context.Context, nod
// get service of compute node
svc, err := r.Service.GetByNamespacedName(ctx, types.NamespacedName{
Name: node.Annotations[AnnotationKeyComputeNodeName],
- Namespace: node.Annotations[AnnotationKeyComputeNodeNamespace],
+ Namespace: node.Namespace,
})
if err != nil || svc == nil {
diff --git a/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go
index 21f51aa..38ad0b7 100644
--- a/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go
@@ -26,29 +26,57 @@ import (
"github.com/database-mesh/golang-sdk/aws/client/rds"
)
+func validateCreateAuroraParams(node *v1alpha1.StorageNode, paramsPtr *map[string]string) error {
+ requiredParams := map[string]string{
+ "instanceClass": "instance class is empty",
+ "engine": "engine is empty",
+ "engineVersion": "engine version is empty",
+ "clusterIdentifier": "cluster identifier is empty",
+ "masterUsername": "master username is empty",
+ "masterUserPassword": "master user password is empty",
+ }
+ params := *paramsPtr
+ if v, ok := node.Annotations[v1alpha1.AnnotationsClusterIdentifier]; !ok || v == "" {
+ return errors.New("cluster identifier is empty")
+ } else {
+ params["clusterIdentifier"] = v
+ }
+
+ if len(params["clusterIdentifier"]) > 50 {
+ return errors.New("cluster identifier is too long, max length is 50")
+ }
+
+ for k, v := range requiredParams {
+ if val, ok := params[k]; !ok || val == "" {
+ return fmt.Errorf(v)
+ }
+ }
+ return nil
+}
+
// CreateAuroraCluster creates aurora cluster
// ref: https://docs.aws.amazon.com/zh_cn/AmazonRDS/latest/APIReference/API_CreateDBInstance.html
func (c *RdsClient) CreateAuroraCluster(ctx context.Context, node *v1alpha1.StorageNode, params map[string]string) error {
+ if err := validateCreateAuroraParams(node, ¶ms); err != nil {
+ return err
+ }
+
aurora := c.Aurora()
// set required params
aurora.SetDBInstanceClass(params["instanceClass"]).
SetEngine(params["engine"]).
- SetDBClusterIdentifier(params["clusterIdentifier"])
+ SetEngineVersion(params["engineVersion"]).
+ SetDBClusterIdentifier(params["clusterIdentifier"]).
+ SetMasterUsername(params["masterUsername"]).
+ SetMasterUserPassword(params["masterUserPassword"]).
+ SetInstanceNumber(node.Spec.Replicas)
- // set optional params
- if params["engineVersion"] != "" {
- aurora.SetEngineVersion(params["engineVersion"])
- }
- if params["masterUsername"] != "" {
- aurora.SetMasterUsername(params["masterUsername"])
- }
- if params["masterUserPassword"] != "" {
- aurora.SetMasterUserPassword(params["masterUserPassword"])
+ if err := aurora.Create(ctx); err != nil {
+ return fmt.Errorf("create aurora cluster failed, %v", err)
}
- err := aurora.Create(ctx)
- return err
+ return nil
}
func (c *RdsClient) GetAuroraCluster(ctx context.Context, node *v1alpha1.StorageNode) (cluster *rds.DescCluster, err error) {
@@ -56,10 +84,6 @@ func (c *RdsClient) GetAuroraCluster(ctx context.Context, node *v1alpha1.Storage
if !ok {
return nil, errors.New("cluster identifier is empty")
}
- if node.Status.Cluster.Properties == nil || node.Status.Cluster.Properties["clusterIdentifier"] == "" {
- // cluster not created
- return nil, nil
- }
aurora := c.Aurora()
aurora.SetDBClusterIdentifier(identifier)
@@ -71,22 +95,18 @@ func (c *RdsClient) DeleteAuroraCluster(ctx context.Context, node *v1alpha1.Stor
if !ok {
return fmt.Errorf("cluster identifier is empty")
}
- // get instances of aurora cluster
- filters := map[string][]string{
- "db-cluster-id": {identifier},
- }
- instances, err := c.GetInstancesByFilters(ctx, filters)
- if err != nil {
- return fmt.Errorf("get instances failed, %v", err)
- }
- // delete instance first
- for _, ins := range instances {
- if err := c.DeleteInstance(ctx, node, storageProvider); err != nil {
- return fmt.Errorf("delete instance=%s of aurora=%s failed, %v", ins.DBInstanceIdentifier, identifier, err)
- }
- }
- // delete cluster
+
aurora := c.Aurora()
aurora.SetDBClusterIdentifier(identifier)
+
+ switch storageProvider.Spec.ReclaimPolicy {
+ case v1alpha1.StorageReclaimPolicyDelete:
+ aurora.SetDeleteAutomateBackups(true).SetSkipFinalSnapshot(true)
+ case v1alpha1.StorageReclaimPolicyRetain:
+ aurora.SetDeleteAutomateBackups(false).SetSkipFinalSnapshot(true)
+ case v1alpha1.StorageReclaimPolicyDeleteWithFinalSnapshot:
+ aurora.SetDeleteAutomateBackups(true).SetSkipFinalSnapshot(false)
+ }
+
return aurora.Delete(ctx)
}
diff --git a/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora_test.go b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora_test.go
new file mode 100644
index 0000000..6fb8977
--- /dev/null
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora_test.go
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package aws
+
+import (
+ "context"
+ "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+
+ "github.com/database-mesh/golang-sdk/aws"
+ "github.com/database-mesh/golang-sdk/aws/client/rds"
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+var ctx = context.Background()
+
+var _ = Describe("Aurora", func() {
+ Context("Test valid create aurora params", func() {
+ It("should be success", func() {
+ sn := &v1alpha1.StorageNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-valid-create-aurora-params",
+ Namespace: "test-namespace",
+ Annotations: map[string]string{
+ v1alpha1.AnnotationsClusterIdentifier: "test",
+ },
+ },
+ }
+ params := map[string]string{
+ "engine": "aurora-mysql",
+ "engineVersion": "5.7",
+ "instanceClass": "db.t2.small",
+ "clusterIdentifier": "",
+ "masterUsername": "root",
+ "masterUserPassword": "root123456",
+ }
+ err := validateCreateAuroraParams(sn, ¶ms)
+ Expect(err).To(BeNil())
+ })
+ })
+})
+
+var _ = Describe("Test For AWS Aurora Manually", func() {
+ var (
+ region string
+ accessKey string
+ secretKey string
+ )
+ Context("Test create aurora cluster with 2 replicas", func() {
+ It("should be success", func() {
+ if region == "" || accessKey == "" || secretKey == "" {
+ Skip("Skip test create aurora cluster")
+ }
+
+ sess := aws.NewSessions().SetCredential(region, accessKey, secretKey).Build()
+ rdsClient := rds.NewService(sess[region])
+ awsClient := NewRdsClient(rdsClient)
+
+ sn := &v1alpha1.StorageNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-create-aurora-cluster",
+ Namespace: "test-namespace",
+ Annotations: map[string]string{
+ v1alpha1.AnnotationsClusterIdentifier: "test-create-aurora-cluster-identifier",
+ },
+ },
+ Spec: v1alpha1.StorageNodeSpec{
+ Replicas: 2,
+ },
+ }
+ params := map[string]string{
+ "engine": "aurora-mysql",
+ "engineVersion": "5.7",
+ "instanceClass": "db.t2.small",
+ "clusterIdentifier": "",
+ "masterUsername": "root",
+ "masterUserPassword": "root123456",
+ }
+
+ Expect(awsClient.CreateAuroraCluster(ctx, sn, params)).Should(Succeed())
+ })
+ })
+ Context("Test Get Aurora Cluster", func() {
+ It("should be success", func() {
+ if region == "" || accessKey == "" || secretKey == "" {
+ Skip("Skip test create aurora cluster")
+ }
+ sess := aws.NewSessions().SetCredential(region, accessKey, secretKey).Build()
+ rdsClient := rds.NewService(sess[region])
+ awsClient := NewRdsClient(rdsClient)
+ sn := &v1alpha1.StorageNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-get-aurora-cluster",
+ Namespace: "test-namespace",
+ Annotations: map[string]string{
+ v1alpha1.AnnotationsClusterIdentifier: "test-create-aurora-cluster-identifier",
+ },
+ },
+ Spec: v1alpha1.StorageNodeSpec{
+ Replicas: 2,
+ },
+ }
+
+ ac, err := awsClient.GetAuroraCluster(ctx, sn)
+ Expect(err).To(BeNil())
+ Expect(len(ac.DBClusterMembers)).To(Equal(2))
+ })
+ })
+
+ Context("Test Delete Aurora Cluster", func() {
+ It("should be success", func() {
+ if region == "" || accessKey == "" || secretKey == "" {
+ Skip("Skip test create aurora cluster")
+ }
+ sess := aws.NewSessions().SetCredential(region, accessKey, secretKey).Build()
+ rdsClient := rds.NewService(sess[region])
+ awsClient := NewRdsClient(rdsClient)
+
+ sn := &v1alpha1.StorageNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-get-aurora-cluster",
+ Namespace: "test-namespace",
+ Annotations: map[string]string{
+ v1alpha1.AnnotationsClusterIdentifier: "test-create-aurora-cluster-identifier",
+ },
+ },
+ Spec: v1alpha1.StorageNodeSpec{
+ StorageProviderName: "test-get-aurora-cluster",
+ },
+ }
+
+ storageProvider := &v1alpha1.StorageProvider{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-get-aurora-cluster",
+ },
+ Spec: v1alpha1.StorageProviderSpec{
+ Provisioner: v1alpha1.ProvisionerAWSAurora,
+ ReclaimPolicy: v1alpha1.StorageReclaimPolicyDelete,
+ },
+ }
+ Expect(awsClient.DeleteAuroraCluster(ctx, sn, storageProvider)).Should(Succeed())
+ })
+ })
+})
diff --git a/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdsinstance.go b/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdsinstance.go
index 7fe5f59..e566099 100644
--- a/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdsinstance.go
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdsinstance.go
@@ -174,15 +174,6 @@ func (c *RdsClient) GetInstancesByFilters(ctx context.Context, filters map[strin
// DeleteInstance delete rds instance.
// aws rds instance status doc: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/accessing-monitoring.html
func (c *RdsClient) DeleteInstance(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
- // TODO add more test case.
- /* TODO set options to skip final snapshot and backup stuff depends on database class ClaimPolicy.
- "error": "operation error RDS: DeleteDBInstance,
- https response error StatusCode: 400,
- RequestID: ae094e3c-d8f1-49ba-aed1-cb0618b3641d,
- api error InvalidParameterCombination:
- FinalDBSnapshotIdentifier is required unless SkipFinalSnapshot is specified."
- */
-
identifier, ok := node.Annotations[v1alpha1.AnnotationsInstanceIdentifier]
if !ok {
return errors.New("instance identifier is empty")
diff --git a/shardingsphere-operator/test/e2e/storage_node_controller_test.go b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
index 35b0283..f6744b7 100644
--- a/shardingsphere-operator/test/e2e/storage_node_controller_test.go
+++ b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
@@ -20,6 +20,7 @@ package e2e
import (
"context"
"database/sql"
+ "fmt"
"reflect"
"regexp"
"time"
@@ -230,7 +231,6 @@ var _ = Describe("StorageNode Controller Suite Test For AWS RDS Instance", func(
v1alpha1.AnnotationsInstanceIdentifier: instanceIdentifier,
controllers.AnnotationKeyRegisterStorageUnitEnabled: "true",
v1alpha1.AnnotationsInstanceDBName: "test-db-name",
- controllers.AnnotationKeyComputeNodeNamespace: "default",
controllers.AnnotationKeyComputeNodeName: "test-compute-node",
controllers.AnnotationKeyLogicDatabaseName: "test-logic-db-name",
},
@@ -285,3 +285,134 @@ var _ = Describe("StorageNode Controller Suite Test For AWS RDS Instance", func(
})
})
})
+
+var _ = Describe("StorageNode Controller Suite Test For AWS Aurora Cluster", func() {
+ storageProviderName := "test-storage-provider"
+ clusterIdentifier := "test-aurora-cluster-identifier"
+
+ BeforeEach(func() {
+ provider := &v1alpha1.StorageProvider{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: storageProviderName,
+ },
+ Spec: v1alpha1.StorageProviderSpec{
+ Provisioner: v1alpha1.ProvisionerAWSAurora,
+ Parameters: map[string]string{
+ "engine": "aurora-mysql",
+ "engineVersion": "5.7",
+ "instanceClass": "db.t2.small",
+ "masterUsername": "test-user",
+ "masterUserPassword": "test-password",
+ },
+ },
+ }
+ Expect(k8sClient.Create(ctx, provider)).Should(Succeed())
+ })
+
+ AfterEach(func() {
+ monkey.UnpatchAll()
+
+ StorageProvider := &v1alpha1.StorageProvider{}
+ Expect(k8sClient.Get(ctx, client.ObjectKey{Name: storageProviderName}, StorageProvider)).Should(Succeed())
+ Expect(k8sClient.Delete(ctx, StorageProvider)).Should(Succeed())
+ })
+
+ Context("When Creat StorageNode with Aurora Cluster Not Exist", func() {
+ It("Should Success", func() {
+ snName := "test-storage-node-creating"
+ // monkey patch
+ monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetAuroraCluster", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescCluster, error) {
+ return &dbmesh_rds.DescCluster{
+ DBClusterIdentifier: clusterIdentifier,
+ Status: dbmesh_rds.DBClusterStatusCreating,
+ PrimaryEndpoint: "test-primary-endpoint",
+ ReaderEndpoint: "test-reader-endpoint",
+ Port: 3306,
+ }, nil
+ })
+ monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstancesByFilters", func(_ *aws.RdsClient, _ context.Context, _ map[string][]string) ([]*dbmesh_rds.DescInstance, error) {
+ return []*dbmesh_rds.DescInstance{
+ {
+ DBInstanceIdentifier: fmt.Sprintf("%s-insatnce-0", clusterIdentifier),
+ DBInstanceStatus: dbmesh_rds.DBInstanceStatusCreating,
+ Endpoint: dbmesh_rds.Endpoint{Address: "test-instance-0-endpoint", Port: 3306},
+ },
+ }, nil
+ })
+
+ sn := &v1alpha1.StorageNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: snName,
+ Namespace: "default",
+ Annotations: map[string]string{
+ v1alpha1.AnnotationsClusterIdentifier: clusterIdentifier,
+ },
+ },
+ Spec: v1alpha1.StorageNodeSpec{
+ StorageProviderName: storageProviderName,
+ Replicas: 2,
+ },
+ }
+ Expect(k8sClient.Create(ctx, sn)).Should(Succeed())
+
+ Eventually(func() string {
+ newSN := &v1alpha1.StorageNode{}
+ Expect(k8sClient.Get(ctx, client.ObjectKey{Name: snName, Namespace: "default"}, newSN)).Should(Succeed())
+ return newSN.Status.Cluster.Status
+ }, time.Second*10, time.Millisecond*250).Should(Equal(dbmesh_rds.DBClusterStatusCreating))
+ })
+
+ It("should success when cluster is available", func() {
+ snName := "test-storage-node-available"
+ monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetAuroraCluster", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescCluster, error) {
+ return &dbmesh_rds.DescCluster{
+ DBClusterIdentifier: clusterIdentifier,
+ Status: dbmesh_rds.DBClusterStatusAvailable,
+ PrimaryEndpoint: "test-primary-endpoint",
+ ReaderEndpoint: "test-reader-endpoint",
+ Port: 3306,
+ }, nil
+ })
+ monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstancesByFilters", func(_ *aws.RdsClient, _ context.Context, _ map[string][]string) ([]*dbmesh_rds.DescInstance, error) {
+ return []*dbmesh_rds.DescInstance{
+ {
+ DBInstanceIdentifier: fmt.Sprintf("%s-insatnce-0", clusterIdentifier),
+ DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
+ Endpoint: dbmesh_rds.Endpoint{Address: "test-instance-0-endpoint", Port: 3306},
+ },
+ {
+ DBInstanceIdentifier: fmt.Sprintf("%s-insatnce-1", clusterIdentifier),
+ DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
+ Endpoint: dbmesh_rds.Endpoint{Address: "test-instance-1-endpoint", Port: 3306},
+ },
+ }, nil
+ })
+ monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "DeleteAuroraCluster", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode, _ *v1alpha1.StorageProvider) error {
+ return nil
+ })
+
+ sn := &v1alpha1.StorageNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: snName,
+ Namespace: "default",
+ Annotations: map[string]string{
+ v1alpha1.AnnotationsClusterIdentifier: clusterIdentifier,
+ },
+ },
+ Spec: v1alpha1.StorageNodeSpec{
+ StorageProviderName: storageProviderName,
+ Replicas: 2,
+ },
+ }
+ Expect(k8sClient.Create(ctx, sn)).Should(Succeed())
+
+ newSN := &v1alpha1.StorageNode{}
+ Eventually(func() v1alpha1.StorageNodePhaseStatus {
+ Expect(k8sClient.Get(ctx, client.ObjectKey{Name: snName, Namespace: "default"}, newSN)).Should(Succeed())
+ return newSN.Status.Phase
+ }, time.Second*10, time.Millisecond*250).Should(Equal(v1alpha1.StorageNodePhaseReady))
+
+ Expect(newSN.Status.Instances).Should(HaveLen(2))
+ })
+ })
+})