You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/06/27 07:23:30 UTC
[shardingsphere-on-cloud] branch main updated: fix: fix storagenode controller may reset params when reconcile aws rds
This is an automated email from the ASF dual-hosted git repository.
miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new 9057d9e fix: fix storagenode controller may reset params when reconcile aws rds
new 11d7838 Merge pull request #424 from Xu-Wentao/bugfix
9057d9e is described below
commit 9057d9e76d23e60e3ef3284ad29c47b4667ceea9
Author: xuwentao <cu...@yahoo.com>
AuthorDate: Tue Jun 27 11:15:53 2023 +0800
fix: fix storagenode controller may reset params when reconcile aws rds
---
.../templates/operator_rbac.yaml | 27 ++--
.../cmd/shardingsphere-operator/manager/option.go | 23 ++-
.../controllers/storage_ndoe_controller_test.go | 178 ++++++++++-----------
.../pkg/controllers/storage_node_controller.go | 117 ++++++++------
shardingsphere-operator/test/e2e/e2e_suite_test.go | 17 +-
.../test/e2e/storage_node_controller_test.go | 54 +++----
6 files changed, 214 insertions(+), 202 deletions(-)
diff --git a/charts/apache-shardingsphere-operator-charts/templates/operator_rbac.yaml b/charts/apache-shardingsphere-operator-charts/templates/operator_rbac.yaml
index 620a24f..ec41a8c 100644
--- a/charts/apache-shardingsphere-operator-charts/templates/operator_rbac.yaml
+++ b/charts/apache-shardingsphere-operator-charts/templates/operator_rbac.yaml
@@ -89,6 +89,13 @@ rules:
- patch
- update
- watch
+ - apiGroups:
+ - ""
+ resources:
+ - event
+ verbs:
+ - create
+ - patch
- apiGroups:
- apps
resources:
@@ -189,7 +196,7 @@ rules:
- apiGroups:
- shardingsphere.apache.org
resources:
- - computenodes
+ - chaos
verbs:
- create
- delete
@@ -201,7 +208,13 @@ rules:
- apiGroups:
- shardingsphere.apache.org
resources:
- - computenodes/status
+ - chaos/finalizers
+ verbs:
+ - update
+ - apiGroups:
+ - shardingsphere.apache.org
+ resources:
+ - chaos/status
verbs:
- get
- patch
@@ -209,7 +222,7 @@ rules:
- apiGroups:
- shardingsphere.apache.org
resources:
- - shardingspherechaos
+ - computenodes
verbs:
- create
- delete
@@ -221,13 +234,7 @@ rules:
- apiGroups:
- shardingsphere.apache.org
resources:
- - shardingspherechaos/finalizers
- verbs:
- - update
- - apiGroups:
- - shardingsphere.apache.org
- resources:
- - shardingspherechaos/status
+ - computenodes/status
verbs:
- get
- patch
diff --git a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
index 843f188..15674d0 100644
--- a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
+++ b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
@@ -32,8 +32,6 @@ import (
chaosv1alpha1 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
- "github.com/database-mesh/golang-sdk/aws"
- "github.com/database-mesh/golang-sdk/aws/client/rds"
dbmeshv1alpha1 "github.com/database-mesh/golang-sdk/kubernetes/api/v1alpha1"
"go.uber.org/zap/zapcore"
batchV1 "k8s.io/api/batch/v1"
@@ -148,18 +146,15 @@ var featureGatesHandlers = map[string]FeatureGateHandler{
},
"StorageNode": func(mgr manager.Manager) error {
reconciler := &controllers.StorageNodeReconciler{
- Client: mgr.GetClient(),
- Scheme: mgr.GetScheme(),
- Log: mgr.GetLogger(),
- Recorder: mgr.GetEventRecorderFor(controllers.StorageNodeControllerName),
- Service: service.NewServiceClient(mgr.GetClient()),
- CNPG: cloudnativepg.NewCloudNativePGClient(mgr.GetClient()),
- }
-
- // init aws client if aws credentials are provided
- if AwsRegion != "" && AwsAccessKeyID != "" && AwsSecretAccessKey != "" {
- sess := aws.NewSessions().SetCredential(AwsRegion, AwsAccessKeyID, AwsSecretAccessKey).Build()
- reconciler.AwsRDS = rds.NewService(sess[AwsRegion])
+ Client: mgr.GetClient(),
+ Scheme: mgr.GetScheme(),
+ Log: mgr.GetLogger(),
+ Recorder: mgr.GetEventRecorderFor(controllers.StorageNodeControllerName),
+ Service: service.NewServiceClient(mgr.GetClient()),
+ CNPG: cloudnativepg.NewCloudNativePGClient(mgr.GetClient()),
+ AwsAccessKeyID: AwsAccessKeyID,
+ AwsSecretAccessKey: AwsSecretAccessKey,
+ AwsRegion: AwsRegion,
}
if err := reconciler.SetupWithManager(mgr); err != nil {
diff --git a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
index f3632a5..ce2e4e2 100644
--- a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
+++ b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
@@ -29,8 +29,7 @@ import (
mock_shardingsphere "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/shardingsphere/mocks"
"bou.ke/monkey"
- dbmesh_aws "github.com/database-mesh/golang-sdk/aws"
- dbmesh_rds "github.com/database-mesh/golang-sdk/aws/client/rds"
+ dbmeshawsrds "github.com/database-mesh/golang-sdk/aws/client/rds"
"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@@ -71,13 +70,14 @@ func fakeStorageNodeReconciler() {
Expect(corev1.AddToScheme(scheme)).To(Succeed())
fakeClient = fake.NewClientBuilder().WithScheme(scheme).Build()
- sess := dbmesh_aws.NewSessions().SetCredential("AwsRegion", "AwsAccessKeyID", "AwsSecretAccessKey").Build()
reconciler = &StorageNodeReconciler{
- Client: fakeClient,
- Log: logf.Log,
- Recorder: record.NewFakeRecorder(100),
- AwsRDS: dbmesh_rds.NewService(sess["AwsRegion"]),
- Service: service.NewServiceClient(fakeClient),
+ Client: fakeClient,
+ Log: logf.Log,
+ Recorder: record.NewFakeRecorder(100),
+ AwsRegion: "AwsRegion",
+ AwsAccessKeyID: "AwsAccessKeyID",
+ AwsSecretAccessKey: "AwsSecretAccessKey",
+ Service: service.NewServiceClient(fakeClient),
}
}
@@ -91,7 +91,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Rds Instance", func()
mockCtrl = gomock.NewController(GinkgoT())
mockAws = mock_aws.NewMockIRdsClient(mockCtrl)
- monkey.Patch(aws.NewRdsClient, func(rds dbmesh_rds.RDS) aws.IRdsClient {
+ monkey.Patch(aws.NewRdsClient, func(rds dbmeshawsrds.RDS) aws.IRdsClient {
return mockAws
})
@@ -192,9 +192,9 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Rds Instance", func()
},
}
- rdsInstance := &dbmesh_rds.DescInstance{
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusCreating,
- Endpoint: dbmesh_rds.Endpoint{
+ rdsInstance := &dbmeshawsrds.DescInstance{
+ DBInstanceStatus: dbmeshawsrds.DBInstanceStatusCreating,
+ Endpoint: dbmeshawsrds.Endpoint{
Address: "127.0.0.1",
Port: 3306,
},
@@ -210,7 +210,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Rds Instance", func()
Expect(fakeClient.Get(ctx, client.ObjectKey{Name: "test-storage-node", Namespace: "test-namespace"}, newSN)).Should(Succeed())
Expect(newSN.Status.Phase).To(Equal(v1alpha1.StorageNodePhaseNotReady))
Expect(newSN.Status.Instances).To(HaveLen(1))
- Expect(newSN.Status.Instances[0].Status).To(Equal(string(dbmesh_rds.DBInstanceStatusCreating)))
+ Expect(newSN.Status.Instances[0].Status).To(Equal(string(dbmeshawsrds.DBInstanceStatusCreating)))
})
It("should reconcile successfully with Available Instance", func() {
@@ -221,9 +221,9 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Rds Instance", func()
},
}
- rdsInstance := &dbmesh_rds.DescInstance{
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
- Endpoint: dbmesh_rds.Endpoint{
+ rdsInstance := &dbmeshawsrds.DescInstance{
+ DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
+ Endpoint: dbmeshawsrds.Endpoint{
Address: "127.0.0.1",
Port: 3306,
},
@@ -239,15 +239,15 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Rds Instance", func()
Expect(newSN.Status.Phase).To(Equal(v1alpha1.StorageNodePhaseReady))
Expect(newSN.Status.Instances).To(HaveLen(1))
- Expect(newSN.Status.Instances[0].Status).To(Equal(string(dbmesh_rds.DBInstanceStatusAvailable)))
+ Expect(newSN.Status.Instances[0].Status).To(Equal(string(dbmeshawsrds.DBInstanceStatusAvailable)))
})
})
Context("reconcile storage node in Ready status when it's been deleted", func() {
- rdsInstanceAvailable := dbmesh_rds.DescInstance{
+ rdsInstanceAvailable := dbmeshawsrds.DescInstance{
DBInstanceIdentifier: defaultTestInstanceIdentifier,
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
- Endpoint: dbmesh_rds.Endpoint{
+ DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
+ Endpoint: dbmeshawsrds.Endpoint{
Address: "127.0.0.1",
Port: 3306,
},
@@ -326,7 +326,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Rds Instance", func()
Phase: v1alpha1.StorageNodePhaseDeleting,
Instances: []v1alpha1.InstanceStatus{
{
- Status: string(dbmesh_rds.DBInstanceStatusDeleting),
+ Status: string(dbmeshawsrds.DBInstanceStatusDeleting),
Endpoint: v1alpha1.Endpoint{
Address: "127.0.0.1",
Port: 3306,
@@ -416,10 +416,10 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Rds Instance", func()
StorageProviderName: defaultTestStorageProvider,
},
}
- ins := &dbmesh_rds.DescInstance{
+ ins := &dbmeshawsrds.DescInstance{
DBInstanceIdentifier: "ins-test-register-storage-node",
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
- Endpoint: dbmesh_rds.Endpoint{
+ DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
+ Endpoint: dbmeshawsrds.Endpoint{
Address: "127.0.0.1",
Port: 3306,
},
@@ -631,7 +631,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Rds Instance", func()
Phase: v1alpha1.StorageNodePhaseReady,
Instances: []v1alpha1.InstanceStatus{
{
- Status: string(dbmesh_rds.DBInstanceStatusAvailable),
+ Status: string(dbmeshawsrds.DBInstanceStatusAvailable),
Endpoint: v1alpha1.Endpoint{},
},
},
@@ -759,7 +759,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
// mock aws rds client
mockCtrl = gomock.NewController(GinkgoT())
mockAws = mock_aws.NewMockIRdsClient(mockCtrl)
- monkey.Patch(aws.NewRdsClient, func(rds dbmesh_rds.RDS) aws.IRdsClient {
+ monkey.Patch(aws.NewRdsClient, func(rds dbmeshawsrds.RDS) aws.IRdsClient {
return mockAws
})
mockSS = mock_shardingsphere.NewMockIServer(mockCtrl)
@@ -794,21 +794,21 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
}
Expect(fakeClient.Create(ctx, &storageNode)).Should(Succeed())
- descCluster := &dbmesh_rds.DescCluster{
+ descCluster := &dbmeshawsrds.DescCluster{
DBClusterIdentifier: "test-aws-aurora",
PrimaryEndpoint: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
ReaderEndpoint: "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com",
Port: 3306,
- Status: string(dbmesh_rds.DBClusterStatusAvailable),
+ Status: string(dbmeshawsrds.DBClusterStatusAvailable),
}
- descInstance := &dbmesh_rds.DescInstance{
+ descInstance := &dbmeshawsrds.DescInstance{
DBInstanceIdentifier: "test-aws-aurora-1",
DBClusterIdentifier: "test-aws-aurora",
- Endpoint: dbmesh_rds.Endpoint{
+ Endpoint: dbmeshawsrds.Endpoint{
Address: "test-aws-aurora-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
Port: 3306,
},
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
+ DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
}
// mock aws aurora cluster is not exist
@@ -818,7 +818,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
// mock aws aurora cluster is created
mockAws.EXPECT().GetAuroraCluster(gomock.Any(), gomock.Any()).Return(descCluster, nil).Times(1)
// mock aws instance is created
- mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{descInstance}, nil).Times(1)
+ mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmeshawsrds.DescInstance{descInstance}, nil).Times(1)
req := ctrl.Request{NamespacedName: namespacedName}
_, err := reconciler.Reconcile(ctx, req)
@@ -850,13 +850,13 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
Status: v1alpha1.StorageNodeStatus{
Phase: v1alpha1.StorageNodePhaseReady,
Cluster: v1alpha1.ClusterStatus{
- Status: string(dbmesh_rds.DBClusterStatusAvailable),
+ Status: string(dbmeshawsrds.DBClusterStatusAvailable),
PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
},
Instances: []v1alpha1.InstanceStatus{
{
- Status: string(dbmesh_rds.DBInstanceStatusAvailable),
+ Status: string(dbmeshawsrds.DBInstanceStatusAvailable),
Endpoint: v1alpha1.Endpoint{Address: "test-aws-aurora-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
},
},
@@ -865,22 +865,22 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
Expect(fakeClient.Create(ctx, storageNode)).Should(Succeed())
- descCluster := &dbmesh_rds.DescCluster{
+ descCluster := &dbmeshawsrds.DescCluster{
DBClusterIdentifier: "test-aws-aurora",
PrimaryEndpoint: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
ReaderEndpoint: "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com",
Port: 3306,
- Status: string(dbmesh_rds.DBClusterStatusAvailable),
+ Status: string(dbmeshawsrds.DBClusterStatusAvailable),
}
- descInstance := &dbmesh_rds.DescInstance{
+ descInstance := &dbmeshawsrds.DescInstance{
DBInstanceIdentifier: "test-aws-aurora-1",
DBClusterIdentifier: "test-aws-aurora",
- Endpoint: dbmesh_rds.Endpoint{
+ Endpoint: dbmeshawsrds.Endpoint{
Address: "test-aws-aurora-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
Port: 3306,
},
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
+ DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
}
Expect(fakeClient.Delete(ctx, storageNode)).Should(Succeed())
@@ -888,7 +888,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
// mock aws aurora is exists
mockAws.EXPECT().GetAuroraCluster(gomock.Any(), gomock.Any()).Return(descCluster, nil).Times(1)
// mock get instances of aws aurora
- mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{descInstance}, nil).Times(1)
+ mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmeshawsrds.DescInstance{descInstance}, nil).Times(1)
// mock delete aws aurora cluster
mockAws.EXPECT().DeleteAuroraCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
@@ -924,13 +924,13 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
Status: v1alpha1.StorageNodeStatus{
Phase: v1alpha1.StorageNodePhaseDeleting,
Cluster: v1alpha1.ClusterStatus{
- Status: string(dbmesh_rds.DBClusterStatusDeleting),
+ Status: string(dbmeshawsrds.DBClusterStatusDeleting),
PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
},
Instances: []v1alpha1.InstanceStatus{
{
- Status: string(dbmesh_rds.DBInstanceStatusDeleting),
+ Status: string(dbmeshawsrds.DBInstanceStatusDeleting),
Endpoint: v1alpha1.Endpoint{Address: "test-aws-aurora-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
},
},
@@ -1007,13 +1007,13 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
Status: v1alpha1.StorageNodeStatus{
Phase: v1alpha1.StorageNodePhaseReady,
Cluster: v1alpha1.ClusterStatus{
- Status: string(dbmesh_rds.DBClusterStatusAvailable),
+ Status: string(dbmeshawsrds.DBClusterStatusAvailable),
PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
},
Instances: []v1alpha1.InstanceStatus{
{
- Status: string(dbmesh_rds.DBInstanceStatusAvailable),
+ Status: string(dbmeshawsrds.DBInstanceStatusAvailable),
Endpoint: v1alpha1.Endpoint{Address: "test-aws-aurora-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
},
},
@@ -1052,18 +1052,18 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
Expect(fakeClient.Create(ctx, svc)).Should(Succeed())
// mock aws aurora is available
- mockAws.EXPECT().GetAuroraCluster(gomock.Any(), gomock.Any()).Return(&dbmesh_rds.DescCluster{
+ mockAws.EXPECT().GetAuroraCluster(gomock.Any(), gomock.Any()).Return(&dbmeshawsrds.DescCluster{
DBClusterIdentifier: "test-aws-aurora",
PrimaryEndpoint: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
Port: int32(3306),
- Status: string(dbmesh_rds.DBClusterStatusAvailable),
+ Status: string(dbmeshawsrds.DBClusterStatusAvailable),
}, nil).Times(1)
// mock get instances of aws aurora are available
- mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{
+ mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmeshawsrds.DescInstance{
{
DBInstanceIdentifier: "test-aws-aurora-instance-0",
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
- Endpoint: dbmesh_rds.Endpoint{Address: "test-aws-aurora-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+ DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
+ Endpoint: dbmeshawsrds.Endpoint{Address: "test-aws-aurora-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
},
}, nil).Times(1)
@@ -1141,27 +1141,27 @@ var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
Expect(fakeClient.Create(ctx, svc)).Should(Succeed())
// mock aws aurora is available
- mockAws.EXPECT().GetAuroraCluster(gomock.Any(), gomock.Any()).Return(&dbmesh_rds.DescCluster{
+ mockAws.EXPECT().GetAuroraCluster(gomock.Any(), gomock.Any()).Return(&dbmeshawsrds.DescCluster{
DBClusterIdentifier: "test-aws-aurora",
PrimaryEndpoint: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
Port: int32(3306),
- Status: string(dbmesh_rds.DBClusterStatusAvailable),
- DBClusterMembers: []dbmesh_rds.ClusterMember{
+ Status: string(dbmeshawsrds.DBClusterStatusAvailable),
+ DBClusterMembers: []dbmeshawsrds.ClusterMember{
{DBInstanceIdentifier: "test-aws-aurora-instance-0"},
{DBInstanceIdentifier: "test-aws-aurora-instance-1"},
},
}, nil).Times(2)
// mock get instances of aws aurora are available
- mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{
+ mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmeshawsrds.DescInstance{
{
DBInstanceIdentifier: "test-aws-aurora-instance-0",
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
- Endpoint: dbmesh_rds.Endpoint{Address: "test-aws-aurora-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+ DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
+ Endpoint: dbmeshawsrds.Endpoint{Address: "test-aws-aurora-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
},
{
DBInstanceIdentifier: "test-aws-aurora-instance-1",
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
- Endpoint: dbmesh_rds.Endpoint{Address: "test-aws-aurora-2.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+ DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
+ Endpoint: dbmeshawsrds.Endpoint{Address: "test-aws-aurora-2.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
},
}, nil).Times(2)
@@ -1210,7 +1210,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS RDS Cluster", func()
// mock aws rds client
mockCtrl = gomock.NewController(GinkgoT())
mockAws = mock_aws.NewMockIRdsClient(mockCtrl)
- monkey.Patch(aws.NewRdsClient, func(rds dbmesh_rds.RDS) aws.IRdsClient {
+ monkey.Patch(aws.NewRdsClient, func(rds dbmeshawsrds.RDS) aws.IRdsClient {
return mockAws
})
mockSS = mock_shardingsphere.NewMockIServer(mockCtrl)
@@ -1245,21 +1245,21 @@ var _ = Describe("StorageNode Controller Mock Test For AWS RDS Cluster", func()
}
Expect(fakeClient.Create(ctx, &storageNode)).Should(Succeed())
- descCluster := &dbmesh_rds.DescCluster{
+ descCluster := &dbmeshawsrds.DescCluster{
DBClusterIdentifier: "test-aws-rds-cluster",
PrimaryEndpoint: "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
ReaderEndpoint: "test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com",
Port: 3306,
- Status: string(dbmesh_rds.DBClusterStatusAvailable),
+ Status: string(dbmeshawsrds.DBClusterStatusAvailable),
}
- descInstance := &dbmesh_rds.DescInstance{
+ descInstance := &dbmeshawsrds.DescInstance{
DBInstanceIdentifier: "test-aws-rds-cluster-instance-1",
DBClusterIdentifier: "test-aws-rds-cluster",
- Endpoint: dbmesh_rds.Endpoint{
+ Endpoint: dbmeshawsrds.Endpoint{
Address: "test-aws-rds-cluster-instance-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
Port: 3306,
},
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
+ DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
}
// mock aws rds cluster is not exist
@@ -1269,7 +1269,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS RDS Cluster", func()
// mock aws aurora cluster is created
mockAws.EXPECT().GetRDSCluster(gomock.Any(), gomock.Any()).Return(descCluster, nil).Times(1)
// mock aws instance is created
- mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{descInstance}, nil).Times(1)
+ mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmeshawsrds.DescInstance{descInstance}, nil).Times(1)
req := ctrl.Request{NamespacedName: namespacedName}
_, err := reconciler.Reconcile(ctx, req)
@@ -1301,13 +1301,13 @@ var _ = Describe("StorageNode Controller Mock Test For AWS RDS Cluster", func()
Status: v1alpha1.StorageNodeStatus{
Phase: v1alpha1.StorageNodePhaseReady,
Cluster: v1alpha1.ClusterStatus{
- Status: string(dbmesh_rds.DBClusterStatusAvailable),
+ Status: string(dbmeshawsrds.DBClusterStatusAvailable),
PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
},
Instances: []v1alpha1.InstanceStatus{
{
- Status: string(dbmesh_rds.DBInstanceStatusAvailable),
+ Status: string(dbmeshawsrds.DBInstanceStatusAvailable),
Endpoint: v1alpha1.Endpoint{Address: "test-aws-rds-cluster-instance-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
},
},
@@ -1316,22 +1316,22 @@ var _ = Describe("StorageNode Controller Mock Test For AWS RDS Cluster", func()
Expect(fakeClient.Create(ctx, storageNode)).Should(Succeed())
- descCluster := &dbmesh_rds.DescCluster{
+ descCluster := &dbmeshawsrds.DescCluster{
DBClusterIdentifier: "test-aws-rds-cluster",
PrimaryEndpoint: "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
ReaderEndpoint: "test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com",
Port: 3306,
- Status: string(dbmesh_rds.DBClusterStatusAvailable),
+ Status: string(dbmeshawsrds.DBClusterStatusAvailable),
}
- descInstance := &dbmesh_rds.DescInstance{
+ descInstance := &dbmeshawsrds.DescInstance{
DBInstanceIdentifier: "test-aws-rds-cluster-1",
DBClusterIdentifier: "test-aws-rds-cluster",
- Endpoint: dbmesh_rds.Endpoint{
+ Endpoint: dbmeshawsrds.Endpoint{
Address: "test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
Port: 3306,
},
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
+ DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
}
Expect(fakeClient.Delete(ctx, storageNode)).Should(Succeed())
@@ -1339,7 +1339,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS RDS Cluster", func()
// mock aws rds cluster is exists
mockAws.EXPECT().GetRDSCluster(gomock.Any(), gomock.Any()).Return(descCluster, nil).Times(1)
// mock get instances of aws rds cluster
- mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{descInstance}, nil).Times(1)
+ mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmeshawsrds.DescInstance{descInstance}, nil).Times(1)
// mock delete aws rds cluster
mockAws.EXPECT().DeleteRDSCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
@@ -1375,13 +1375,13 @@ var _ = Describe("StorageNode Controller Mock Test For AWS RDS Cluster", func()
Status: v1alpha1.StorageNodeStatus{
Phase: v1alpha1.StorageNodePhaseDeleting,
Cluster: v1alpha1.ClusterStatus{
- Status: string(dbmesh_rds.DBClusterStatusDeleting),
+ Status: string(dbmeshawsrds.DBClusterStatusDeleting),
PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
},
Instances: []v1alpha1.InstanceStatus{
{
- Status: string(dbmesh_rds.DBInstanceStatusDeleting),
+ Status: string(dbmeshawsrds.DBInstanceStatusDeleting),
Endpoint: v1alpha1.Endpoint{Address: "test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
},
},
@@ -1458,13 +1458,13 @@ var _ = Describe("StorageNode Controller Mock Test For AWS RDS Cluster", func()
Status: v1alpha1.StorageNodeStatus{
Phase: v1alpha1.StorageNodePhaseReady,
Cluster: v1alpha1.ClusterStatus{
- Status: string(dbmesh_rds.DBClusterStatusAvailable),
+ Status: string(dbmeshawsrds.DBClusterStatusAvailable),
PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
},
Instances: []v1alpha1.InstanceStatus{
{
- Status: string(dbmesh_rds.DBInstanceStatusAvailable),
+ Status: string(dbmeshawsrds.DBInstanceStatusAvailable),
Endpoint: v1alpha1.Endpoint{Address: "test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
},
},
@@ -1503,18 +1503,18 @@ var _ = Describe("StorageNode Controller Mock Test For AWS RDS Cluster", func()
Expect(fakeClient.Create(ctx, svc)).Should(Succeed())
// mock aws rds cluster is available
- mockAws.EXPECT().GetRDSCluster(gomock.Any(), gomock.Any()).Return(&dbmesh_rds.DescCluster{
+ mockAws.EXPECT().GetRDSCluster(gomock.Any(), gomock.Any()).Return(&dbmeshawsrds.DescCluster{
DBClusterIdentifier: "test-aws-rds-cluster",
PrimaryEndpoint: "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
Port: int32(3306),
- Status: string(dbmesh_rds.DBClusterStatusAvailable),
+ Status: string(dbmeshawsrds.DBClusterStatusAvailable),
}, nil).Times(1)
// mock get instances of aws aurora are available
- mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{
+ mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmeshawsrds.DescInstance{
{
DBInstanceIdentifier: "test-aws-rds-cluster-instance-0",
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
- Endpoint: dbmesh_rds.Endpoint{Address: "test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+ DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
+ Endpoint: dbmeshawsrds.Endpoint{Address: "test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
},
}, nil).Times(1)
@@ -1592,27 +1592,27 @@ var _ = Describe("StorageNode Controller Mock Test For AWS RDS Cluster", func()
Expect(fakeClient.Create(ctx, svc)).Should(Succeed())
// mock aws rds cluster is available
- mockAws.EXPECT().GetRDSCluster(gomock.Any(), gomock.Any()).Return(&dbmesh_rds.DescCluster{
+ mockAws.EXPECT().GetRDSCluster(gomock.Any(), gomock.Any()).Return(&dbmeshawsrds.DescCluster{
DBClusterIdentifier: "test-aws-rds-cluster",
PrimaryEndpoint: "test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
Port: int32(3306),
- Status: string(dbmesh_rds.DBClusterStatusAvailable),
- DBClusterMembers: []dbmesh_rds.ClusterMember{
+ Status: string(dbmeshawsrds.DBClusterStatusAvailable),
+ DBClusterMembers: []dbmeshawsrds.ClusterMember{
{DBInstanceIdentifier: "test-aws-rds-cluster-instance-0"},
{DBInstanceIdentifier: "test-aws-rds-cluster-instance-1"},
},
}, nil).Times(2)
// mock get instances of aws aurora are available
- mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{
+ mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmeshawsrds.DescInstance{
{
DBInstanceIdentifier: "test-aws-rds-cluster-instance-0",
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
- Endpoint: dbmesh_rds.Endpoint{Address: "test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+ DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
+ Endpoint: dbmeshawsrds.Endpoint{Address: "test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
},
{
DBInstanceIdentifier: "test-aws-rds-cluster-instance-1",
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
- Endpoint: dbmesh_rds.Endpoint{Address: "test-aws-rds-cluster-2.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+ DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
+ Endpoint: dbmeshawsrds.Endpoint{Address: "test-aws-rds-cluster-2.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
},
}, nil).Times(2)
diff --git a/shardingsphere-operator/pkg/controllers/storage_node_controller.go b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
index 37edba7..c221e87 100644
--- a/shardingsphere-operator/pkg/controllers/storage_node_controller.go
+++ b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
@@ -31,7 +31,8 @@ import (
cnpg "github.com/cloudnative-pg/cloudnative-pg/api/v1"
cnpgutils "github.com/cloudnative-pg/cloudnative-pg/pkg/utils"
- "github.com/database-mesh/golang-sdk/aws/client/rds"
+ dbmeshaws "github.com/database-mesh/golang-sdk/aws"
+ dbmeshawsrds "github.com/database-mesh/golang-sdk/aws/client/rds"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -62,10 +63,14 @@ type StorageNodeReconciler struct {
Scheme *runtime.Scheme
Log logr.Logger
Recorder record.EventRecorder
- AwsRDS rds.RDS
- CNPG cloudnativepg.CloudNativePG
+ Service service.Service
- Service service.Service
+ AwsRegion string
+ AwsAccessKeyID string
+ AwsSecretAccessKey string
+ AwsSessions dbmeshaws.Sessions
+
+ CNPG cloudnativepg.CloudNativePG
}
// +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=storagenodes,verbs=get;list;watch;create;update;patch;delete
@@ -73,6 +78,7 @@ type StorageNodeReconciler struct {
// +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=storagenodes/finalizers,verbs=update
// +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=storageproviders,verbs=get;list;watch
// +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=clusters,verbs=get;list;watch;create;update;patch;delete
+// +kubebuilder:rbac:groups="",resources=event,verbs=create;patch
// Reconcile handles main function of this controller
// nolint:gocognit
@@ -191,17 +197,17 @@ func (r *StorageNodeReconciler) reconcile(ctx context.Context, storageProvider *
// reconcile storage node with storageProvider
switch storageProvider.Spec.Provisioner {
case v1alpha1.ProvisionerAWSRDSInstance:
- if err := r.reconcileAwsRdsInstance(ctx, aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
+ if err := r.reconcileAwsRdsInstance(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS RDS Instance %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
}
case v1alpha1.ProvisionerAWSRDSCluster:
- if err := r.reconcileAwsRDSCluster(ctx, aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
+ if err := r.reconcileAwsRDSCluster(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS RDS Cluster %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
}
case v1alpha1.ProvisionerAWSAurora:
- if err := r.reconcileAwsAurora(ctx, aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
+ if err := r.reconcileAwsAurora(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS Aurora %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
}
@@ -254,9 +260,9 @@ func (r *StorageNodeReconciler) getStorageProvider(ctx context.Context, node *v1
if storageProvider.Spec.Provisioner == v1alpha1.ProvisionerAWSRDSInstance ||
storageProvider.Spec.Provisioner == v1alpha1.ProvisionerAWSAurora ||
storageProvider.Spec.Provisioner == v1alpha1.ProvisionerAWSRDSCluster {
- if r.AwsRDS == nil {
- r.Recorder.Event(node, corev1.EventTypeWarning, "AwsRdsClientIsNil", "aws rds client is nil, please check your aws credentials")
- return nil, fmt.Errorf("aws rds client is nil, please check your aws credentials")
+ if r.AwsRegion == "" || r.AwsAccessKeyID == "" || r.AwsSecretAccessKey == "" {
+ r.Recorder.Eventf(node, corev1.EventTypeWarning, "awsCredentialsNotSet", "aws credentials not set")
+ return nil, fmt.Errorf("aws credentials not set")
}
}
@@ -276,7 +282,7 @@ func computeDesiredState(status v1alpha1.StorageNodeStatus) v1alpha1.StorageNode
}
} else {
// If the storage node is not being deleted, check if all instances are ready.
- if (clusterStatus == "" || clusterStatus == string(rds.DBClusterStatusAvailable)) && allInstancesReady(status.Instances) {
+ if (clusterStatus == "" || clusterStatus == string(dbmeshawsrds.DBClusterStatusAvailable)) && allInstancesReady(status.Instances) {
desiredState.Phase = v1alpha1.StorageNodePhaseReady
} else {
desiredState.Phase = v1alpha1.StorageNodePhaseNotReady
@@ -294,7 +300,7 @@ func computeNewConditions(desiredState, status v1alpha1.StorageNodeStatus, clust
// Update the cluster ready condition if the cluster status is not empty
if clusterStatus != "" {
- if clusterStatus == string(rds.DBClusterStatusAvailable) {
+ if clusterStatus == string(dbmeshawsrds.DBClusterStatusAvailable) {
newSNConditions.UpsertCondition(&v1alpha1.StorageNodeCondition{
Type: v1alpha1.StorageNodeConditionTypeClusterReady,
Status: corev1.ConditionTrue,
@@ -357,7 +363,7 @@ func allInstancesReady(instances []v1alpha1.InstanceStatus) bool {
for idx := range instances {
instance := &instances[idx]
- if !(instance.Status == string(rds.DBInstanceStatusAvailable)) {
+ if !(instance.Status == string(dbmeshawsrds.DBInstanceStatusAvailable)) {
return false
}
}
@@ -365,19 +371,27 @@ func allInstancesReady(instances []v1alpha1.InstanceStatus) bool {
return true
}
-func (r *StorageNodeReconciler) reconcileAwsRdsInstance(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
- instance, err := client.GetInstance(ctx, node)
+func (r *StorageNodeReconciler) getAwsRdsClient() aws.IRdsClient {
+ if _, ok := r.AwsSessions[r.AwsRegion]; !ok {
+ sessions := dbmeshaws.NewSessions().SetCredential(r.AwsRegion, r.AwsAccessKeyID, r.AwsSecretAccessKey).Build()
+ r.AwsSessions = sessions
+ }
+ return aws.NewRdsClient(dbmeshawsrds.NewService(r.AwsSessions[r.AwsRegion]))
+}
+
+func (r *StorageNodeReconciler) reconcileAwsRdsInstance(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
+ instance, err := rdsClient.GetInstance(ctx, node)
if err != nil {
return err
}
if instance == nil && node.Status.Phase != v1alpha1.StorageNodePhaseDeleting {
- err = client.CreateInstance(ctx, node, storageProvider.Spec.Parameters)
+ err = rdsClient.CreateInstance(ctx, node, storageProvider.Spec.Parameters)
if err != nil {
return err
}
- instance, err = client.GetInstance(ctx, node)
+ instance, err = rdsClient.GetInstance(ctx, node)
if err != nil {
return err
}
@@ -390,7 +404,7 @@ func (r *StorageNodeReconciler) reconcileAwsRdsInstance(ctx context.Context, cli
return nil
}
-func updateAWSRDSInstanceStatus(node *v1alpha1.StorageNode, instance *rds.DescInstance) error {
+func updateAWSRDSInstanceStatus(node *v1alpha1.StorageNode, instance *dbmeshawsrds.DescInstance) error {
instances := make([]v1alpha1.InstanceStatus, 0)
if instance == nil {
@@ -410,64 +424,61 @@ func updateAWSRDSInstanceStatus(node *v1alpha1.StorageNode, instance *rds.DescIn
return nil
}
-func (r *StorageNodeReconciler) reconcileAwsRDSCluster(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
- ac, err := client.GetRDSCluster(ctx, node)
+// nolint:dupl
+func (r *StorageNodeReconciler) reconcileAwsRDSCluster(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
+ rc, err := rdsClient.GetRDSCluster(ctx, node)
if err != nil {
return err
}
- if ac == nil {
+ if rc == nil {
// create the RDS cluster if it does not exist yet
- err = client.CreateRDSCluster(ctx, node, storageProvider.Spec.Parameters)
+ err = rdsClient.CreateRDSCluster(ctx, node, storageProvider.Spec.Parameters)
if err != nil {
return err
}
- ac, err = client.GetRDSCluster(ctx, node)
+ rc, err = rdsClient.GetRDSCluster(ctx, node)
if err != nil {
return err
}
}
- // TODO: reconcile instance of aurora
-
// update storage node status
- if err := updateClusterStatus(ctx, client, node, ac); err != nil {
+ if err := updateClusterStatus(ctx, rdsClient, node, rc); err != nil {
return fmt.Errorf("updateClusterStatus failed: %w", err)
}
return nil
}
-func (r *StorageNodeReconciler) reconcileAwsAurora(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
- r.Log.Info("reconcileAwsAurora", "node", node.GetName(), "phase", node.Status.Phase)
- ac, err := client.GetAuroraCluster(ctx, node)
+// nolint:dupl
+func (r *StorageNodeReconciler) reconcileAwsAurora(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
+ ac, err := rdsClient.GetAuroraCluster(ctx, node)
if err != nil {
return err
}
if ac == nil {
// create the Aurora cluster if it does not exist yet
- err = client.CreateAuroraCluster(ctx, node, storageProvider.Spec.Parameters)
+ err = rdsClient.CreateAuroraCluster(ctx, node, storageProvider.Spec.Parameters)
if err != nil {
return err
}
- ac, err = client.GetAuroraCluster(ctx, node)
+ ac, err = rdsClient.GetAuroraCluster(ctx, node)
if err != nil {
return err
}
}
- // TODO: reconcile instance of aurora
-
// update storage node status
- if err := updateClusterStatus(ctx, client, node, ac); err != nil {
+ if err := updateClusterStatus(ctx, rdsClient, node, ac); err != nil {
return fmt.Errorf("updateClusterStatus failed: %w", err)
}
return nil
}
-func updateClusterStatus(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, cluster *rds.DescCluster) error {
+func updateClusterStatus(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, cluster *dbmeshawsrds.DescCluster) error {
// update cluster status
clusterStatus := v1alpha1.ClusterStatus{}
if cluster != nil {
@@ -492,7 +503,7 @@ func updateClusterStatus(ctx context.Context, client aws.IRdsClient, node *v1alp
filters := map[string][]string{
"db-cluster-id": {identifier},
}
- instances, err := client.GetInstancesByFilters(ctx, filters)
+ instances, err := rdsClient.GetInstancesByFilters(ctx, filters)
if err != nil {
return fmt.Errorf("GetInstances failed, err:%w", err)
}
@@ -514,15 +525,15 @@ func updateClusterStatus(ctx context.Context, client aws.IRdsClient, node *v1alp
func (r *StorageNodeReconciler) deleteDatabaseCluster(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
switch storageProvider.Spec.Provisioner {
case v1alpha1.ProvisionerAWSRDSInstance:
- if err := r.deleteAWSRDSInstance(ctx, aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
+ if err := r.deleteAWSRDSInstance(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
return fmt.Errorf("delete aws rds instance failed: %w", err)
}
case v1alpha1.ProvisionerAWSRDSCluster:
- if err := r.deleteAWSRDSCluster(ctx, aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
+ if err := r.deleteAWSRDSCluster(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
return fmt.Errorf("delete aws rds cluster failed: %w", err)
}
case v1alpha1.ProvisionerAWSAurora:
- if err := r.deleteAWSAurora(ctx, aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
+ if err := r.deleteAWSAurora(ctx, r.getAwsRdsClient(), node, storageProvider); err != nil {
return fmt.Errorf("delete aws aurora cluster failed: %w", err)
}
default:
@@ -531,18 +542,18 @@ func (r *StorageNodeReconciler) deleteDatabaseCluster(ctx context.Context, node
return nil
}
-func (r *StorageNodeReconciler) deleteAWSRDSInstance(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
+func (r *StorageNodeReconciler) deleteAWSRDSInstance(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
if node.Annotations[v1alpha1.AnnotationsInstanceIdentifier] == "" {
return nil
}
- instance, err := client.GetInstance(ctx, node)
+ instance, err := rdsClient.GetInstance(ctx, node)
if err != nil {
return err
}
- if instance != nil && instance.DBInstanceStatus != rds.DBInstanceStatusDeleting {
- if err := client.DeleteInstance(ctx, node, storageProvider); err != nil {
+ if instance != nil && instance.DBInstanceStatus != dbmeshawsrds.DBInstanceStatusDeleting {
+ if err := rdsClient.DeleteInstance(ctx, node, storageProvider); err != nil {
r.Recorder.Eventf(node, corev1.EventTypeWarning, "DeleteFailed", "Failed to delete instance %s: %s", node.Annotations[v1alpha1.AnnotationsInstanceIdentifier], err.Error())
return err
}
@@ -558,17 +569,17 @@ func (r *StorageNodeReconciler) deleteAWSRDSInstance(ctx context.Context, client
}
// nolint:dupl
-func (r *StorageNodeReconciler) deleteAWSRDSCluster(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
+func (r *StorageNodeReconciler) deleteAWSRDSCluster(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
if node.Annotations[v1alpha1.AnnotationsClusterIdentifier] == "" {
return nil
}
- cluster, err := client.GetRDSCluster(ctx, node)
+ cluster, err := rdsClient.GetRDSCluster(ctx, node)
if err != nil {
return fmt.Errorf("get rds cluster failed: %w", err)
}
- if cluster != nil && cluster.Status != string(rds.DBClusterStatusDeleting) {
- if err := client.DeleteRDSCluster(ctx, node, storageProvider); err != nil {
+ if cluster != nil && cluster.Status != string(dbmeshawsrds.DBClusterStatusDeleting) {
+ if err := rdsClient.DeleteRDSCluster(ctx, node, storageProvider); err != nil {
r.Recorder.Eventf(node, corev1.EventTypeWarning, "DeleteFailed", "Failed to delete rds cluster %s: %s", node.Annotations[v1alpha1.AnnotationsClusterIdentifier], err.Error())
return err
}
@@ -576,24 +587,24 @@ func (r *StorageNodeReconciler) deleteAWSRDSCluster(ctx context.Context, client
}
// update storage node status
- if err := updateClusterStatus(ctx, client, node, cluster); err != nil {
+ if err := updateClusterStatus(ctx, rdsClient, node, cluster); err != nil {
return fmt.Errorf("updateClusterStatus failed: %w", err)
}
return nil
}
// nolint:dupl
-func (r *StorageNodeReconciler) deleteAWSAurora(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
+func (r *StorageNodeReconciler) deleteAWSAurora(ctx context.Context, rdsClient aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
if node.Annotations[v1alpha1.AnnotationsClusterIdentifier] == "" {
return nil
}
- auroraCluster, err := client.GetAuroraCluster(ctx, node)
+ auroraCluster, err := rdsClient.GetAuroraCluster(ctx, node)
if err != nil {
return fmt.Errorf("get aurora cluster failed: %w", err)
}
- if auroraCluster != nil && auroraCluster.Status != string(rds.DBClusterStatusDeleting) {
- if err := client.DeleteAuroraCluster(ctx, node, storageProvider); err != nil {
+ if auroraCluster != nil && auroraCluster.Status != string(dbmeshawsrds.DBClusterStatusDeleting) {
+ if err := rdsClient.DeleteAuroraCluster(ctx, node, storageProvider); err != nil {
r.Recorder.Eventf(node, corev1.EventTypeWarning, "DeleteFailed", "Failed to delete aurora cluster %s: %s", node.Annotations[v1alpha1.AnnotationsClusterIdentifier], err.Error())
return err
}
@@ -601,7 +612,7 @@ func (r *StorageNodeReconciler) deleteAWSAurora(ctx context.Context, client aws.
}
// update storage node status
- if err := updateClusterStatus(ctx, client, node, auroraCluster); err != nil {
+ if err := updateClusterStatus(ctx, rdsClient, node, auroraCluster); err != nil {
return fmt.Errorf("updateClusterStatus failed: %w", err)
}
return nil
diff --git a/shardingsphere-operator/test/e2e/e2e_suite_test.go b/shardingsphere-operator/test/e2e/e2e_suite_test.go
index 17e42d3..1f0f050 100644
--- a/shardingsphere-operator/test/e2e/e2e_suite_test.go
+++ b/shardingsphere-operator/test/e2e/e2e_suite_test.go
@@ -30,8 +30,6 @@ import (
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/deployment"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/service"
- dbmesh_aws "github.com/database-mesh/golang-sdk/aws"
- dbmesh_rds "github.com/database-mesh/golang-sdk/aws/client/rds"
"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@@ -93,14 +91,15 @@ var _ = BeforeSuite(func() {
k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{})
Expect(err).ToNot(HaveOccurred())
// register the StorageNode reconciler with the manager
- sess := dbmesh_aws.NewSessions().SetCredential("AwsRegion", "AwsAccessKeyID", "AwsSecretAccessKey").Build()
err = (&controllers.StorageNodeReconciler{
- Client: k8sManager.GetClient(),
- Scheme: k8sManager.GetScheme(),
- Log: ctrl.Log.WithName("controllers").WithName("StorageNode"),
- Recorder: k8sManager.GetEventRecorderFor("StorageNode"),
- AwsRDS: dbmesh_rds.NewService(sess["AwsRegion"]),
- Service: service.NewServiceClient(k8sManager.GetClient()),
+ Client: k8sManager.GetClient(),
+ Scheme: k8sManager.GetScheme(),
+ Log: ctrl.Log.WithName("controllers").WithName("StorageNode"),
+ Recorder: k8sManager.GetEventRecorderFor("StorageNode"),
+ AwsRegion: "AwsRegion",
+ AwsAccessKeyID: "AwsAccessKeyID",
+ AwsSecretAccessKey: "AwsSecretAccessKey",
+ Service: service.NewServiceClient(k8sManager.GetClient()),
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())
diff --git a/shardingsphere-operator/test/e2e/storage_node_controller_test.go b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
index 47508ff..1fac6f8 100644
--- a/shardingsphere-operator/test/e2e/storage_node_controller_test.go
+++ b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
@@ -31,7 +31,7 @@ import (
"bou.ke/monkey"
"github.com/DATA-DOG/go-sqlmock"
- dbmesh_rds "github.com/database-mesh/golang-sdk/aws/client/rds"
+ dbmeshawsrds "github.com/database-mesh/golang-sdk/aws/client/rds"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -77,10 +77,10 @@ var _ = Describe("StorageNode Controller Suite Test For AWS RDS Instance", func(
It("should create success", func() {
// mock get instance func returns success
- monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescInstance, error) {
- return &dbmesh_rds.DescInstance{
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
- Endpoint: dbmesh_rds.Endpoint{
+ monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmeshawsrds.DescInstance, error) {
+ return &dbmeshawsrds.DescInstance{
+ DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
+ Endpoint: dbmeshawsrds.Endpoint{
Address: "127.0.0.1",
Port: 3306,
},
@@ -153,10 +153,10 @@ var _ = Describe("StorageNode Controller Suite Test For AWS RDS Instance", func(
Expect(err).Should(Succeed())
Expect(dbmock).ShouldNot(BeNil())
// mock RdsClient.GetInstance to return an available instance
- g := monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescInstance, error) {
- return &dbmesh_rds.DescInstance{
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
- Endpoint: dbmesh_rds.Endpoint{
+ g := monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmeshawsrds.DescInstance, error) {
+ return &dbmeshawsrds.DescInstance{
+ DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
+ Endpoint: dbmeshawsrds.Endpoint{
Address: "127.0.0.1",
Port: 3306,
},
@@ -273,7 +273,7 @@ var _ = Describe("StorageNode Controller Suite Test For AWS RDS Instance", func(
}, 20, 2).Should(Equal(v1alpha1.StorageNodePhaseDeleting))
g.Unpatch()
- monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescInstance, error) {
+ monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmeshawsrds.DescInstance, error) {
return nil, nil
})
@@ -321,21 +321,21 @@ var _ = Describe("StorageNode Controller Suite Test For AWS Aurora Cluster", fun
It("Should Success", func() {
snName := "test-storage-node-creating"
// monkey patch
- monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetAuroraCluster", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescCluster, error) {
- return &dbmesh_rds.DescCluster{
+ monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetAuroraCluster", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmeshawsrds.DescCluster, error) {
+ return &dbmeshawsrds.DescCluster{
DBClusterIdentifier: clusterIdentifier,
- Status: string(dbmesh_rds.DBClusterStatusCreating),
+ Status: string(dbmeshawsrds.DBClusterStatusCreating),
PrimaryEndpoint: "test-primary-endpoint",
ReaderEndpoint: "test-reader-endpoint",
Port: 3306,
}, nil
})
- monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstancesByFilters", func(_ *aws.RdsClient, _ context.Context, _ map[string][]string) ([]*dbmesh_rds.DescInstance, error) {
- return []*dbmesh_rds.DescInstance{
+ monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstancesByFilters", func(_ *aws.RdsClient, _ context.Context, _ map[string][]string) ([]*dbmeshawsrds.DescInstance, error) {
+ return []*dbmeshawsrds.DescInstance{
{
DBInstanceIdentifier: fmt.Sprintf("%s-insatnce-0", clusterIdentifier),
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusCreating,
- Endpoint: dbmesh_rds.Endpoint{Address: "test-instance-0-endpoint", Port: 3306},
+ DBInstanceStatus: dbmeshawsrds.DBInstanceStatusCreating,
+ Endpoint: dbmeshawsrds.Endpoint{Address: "test-instance-0-endpoint", Port: 3306},
},
}, nil
})
@@ -359,31 +359,31 @@ var _ = Describe("StorageNode Controller Suite Test For AWS Aurora Cluster", fun
newSN := &v1alpha1.StorageNode{}
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: snName, Namespace: "default"}, newSN)).Should(Succeed())
return newSN.Status.Cluster.Status
- }, time.Second*10, time.Millisecond*250).Should(Equal(string(dbmesh_rds.DBClusterStatusCreating)))
+ }, time.Second*10, time.Millisecond*250).Should(Equal(string(dbmeshawsrds.DBClusterStatusCreating)))
})
It("should success when cluster is available", func() {
snName := "test-storage-node-available"
- monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetAuroraCluster", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescCluster, error) {
- return &dbmesh_rds.DescCluster{
+ monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetAuroraCluster", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmeshawsrds.DescCluster, error) {
+ return &dbmeshawsrds.DescCluster{
DBClusterIdentifier: clusterIdentifier,
- Status: string(dbmesh_rds.DBClusterStatusAvailable),
+ Status: string(dbmeshawsrds.DBClusterStatusAvailable),
PrimaryEndpoint: "test-primary-endpoint",
ReaderEndpoint: "test-reader-endpoint",
Port: 3306,
}, nil
})
- monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstancesByFilters", func(_ *aws.RdsClient, _ context.Context, _ map[string][]string) ([]*dbmesh_rds.DescInstance, error) {
- return []*dbmesh_rds.DescInstance{
+ monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstancesByFilters", func(_ *aws.RdsClient, _ context.Context, _ map[string][]string) ([]*dbmeshawsrds.DescInstance, error) {
+ return []*dbmeshawsrds.DescInstance{
{
DBInstanceIdentifier: fmt.Sprintf("%s-insatnce-0", clusterIdentifier),
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
- Endpoint: dbmesh_rds.Endpoint{Address: "test-instance-0-endpoint", Port: 3306},
+ DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
+ Endpoint: dbmeshawsrds.Endpoint{Address: "test-instance-0-endpoint", Port: 3306},
},
{
DBInstanceIdentifier: fmt.Sprintf("%s-insatnce-1", clusterIdentifier),
- DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
- Endpoint: dbmesh_rds.Endpoint{Address: "test-instance-1-endpoint", Port: 3306},
+ DBInstanceStatus: dbmeshawsrds.DBInstanceStatusAvailable,
+ Endpoint: dbmeshawsrds.Endpoint{Address: "test-instance-1-endpoint", Port: 3306},
},
}, nil
})