You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2022/12/20 04:59:16 UTC
[shardingsphere-on-cloud] branch main updated: refactor: refactor reconcile for different workloads (#150)
This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new ab2fe68 refactor: refactor reconcile for different workloads (#150)
ab2fe68 is described below
commit ab2fe68eff5ddad29aa3a65f944bc8dabe7d13e8
Author: liyao <ma...@126.com>
AuthorDate: Tue Dec 20 12:59:08 2022 +0800
refactor: refactor reconcile for different workloads (#150)
* chore: update gitignore
Signed-off-by: mlycore <ma...@126.com>
* refactor: update deployment reconcile
Signed-off-by: mlycore <ma...@126.com>
* refactor: remove diff
Signed-off-by: mlycore <ma...@126.com>
* refactor: add exp to UpdateService
Signed-off-by: mlycore <ma...@126.com>
* refactor: move proxy to near update
Signed-off-by: mlycore <ma...@126.com>
* fix: fix deployment resource requirement
Signed-off-by: mlycore <ma...@126.com>
* chore: fix unit test for UpdateDeployment
Signed-off-by: mlycore <ma...@126.com>
Signed-off-by: mlycore <ma...@126.com>
---
.gitignore | 2 +
.../pkg/controllers/proxy_controller.go | 78 ++++++++-----
.../pkg/reconcile/deployment.go | 130 +++++++++++++++++----
.../pkg/reconcile/reconcile_test.go | 18 +--
shardingsphere-operator/pkg/reconcile/service.go | 5 +-
5 files changed, 171 insertions(+), 62 deletions(-)
diff --git a/.gitignore b/.gitignore
index b2b4e7f..021b109 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,3 +28,5 @@ shardingsphere-operator/config/
charts/apache-shardingsphere-operator-cluster-charts/charts/
charts/apache-shardingsphere-operator-charts/charts/
**/Chart.lock
+test
+certs
diff --git a/shardingsphere-operator/pkg/controllers/proxy_controller.go b/shardingsphere-operator/pkg/controllers/proxy_controller.go
index 85bf3c8..e582448 100644
--- a/shardingsphere-operator/pkg/controllers/proxy_controller.go
+++ b/shardingsphere-operator/pkg/controllers/proxy_controller.go
@@ -84,21 +84,21 @@ func (r *ProxyReconciler) getRuntimeShardingSphereProxy(ctx context.Context, nam
func (r *ProxyReconciler) reconcile(ctx context.Context, req ctrl.Request, rt *v1alpha1.ShardingSphereProxy) (ctrl.Result, error) {
log := logger.FromContext(ctx)
- if res, err := r.reconcileDeployment(ctx, req.NamespacedName, rt); err != nil {
+ if res, err := r.reconcileDeployment(ctx, req.NamespacedName); err != nil {
log.Error(err, "Error reconcile Deployment")
return res, err
}
- if res, err := r.reconcileService(ctx, req.NamespacedName, rt); err != nil {
+ if res, err := r.reconcileService(ctx, req.NamespacedName); err != nil {
log.Error(err, "Error reconcile Service")
return res, err
}
- if res, err := r.reconcilePodList(ctx, req.Namespace, req.Name, rt); err != nil {
+ if res, err := r.reconcilePodList(ctx, req.Namespace, req.Name); err != nil {
log.Error(err, "Error reconcile Pod list")
return res, err
}
- if res, err := r.reconcileHPA(ctx, req.NamespacedName, rt); err != nil {
+ if res, err := r.reconcileHPA(ctx, req.NamespacedName); err != nil {
log.Error(err, "Error reconcile HPA")
return res, err
}
@@ -106,11 +106,15 @@ func (r *ProxyReconciler) reconcile(ctx context.Context, req ctrl.Request, rt *v
return ctrl.Result{}, nil
}
-func (r *ProxyReconciler) reconcileDeployment(ctx context.Context, namespacedName types.NamespacedName, ssproxy *v1alpha1.ShardingSphereProxy) (ctrl.Result, error) {
+func (r *ProxyReconciler) reconcileDeployment(ctx context.Context, namespacedName types.NamespacedName) (ctrl.Result, error) {
+ ssproxy, err := r.getRuntimeShardingSphereProxy(ctx, namespacedName)
+ if err != nil {
+ return ctrl.Result{}, err
+ }
+
deploy := &appsv1.Deployment{}
- var err error
- if err = r.Get(ctx, namespacedName, deploy); err != nil {
+ if err := r.Get(ctx, namespacedName, deploy); err != nil {
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
} else {
@@ -123,13 +127,11 @@ func (r *ProxyReconciler) reconcileDeployment(ctx context.Context, namespacedNam
}
} else {
act := deploy.DeepCopy()
- exp := reconcile.UpdateDeployment(ssproxy, act)
-
- //FIXME: using diff to trigger update
- // if reflect.DeepEqual(act.Spec.Template, exp.Spec.Template) {
- // return ctrl.Result{}, nil
- // }
+ exp := reconcile.UpdateDeployment(ssproxy, act)
+ if err != nil {
+ return ctrl.Result{}, err
+ }
if err := r.Update(ctx, exp); err != nil {
return ctrl.Result{Requeue: true}, err
}
@@ -137,11 +139,15 @@ func (r *ProxyReconciler) reconcileDeployment(ctx context.Context, namespacedNam
return ctrl.Result{}, nil
}
-func (r *ProxyReconciler) reconcileHPA(ctx context.Context, namespacedName types.NamespacedName, ssproxy *v1alpha1.ShardingSphereProxy) (ctrl.Result, error) {
+func (r *ProxyReconciler) reconcileHPA(ctx context.Context, namespacedName types.NamespacedName) (ctrl.Result, error) {
+ ssproxy, err := r.getRuntimeShardingSphereProxy(ctx, namespacedName)
+ if err != nil {
+ return ctrl.Result{}, err
+ }
+
hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
- var err error
- if err = r.Get(ctx, namespacedName, hpa); err != nil {
+ if err := r.Get(ctx, namespacedName, hpa); err != nil {
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
} else {
@@ -171,11 +177,15 @@ func (r *ProxyReconciler) reconcileHPA(ctx context.Context, namespacedName types
return ctrl.Result{}, nil
}
-func (r *ProxyReconciler) reconcileService(ctx context.Context, namespacedName types.NamespacedName, ssproxy *v1alpha1.ShardingSphereProxy) (ctrl.Result, error) {
+func (r *ProxyReconciler) reconcileService(ctx context.Context, namespacedName types.NamespacedName) (ctrl.Result, error) {
+ ssproxy, err := r.getRuntimeShardingSphereProxy(ctx, namespacedName)
+ if err != nil {
+ return ctrl.Result{}, err
+ }
+
service := &v1.Service{}
- var err error
- if err = r.Get(ctx, namespacedName, service); err != nil {
+ if err := r.Get(ctx, namespacedName, service); err != nil {
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
} else {
@@ -190,8 +200,8 @@ func (r *ProxyReconciler) reconcileService(ctx context.Context, namespacedName t
}
} else {
act := service.DeepCopy()
- reconcile.UpdateService(ssproxy, act)
- if err := r.Update(ctx, act); err != nil {
+ exp := reconcile.UpdateService(ssproxy, act)
+ if err := r.Update(ctx, exp); err != nil {
return ctrl.Result{}, err
}
}
@@ -199,7 +209,7 @@ func (r *ProxyReconciler) reconcileService(ctx context.Context, namespacedName t
return ctrl.Result{}, nil
}
-func (r *ProxyReconciler) reconcilePodList(ctx context.Context, namespace, name string, ssproxy *v1alpha1.ShardingSphereProxy) (ctrl.Result, error) {
+func (r *ProxyReconciler) reconcilePodList(ctx context.Context, namespace, name string) (ctrl.Result, error) {
podList := &v1.PodList{}
if err := r.List(ctx, podList, client.InNamespace(namespace), client.MatchingLabels(map[string]string{"apps": name})); err != nil {
return ctrl.Result{}, err
@@ -207,27 +217,35 @@ func (r *ProxyReconciler) reconcilePodList(ctx context.Context, namespace, name
result := ctrl.Result{}
readyNodes := reconcile.CountingReadyPods(podList)
+
+ rt, err := r.getRuntimeShardingSphereProxy(ctx, types.NamespacedName{
+ Namespace: namespace,
+ Name: name,
+ })
+ if err != nil {
+ return ctrl.Result{}, err
+ }
if reconcile.IsRunning(podList) {
if readyNodes < miniReadyCount {
result.RequeueAfter = WaitingForReady
- if readyNodes != ssproxy.Status.ReadyNodes {
- ssproxy.SetPodStarted(readyNodes)
+ if readyNodes != rt.Status.ReadyNodes {
+ rt.SetPodStarted(readyNodes)
}
} else {
- if ssproxy.Status.Phase != v1alpha1.StatusReady {
- ssproxy.SetReady(readyNodes)
- } else if readyNodes != ssproxy.Spec.Replicas {
- ssproxy.UpdateReadyNodes(readyNodes)
+ if rt.Status.Phase != v1alpha1.StatusReady {
+ rt.SetReady(readyNodes)
+ } else if readyNodes != rt.Spec.Replicas {
+ rt.UpdateReadyNodes(readyNodes)
}
}
} else {
// TODO: Waiting for pods to start exceeds the maximum number of retries
- ssproxy.SetPodNotStarted(readyNodes)
+ rt.SetPodNotStarted(readyNodes)
result.RequeueAfter = WaitingForReady
}
// TODO: Compare Status with or without modification
- if err := r.Status().Update(ctx, ssproxy); err != nil {
+ if err := r.Status().Update(ctx, rt); err != nil {
return result, err
}
diff --git a/shardingsphere-operator/pkg/reconcile/deployment.go b/shardingsphere-operator/pkg/reconcile/deployment.go
index fb1bb0f..9734763 100644
--- a/shardingsphere-operator/pkg/reconcile/deployment.go
+++ b/shardingsphere-operator/pkg/reconcile/deployment.go
@@ -19,7 +19,6 @@ package reconcile
import (
"fmt"
- "html/template"
"reflect"
"strconv"
"strings"
@@ -130,8 +129,13 @@ func processOptionalParameter(proxy *v1alpha1.ShardingSphereProxy, dp *v1.Deploy
return dp
}
-func addInitContainer(dp *v1.Deployment, mysql *v1alpha1.MySQLDriver) {
+const script = `wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/${VERSION}/mysql-connector-java-${VERSION}.jar;
+wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/${VERSION}/mysql-connector-java-${VERSION}.jar.md5;
+if [ $(md5sum /mysql-connector-java-${VERSION}.jar | cut -d ' ' -f1) = $(cat /mysql-connector-java-${VERSION}.jar.md5) ];
+then echo success;
+else echo failed;exit 1;fi;mv /mysql-connector-java-${VERSION}.jar /opt/shardingsphere-proxy/ext-lib`
+func addInitContainer(dp *v1.Deployment, mysql *v1alpha1.MySQLDriver) {
if len(dp.Spec.Template.Spec.InitContainers) == 0 {
dp.Spec.Template.Spec.Containers[0].VolumeMounts = append(dp.Spec.Template.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{
Name: "mysql-connect-jar",
@@ -146,18 +150,17 @@ func addInitContainer(dp *v1.Deployment, mysql *v1alpha1.MySQLDriver) {
})
}
- scriptStr := strings.Builder{}
- t1, _ := template.New("shell").Parse(`wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/{{ .Version }}/mysql-connector-java-{{ .Version }}.jar;
-wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/{{ .Version }}/mysql-connector-java-{{ .Version }}.jar.md5;
-if [ $(md5sum /mysql-connector-java-{{ .Version }}.jar | cut -d ' ' -f1) = $(cat /mysql-connector-java-{{ .Version }}.jar.md5) ];
-then echo success;
-else echo failed;exit 1;fi;mv /mysql-connector-java-{{ .Version }}.jar /opt/shardingsphere-proxy/ext-lib`)
- _ = t1.Execute(&scriptStr, mysql)
dp.Spec.Template.Spec.InitContainers = []corev1.Container{
{
Name: "download-mysql-connect",
Image: "busybox:1.35.0",
- Command: []string{"/bin/sh", "-c", scriptStr.String()},
+ Command: []string{"/bin/sh", "-c", script},
+ Env: []corev1.EnvVar{
+ {
+ Name: "VERSION",
+ Value: mysql.Version,
+ },
+ },
VolumeMounts: []corev1.VolumeMount{
{
Name: "mysql-connect-jar",
@@ -171,25 +174,108 @@ else echo failed;exit 1;fi;mv /mysql-connector-java-{{ .Version }}.jar /opt/shar
// UpdateDeployment FIXME:merge UpdateDeployment and ConstructCascadingDeployment
func UpdateDeployment(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) *v1.Deployment {
+ exp := act.DeepCopy()
+
if proxy.Spec.AutomaticScaling == nil || !proxy.Spec.AutomaticScaling.Enable {
- act.Spec.Replicas = &proxy.Spec.Replicas
+ exp.Spec.Replicas = updateReplicas(proxy, act)
}
+ exp.Spec.Template = updatePodTemplateSpec(proxy, act)
+ return exp
+}
- act.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("%s:%s", imageName, proxy.Spec.Version)
- act.Spec.Template.Spec.Containers[0].Env[0].Value = strconv.FormatInt(int64(proxy.Spec.Port), 10)
- act.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort = proxy.Spec.Port
+func updateReplicas(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) *int32 {
+ if *act.Spec.Replicas != proxy.Spec.Replicas {
+ return &proxy.Spec.Replicas
+ }
+ return act.Spec.Replicas
+}
- act.Spec.Template.Spec.Containers[0].Resources = proxy.Spec.Resources
- act.Spec.Template.Spec.Containers[0].LivenessProbe = proxy.Spec.LivenessProbe
- act.Spec.Template.Spec.Containers[0].ReadinessProbe = proxy.Spec.ReadinessProbe
- act.Spec.Template.Spec.Containers[0].StartupProbe = proxy.Spec.StartupProbe
+func updatePodTemplateSpec(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) corev1.PodTemplateSpec {
+ exp := act.Spec.Template.DeepCopy()
- act.Spec.Template.Spec.Volumes[0].ConfigMap.Name = proxy.Spec.ProxyConfigName
+ SSProxyContainer := updateSSProxyContainer(proxy, act)
+ for i, _ := range exp.Spec.Containers {
+ if exp.Spec.Containers[i].Name == "proxy" {
+ exp.Spec.Containers[i] = *SSProxyContainer
+ }
+ }
- if proxy.Spec.MySQLDriver.Version != "" {
- addInitContainer(act, proxy.Spec.MySQLDriver)
+ initContainer := updateInitContainer(proxy, act)
+ for i, _ := range exp.Spec.InitContainers {
+ if exp.Spec.InitContainers[i].Name == "download-mysql-connect" {
+ exp.Spec.InitContainers[i] = *initContainer
+ }
}
- exp := act.DeepCopy()
+ configName := updateConfigName(proxy, act)
+ exp.Spec.Volumes[0].ConfigMap.Name = configName
+
+ return *exp
+}
+
+func updateConfigName(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) string {
+ if act.Spec.Template.Spec.Volumes[0].ConfigMap.Name != proxy.Spec.ProxyConfigName {
+ return proxy.Spec.ProxyConfigName
+ }
+ return act.Spec.Template.Spec.Volumes[0].ConfigMap.Name
+}
+
+func updateInitContainer(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) *corev1.Container {
+ var exp *corev1.Container
+
+ for _, c := range act.Spec.Template.Spec.InitContainers {
+ if c.Name == "download-mysql-connect" {
+ for i, _ := range c.Env {
+ if c.Env[i].Name == "VERSION" {
+ if c.Env[i].Value != proxy.Spec.MySQLDriver.Version {
+ c.Env[i].Value = proxy.Spec.MySQLDriver.Version
+ }
+ }
+ }
+ exp = c.DeepCopy()
+ }
+ }
+
+ return exp
+}
+
+func updateSSProxyContainer(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) *corev1.Container {
+ var exp *corev1.Container
+
+ for _, c := range act.Spec.Template.Spec.Containers {
+ if c.Name == "proxy" {
+ exp = c.DeepCopy()
+
+ tag := strings.Split(c.Image, ":")[1]
+ if tag != proxy.Spec.Version {
+ exp.Image = fmt.Sprintf("%s:%s", imageName, proxy.Spec.Version)
+ }
+
+ exp.Resources = proxy.Spec.Resources
+
+ if proxy.Spec.LivenessProbe != nil && !reflect.DeepEqual(c.LivenessProbe, *proxy.Spec.LivenessProbe) {
+ exp.LivenessProbe = proxy.Spec.LivenessProbe
+ }
+
+ if proxy.Spec.ReadinessProbe != nil && !reflect.DeepEqual(c.ReadinessProbe, *proxy.Spec.ReadinessProbe) {
+ exp.ReadinessProbe = proxy.Spec.ReadinessProbe
+ }
+
+ if proxy.Spec.StartupProbe != nil && !reflect.DeepEqual(c.StartupProbe, *proxy.Spec.StartupProbe) {
+ exp.StartupProbe = proxy.Spec.StartupProbe
+ }
+
+ for i, _ := range c.Env {
+ if c.Env[i].Name == "PORT" {
+ proxyPort := strconv.FormatInt(int64(proxy.Spec.Port), 10)
+ if c.Env[i].Value != proxyPort {
+ c.Env[i].Value = proxyPort
+ exp.Ports[0].ContainerPort = proxy.Spec.Port
+ }
+ }
+ }
+ exp.Env = c.Env
+ }
+ }
return exp
}
diff --git a/shardingsphere-operator/pkg/reconcile/reconcile_test.go b/shardingsphere-operator/pkg/reconcile/reconcile_test.go
index 9a12ef4..d7580c1 100644
--- a/shardingsphere-operator/pkg/reconcile/reconcile_test.go
+++ b/shardingsphere-operator/pkg/reconcile/reconcile_test.go
@@ -860,15 +860,15 @@ func Test_UpdateDeployment(t *testing.T) {
}
for _, c := range cases {
- UpdateDeployment(c.proxy, c.deploy)
- assert.Equal(t, fmt.Sprintf("%s:%s", imageName, c.proxy.Spec.Version), c.deploy.Spec.Template.Spec.Containers[0].Image, c.message)
- assert.Equal(t, c.proxy.Spec.Replicas, *c.deploy.Spec.Replicas, c.message)
- assert.Equal(t, c.proxy.Spec.ProxyConfigName, c.deploy.Spec.Template.Spec.Volumes[0].ConfigMap.Name, c.message)
- assert.Equal(t, c.proxy.Spec.Port, c.deploy.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort, c.message)
- assert.EqualValues(t, c.proxy.Spec.Resources, c.deploy.Spec.Template.Spec.Containers[0].Resources, c.message)
- assert.EqualValues(t, c.proxy.Spec.LivenessProbe, c.deploy.Spec.Template.Spec.Containers[0].LivenessProbe, c.message)
- assert.EqualValues(t, c.proxy.Spec.ReadinessProbe, c.deploy.Spec.Template.Spec.Containers[0].ReadinessProbe, c.message)
- assert.EqualValues(t, c.proxy.Spec.StartupProbe, c.deploy.Spec.Template.Spec.Containers[0].StartupProbe, c.message)
+ exp := UpdateDeployment(c.proxy, c.deploy)
+ assert.Equal(t, fmt.Sprintf("%s:%s", imageName, c.proxy.Spec.Version), exp.Spec.Template.Spec.Containers[0].Image, c.message)
+ assert.Equal(t, c.proxy.Spec.Replicas, *exp.Spec.Replicas, c.message)
+ assert.Equal(t, c.proxy.Spec.ProxyConfigName, exp.Spec.Template.Spec.Volumes[0].ConfigMap.Name, c.message)
+ assert.Equal(t, c.proxy.Spec.Port, exp.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort, c.message)
+ assert.EqualValues(t, c.proxy.Spec.Resources, exp.Spec.Template.Spec.Containers[0].Resources, c.message)
+ assert.EqualValues(t, c.proxy.Spec.LivenessProbe, exp.Spec.Template.Spec.Containers[0].LivenessProbe, c.message)
+ assert.EqualValues(t, c.proxy.Spec.ReadinessProbe, exp.Spec.Template.Spec.Containers[0].ReadinessProbe, c.message)
+ assert.EqualValues(t, c.proxy.Spec.StartupProbe, exp.Spec.Template.Spec.Containers[0].StartupProbe, c.message)
}
}
diff --git a/shardingsphere-operator/pkg/reconcile/service.go b/shardingsphere-operator/pkg/reconcile/service.go
index 3fdf693..65cefc2 100644
--- a/shardingsphere-operator/pkg/reconcile/service.go
+++ b/shardingsphere-operator/pkg/reconcile/service.go
@@ -62,11 +62,14 @@ func ConstructCascadingService(proxy *v1alpha1.ShardingSphereProxy) *v1.Service
return &svc
}
-func UpdateService(proxy *v1alpha1.ShardingSphereProxy, runtimeService *v1.Service) {
+func UpdateService(proxy *v1alpha1.ShardingSphereProxy, runtimeService *v1.Service) *v1.Service {
+ exp := &v1.Service{}
runtimeService.Spec.Type = proxy.Spec.ServiceType.Type
runtimeService.Spec.Ports[0].Port = proxy.Spec.Port
runtimeService.Spec.Ports[0].TargetPort = fromInt32(proxy.Spec.Port)
if proxy.Spec.ServiceType.NodePort != 0 {
runtimeService.Spec.Ports[0].NodePort = proxy.Spec.ServiceType.NodePort
}
+ exp = runtimeService.DeepCopy()
+ return exp
}