You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2022/12/20 04:59:16 UTC

[shardingsphere-on-cloud] branch main updated: refactor: refactor reconcile for different workloads (#150)

This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new ab2fe68  refactor: refactor reconcile for different workloads (#150)
ab2fe68 is described below

commit ab2fe68eff5ddad29aa3a65f944bc8dabe7d13e8
Author: liyao <ma...@126.com>
AuthorDate: Tue Dec 20 12:59:08 2022 +0800

    refactor: refactor reconcile for different workloads (#150)
    
    * chore: update gitignore
    
    Signed-off-by: mlycore <ma...@126.com>
    
    * refactor: update deployment reconcile
    
    Signed-off-by: mlycore <ma...@126.com>
    
    * refactor: remove diff
    
    Signed-off-by: mlycore <ma...@126.com>
    
    * refactor: add exp to UpdateService
    
    Signed-off-by: mlycore <ma...@126.com>
    
    * refactor: move proxy to near update
    
    Signed-off-by: mlycore <ma...@126.com>
    
    * fix: fix deployment resource requirement
    
    Signed-off-by: mlycore <ma...@126.com>
    
    * chore: fix unit test for UpdateDeployment
    
    Signed-off-by: mlycore <ma...@126.com>
    
    Signed-off-by: mlycore <ma...@126.com>
---
 .gitignore                                         |   2 +
 .../pkg/controllers/proxy_controller.go            |  78 ++++++++-----
 .../pkg/reconcile/deployment.go                    | 130 +++++++++++++++++----
 .../pkg/reconcile/reconcile_test.go                |  18 +--
 shardingsphere-operator/pkg/reconcile/service.go   |   5 +-
 5 files changed, 171 insertions(+), 62 deletions(-)

diff --git a/.gitignore b/.gitignore
index b2b4e7f..021b109 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,3 +28,5 @@ shardingsphere-operator/config/
 charts/apache-shardingsphere-operator-cluster-charts/charts/
 charts/apache-shardingsphere-operator-charts/charts/
 **/Chart.lock
+test
+certs
diff --git a/shardingsphere-operator/pkg/controllers/proxy_controller.go b/shardingsphere-operator/pkg/controllers/proxy_controller.go
index 85bf3c8..e582448 100644
--- a/shardingsphere-operator/pkg/controllers/proxy_controller.go
+++ b/shardingsphere-operator/pkg/controllers/proxy_controller.go
@@ -84,21 +84,21 @@ func (r *ProxyReconciler) getRuntimeShardingSphereProxy(ctx context.Context, nam
 
 func (r *ProxyReconciler) reconcile(ctx context.Context, req ctrl.Request, rt *v1alpha1.ShardingSphereProxy) (ctrl.Result, error) {
 	log := logger.FromContext(ctx)
-	if res, err := r.reconcileDeployment(ctx, req.NamespacedName, rt); err != nil {
+	if res, err := r.reconcileDeployment(ctx, req.NamespacedName); err != nil {
 		log.Error(err, "Error reconcile Deployment")
 		return res, err
 	}
 
-	if res, err := r.reconcileService(ctx, req.NamespacedName, rt); err != nil {
+	if res, err := r.reconcileService(ctx, req.NamespacedName); err != nil {
 		log.Error(err, "Error reconcile Service")
 		return res, err
 	}
-	if res, err := r.reconcilePodList(ctx, req.Namespace, req.Name, rt); err != nil {
+	if res, err := r.reconcilePodList(ctx, req.Namespace, req.Name); err != nil {
 		log.Error(err, "Error reconcile Pod list")
 		return res, err
 	}
 
-	if res, err := r.reconcileHPA(ctx, req.NamespacedName, rt); err != nil {
+	if res, err := r.reconcileHPA(ctx, req.NamespacedName); err != nil {
 		log.Error(err, "Error reconcile HPA")
 		return res, err
 	}
@@ -106,11 +106,15 @@ func (r *ProxyReconciler) reconcile(ctx context.Context, req ctrl.Request, rt *v
 	return ctrl.Result{}, nil
 }
 
-func (r *ProxyReconciler) reconcileDeployment(ctx context.Context, namespacedName types.NamespacedName, ssproxy *v1alpha1.ShardingSphereProxy) (ctrl.Result, error) {
+func (r *ProxyReconciler) reconcileDeployment(ctx context.Context, namespacedName types.NamespacedName) (ctrl.Result, error) {
+	ssproxy, err := r.getRuntimeShardingSphereProxy(ctx, namespacedName)
+	if err != nil {
+		return ctrl.Result{}, err
+	}
+
 	deploy := &appsv1.Deployment{}
 
-	var err error
-	if err = r.Get(ctx, namespacedName, deploy); err != nil {
+	if err := r.Get(ctx, namespacedName, deploy); err != nil {
 		if !apierrors.IsNotFound(err) {
 			return ctrl.Result{}, err
 		} else {
@@ -123,13 +127,11 @@ func (r *ProxyReconciler) reconcileDeployment(ctx context.Context, namespacedNam
 		}
 	} else {
 		act := deploy.DeepCopy()
-		exp := reconcile.UpdateDeployment(ssproxy, act)
-
-		//FIXME: using diff to trigger update
-		// if reflect.DeepEqual(act.Spec.Template, exp.Spec.Template) {
-		// 	return ctrl.Result{}, nil
-		// }
 
+		exp := reconcile.UpdateDeployment(ssproxy, act)
+		if err != nil {
+			return ctrl.Result{}, err
+		}
 		if err := r.Update(ctx, exp); err != nil {
 			return ctrl.Result{Requeue: true}, err
 		}
@@ -137,11 +139,15 @@ func (r *ProxyReconciler) reconcileDeployment(ctx context.Context, namespacedNam
 	return ctrl.Result{}, nil
 }
 
-func (r *ProxyReconciler) reconcileHPA(ctx context.Context, namespacedName types.NamespacedName, ssproxy *v1alpha1.ShardingSphereProxy) (ctrl.Result, error) {
+func (r *ProxyReconciler) reconcileHPA(ctx context.Context, namespacedName types.NamespacedName) (ctrl.Result, error) {
+	ssproxy, err := r.getRuntimeShardingSphereProxy(ctx, namespacedName)
+	if err != nil {
+		return ctrl.Result{}, err
+	}
+
 	hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
 
-	var err error
-	if err = r.Get(ctx, namespacedName, hpa); err != nil {
+	if err := r.Get(ctx, namespacedName, hpa); err != nil {
 		if !apierrors.IsNotFound(err) {
 			return ctrl.Result{}, err
 		} else {
@@ -171,11 +177,15 @@ func (r *ProxyReconciler) reconcileHPA(ctx context.Context, namespacedName types
 	return ctrl.Result{}, nil
 }
 
-func (r *ProxyReconciler) reconcileService(ctx context.Context, namespacedName types.NamespacedName, ssproxy *v1alpha1.ShardingSphereProxy) (ctrl.Result, error) {
+func (r *ProxyReconciler) reconcileService(ctx context.Context, namespacedName types.NamespacedName) (ctrl.Result, error) {
+	ssproxy, err := r.getRuntimeShardingSphereProxy(ctx, namespacedName)
+	if err != nil {
+		return ctrl.Result{}, err
+	}
+
 	service := &v1.Service{}
 
-	var err error
-	if err = r.Get(ctx, namespacedName, service); err != nil {
+	if err := r.Get(ctx, namespacedName, service); err != nil {
 		if !apierrors.IsNotFound(err) {
 			return ctrl.Result{}, err
 		} else {
@@ -190,8 +200,8 @@ func (r *ProxyReconciler) reconcileService(ctx context.Context, namespacedName t
 		}
 	} else {
 		act := service.DeepCopy()
-		reconcile.UpdateService(ssproxy, act)
-		if err := r.Update(ctx, act); err != nil {
+		exp := reconcile.UpdateService(ssproxy, act)
+		if err := r.Update(ctx, exp); err != nil {
 			return ctrl.Result{}, err
 		}
 	}
@@ -199,7 +209,7 @@ func (r *ProxyReconciler) reconcileService(ctx context.Context, namespacedName t
 	return ctrl.Result{}, nil
 }
 
-func (r *ProxyReconciler) reconcilePodList(ctx context.Context, namespace, name string, ssproxy *v1alpha1.ShardingSphereProxy) (ctrl.Result, error) {
+func (r *ProxyReconciler) reconcilePodList(ctx context.Context, namespace, name string) (ctrl.Result, error) {
 	podList := &v1.PodList{}
 	if err := r.List(ctx, podList, client.InNamespace(namespace), client.MatchingLabels(map[string]string{"apps": name})); err != nil {
 		return ctrl.Result{}, err
@@ -207,27 +217,35 @@ func (r *ProxyReconciler) reconcilePodList(ctx context.Context, namespace, name
 
 	result := ctrl.Result{}
 	readyNodes := reconcile.CountingReadyPods(podList)
+
+	rt, err := r.getRuntimeShardingSphereProxy(ctx, types.NamespacedName{
+		Namespace: namespace,
+		Name:      name,
+	})
+	if err != nil {
+		return ctrl.Result{}, err
+	}
 	if reconcile.IsRunning(podList) {
 		if readyNodes < miniReadyCount {
 			result.RequeueAfter = WaitingForReady
-			if readyNodes != ssproxy.Status.ReadyNodes {
-				ssproxy.SetPodStarted(readyNodes)
+			if readyNodes != rt.Status.ReadyNodes {
+				rt.SetPodStarted(readyNodes)
 			}
 		} else {
-			if ssproxy.Status.Phase != v1alpha1.StatusReady {
-				ssproxy.SetReady(readyNodes)
-			} else if readyNodes != ssproxy.Spec.Replicas {
-				ssproxy.UpdateReadyNodes(readyNodes)
+			if rt.Status.Phase != v1alpha1.StatusReady {
+				rt.SetReady(readyNodes)
+			} else if readyNodes != rt.Spec.Replicas {
+				rt.UpdateReadyNodes(readyNodes)
 			}
 		}
 	} else {
 		// TODO: Waiting for pods to start exceeds the maximum number of retries
-		ssproxy.SetPodNotStarted(readyNodes)
+		rt.SetPodNotStarted(readyNodes)
 		result.RequeueAfter = WaitingForReady
 	}
 
 	// TODO: Compare Status with or without modification
-	if err := r.Status().Update(ctx, ssproxy); err != nil {
+	if err := r.Status().Update(ctx, rt); err != nil {
 		return result, err
 	}
 
diff --git a/shardingsphere-operator/pkg/reconcile/deployment.go b/shardingsphere-operator/pkg/reconcile/deployment.go
index fb1bb0f..9734763 100644
--- a/shardingsphere-operator/pkg/reconcile/deployment.go
+++ b/shardingsphere-operator/pkg/reconcile/deployment.go
@@ -19,7 +19,6 @@ package reconcile
 
 import (
 	"fmt"
-	"html/template"
 	"reflect"
 	"strconv"
 	"strings"
@@ -130,8 +129,13 @@ func processOptionalParameter(proxy *v1alpha1.ShardingSphereProxy, dp *v1.Deploy
 	return dp
 }
 
-func addInitContainer(dp *v1.Deployment, mysql *v1alpha1.MySQLDriver) {
+const script = `wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/${VERSION}/mysql-connector-java-${VERSION}.jar;
+wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/${VERSION}/mysql-connector-java-${VERSION}.jar.md5;
+if [ $(md5sum /mysql-connector-java-${VERSION}.jar | cut -d ' ' -f1) = $(cat /mysql-connector-java-${VERSION}.jar.md5) ];
+then echo success;
+else echo failed;exit 1;fi;mv /mysql-connector-java-${VERSION}.jar /opt/shardingsphere-proxy/ext-lib`
 
+func addInitContainer(dp *v1.Deployment, mysql *v1alpha1.MySQLDriver) {
 	if len(dp.Spec.Template.Spec.InitContainers) == 0 {
 		dp.Spec.Template.Spec.Containers[0].VolumeMounts = append(dp.Spec.Template.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{
 			Name:      "mysql-connect-jar",
@@ -146,18 +150,17 @@ func addInitContainer(dp *v1.Deployment, mysql *v1alpha1.MySQLDriver) {
 		})
 	}
 
-	scriptStr := strings.Builder{}
-	t1, _ := template.New("shell").Parse(`wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/{{ .Version }}/mysql-connector-java-{{ .Version }}.jar;
-wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/{{ .Version }}/mysql-connector-java-{{ .Version }}.jar.md5;
-if [ $(md5sum /mysql-connector-java-{{ .Version }}.jar | cut -d ' ' -f1) = $(cat /mysql-connector-java-{{ .Version }}.jar.md5) ];
-then echo success;
-else echo failed;exit 1;fi;mv /mysql-connector-java-{{ .Version }}.jar /opt/shardingsphere-proxy/ext-lib`)
-	_ = t1.Execute(&scriptStr, mysql)
 	dp.Spec.Template.Spec.InitContainers = []corev1.Container{
 		{
 			Name:    "download-mysql-connect",
 			Image:   "busybox:1.35.0",
-			Command: []string{"/bin/sh", "-c", scriptStr.String()},
+			Command: []string{"/bin/sh", "-c", script},
+			Env: []corev1.EnvVar{
+				{
+					Name:  "VERSION",
+					Value: mysql.Version,
+				},
+			},
 			VolumeMounts: []corev1.VolumeMount{
 				{
 					Name:      "mysql-connect-jar",
@@ -171,25 +174,108 @@ else echo failed;exit 1;fi;mv /mysql-connector-java-{{ .Version }}.jar /opt/shar
 
 // UpdateDeployment FIXME:merge UpdateDeployment and ConstructCascadingDeployment
 func UpdateDeployment(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) *v1.Deployment {
+	exp := act.DeepCopy()
+
 	if proxy.Spec.AutomaticScaling == nil || !proxy.Spec.AutomaticScaling.Enable {
-		act.Spec.Replicas = &proxy.Spec.Replicas
+		exp.Spec.Replicas = updateReplicas(proxy, act)
 	}
+	exp.Spec.Template = updatePodTemplateSpec(proxy, act)
+	return exp
+}
 
-	act.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("%s:%s", imageName, proxy.Spec.Version)
-	act.Spec.Template.Spec.Containers[0].Env[0].Value = strconv.FormatInt(int64(proxy.Spec.Port), 10)
-	act.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort = proxy.Spec.Port
+func updateReplicas(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) *int32 {
+	if *act.Spec.Replicas != proxy.Spec.Replicas {
+		return &proxy.Spec.Replicas
+	}
+	return act.Spec.Replicas
+}
 
-	act.Spec.Template.Spec.Containers[0].Resources = proxy.Spec.Resources
-	act.Spec.Template.Spec.Containers[0].LivenessProbe = proxy.Spec.LivenessProbe
-	act.Spec.Template.Spec.Containers[0].ReadinessProbe = proxy.Spec.ReadinessProbe
-	act.Spec.Template.Spec.Containers[0].StartupProbe = proxy.Spec.StartupProbe
+func updatePodTemplateSpec(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) corev1.PodTemplateSpec {
+	exp := act.Spec.Template.DeepCopy()
 
-	act.Spec.Template.Spec.Volumes[0].ConfigMap.Name = proxy.Spec.ProxyConfigName
+	SSProxyContainer := updateSSProxyContainer(proxy, act)
+	for i, _ := range exp.Spec.Containers {
+		if exp.Spec.Containers[i].Name == "proxy" {
+			exp.Spec.Containers[i] = *SSProxyContainer
+		}
+	}
 
-	if proxy.Spec.MySQLDriver.Version != "" {
-		addInitContainer(act, proxy.Spec.MySQLDriver)
+	initContainer := updateInitContainer(proxy, act)
+	for i, _ := range exp.Spec.InitContainers {
+		if exp.Spec.InitContainers[i].Name == "download-mysql-connect" {
+			exp.Spec.InitContainers[i] = *initContainer
+		}
 	}
 
-	exp := act.DeepCopy()
+	configName := updateConfigName(proxy, act)
+	exp.Spec.Volumes[0].ConfigMap.Name = configName
+
+	return *exp
+}
+
+func updateConfigName(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) string {
+	if act.Spec.Template.Spec.Volumes[0].ConfigMap.Name != proxy.Spec.ProxyConfigName {
+		return proxy.Spec.ProxyConfigName
+	}
+	return act.Spec.Template.Spec.Volumes[0].ConfigMap.Name
+}
+
+func updateInitContainer(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) *corev1.Container {
+	var exp *corev1.Container
+
+	for _, c := range act.Spec.Template.Spec.InitContainers {
+		if c.Name == "download-mysql-connect" {
+			for i, _ := range c.Env {
+				if c.Env[i].Name == "VERSION" {
+					if c.Env[i].Value != proxy.Spec.MySQLDriver.Version {
+						c.Env[i].Value = proxy.Spec.MySQLDriver.Version
+					}
+				}
+			}
+			exp = c.DeepCopy()
+		}
+	}
+
+	return exp
+}
+
+func updateSSProxyContainer(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) *corev1.Container {
+	var exp *corev1.Container
+
+	for _, c := range act.Spec.Template.Spec.Containers {
+		if c.Name == "proxy" {
+			exp = c.DeepCopy()
+
+			tag := strings.Split(c.Image, ":")[1]
+			if tag != proxy.Spec.Version {
+				exp.Image = fmt.Sprintf("%s:%s", imageName, proxy.Spec.Version)
+			}
+
+			exp.Resources = proxy.Spec.Resources
+
+			if proxy.Spec.LivenessProbe != nil && !reflect.DeepEqual(c.LivenessProbe, *proxy.Spec.LivenessProbe) {
+				exp.LivenessProbe = proxy.Spec.LivenessProbe
+			}
+
+			if proxy.Spec.ReadinessProbe != nil && !reflect.DeepEqual(c.ReadinessProbe, *proxy.Spec.ReadinessProbe) {
+				exp.ReadinessProbe = proxy.Spec.ReadinessProbe
+			}
+
+			if proxy.Spec.StartupProbe != nil && !reflect.DeepEqual(c.StartupProbe, *proxy.Spec.StartupProbe) {
+				exp.StartupProbe = proxy.Spec.StartupProbe
+			}
+
+			for i, _ := range c.Env {
+				if c.Env[i].Name == "PORT" {
+					proxyPort := strconv.FormatInt(int64(proxy.Spec.Port), 10)
+					if c.Env[i].Value != proxyPort {
+						c.Env[i].Value = proxyPort
+						exp.Ports[0].ContainerPort = proxy.Spec.Port
+					}
+				}
+			}
+			exp.Env = c.Env
+		}
+	}
 	return exp
 }
diff --git a/shardingsphere-operator/pkg/reconcile/reconcile_test.go b/shardingsphere-operator/pkg/reconcile/reconcile_test.go
index 9a12ef4..d7580c1 100644
--- a/shardingsphere-operator/pkg/reconcile/reconcile_test.go
+++ b/shardingsphere-operator/pkg/reconcile/reconcile_test.go
@@ -860,15 +860,15 @@ func Test_UpdateDeployment(t *testing.T) {
 	}
 
 	for _, c := range cases {
-		UpdateDeployment(c.proxy, c.deploy)
-		assert.Equal(t, fmt.Sprintf("%s:%s", imageName, c.proxy.Spec.Version), c.deploy.Spec.Template.Spec.Containers[0].Image, c.message)
-		assert.Equal(t, c.proxy.Spec.Replicas, *c.deploy.Spec.Replicas, c.message)
-		assert.Equal(t, c.proxy.Spec.ProxyConfigName, c.deploy.Spec.Template.Spec.Volumes[0].ConfigMap.Name, c.message)
-		assert.Equal(t, c.proxy.Spec.Port, c.deploy.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort, c.message)
-		assert.EqualValues(t, c.proxy.Spec.Resources, c.deploy.Spec.Template.Spec.Containers[0].Resources, c.message)
-		assert.EqualValues(t, c.proxy.Spec.LivenessProbe, c.deploy.Spec.Template.Spec.Containers[0].LivenessProbe, c.message)
-		assert.EqualValues(t, c.proxy.Spec.ReadinessProbe, c.deploy.Spec.Template.Spec.Containers[0].ReadinessProbe, c.message)
-		assert.EqualValues(t, c.proxy.Spec.StartupProbe, c.deploy.Spec.Template.Spec.Containers[0].StartupProbe, c.message)
+		exp := UpdateDeployment(c.proxy, c.deploy)
+		assert.Equal(t, fmt.Sprintf("%s:%s", imageName, c.proxy.Spec.Version), exp.Spec.Template.Spec.Containers[0].Image, c.message)
+		assert.Equal(t, c.proxy.Spec.Replicas, *exp.Spec.Replicas, c.message)
+		assert.Equal(t, c.proxy.Spec.ProxyConfigName, exp.Spec.Template.Spec.Volumes[0].ConfigMap.Name, c.message)
+		assert.Equal(t, c.proxy.Spec.Port, exp.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort, c.message)
+		assert.EqualValues(t, c.proxy.Spec.Resources, exp.Spec.Template.Spec.Containers[0].Resources, c.message)
+		assert.EqualValues(t, c.proxy.Spec.LivenessProbe, exp.Spec.Template.Spec.Containers[0].LivenessProbe, c.message)
+		assert.EqualValues(t, c.proxy.Spec.ReadinessProbe, exp.Spec.Template.Spec.Containers[0].ReadinessProbe, c.message)
+		assert.EqualValues(t, c.proxy.Spec.StartupProbe, exp.Spec.Template.Spec.Containers[0].StartupProbe, c.message)
 	}
 }
 
diff --git a/shardingsphere-operator/pkg/reconcile/service.go b/shardingsphere-operator/pkg/reconcile/service.go
index 3fdf693..65cefc2 100644
--- a/shardingsphere-operator/pkg/reconcile/service.go
+++ b/shardingsphere-operator/pkg/reconcile/service.go
@@ -62,11 +62,14 @@ func ConstructCascadingService(proxy *v1alpha1.ShardingSphereProxy) *v1.Service
 	return &svc
 }
 
-func UpdateService(proxy *v1alpha1.ShardingSphereProxy, runtimeService *v1.Service) {
+func UpdateService(proxy *v1alpha1.ShardingSphereProxy, runtimeService *v1.Service) *v1.Service {
+	exp := &v1.Service{}
 	runtimeService.Spec.Type = proxy.Spec.ServiceType.Type
 	runtimeService.Spec.Ports[0].Port = proxy.Spec.Port
 	runtimeService.Spec.Ports[0].TargetPort = fromInt32(proxy.Spec.Port)
 	if proxy.Spec.ServiceType.NodePort != 0 {
 		runtimeService.Spec.Ports[0].NodePort = proxy.Spec.ServiceType.NodePort
 	}
+	exp = runtimeService.DeepCopy()
+	return exp
 }