You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ke...@apache.org on 2022/07/05 05:05:05 UTC

[dolphinscheduler-operator] 27/30: fix(all): merge the version of hpa and update the type of deleteResource

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-operator.git

commit 2bcaee13795c166b3403ad8ba6b30dd2fc226725
Author: nobolity <no...@gmail.com>
AuthorDate: Tue Jun 14 14:27:56 2022 +0800

    fix(all): merge the version of hpa and update the type of deleteResource
---
 config/manager/kustomization.yaml        |  4 +--
 config/rbac/role.yaml                    | 22 ++++++++++++
 config/samples/ds_v1alpha1_dsapi.yaml    |  1 +
 config/samples/ds_v1alpha1_dsmaster.yaml |  8 ++---
 controllers/dsalert_controller.go        |  3 +-
 controllers/dsapi_controller.go          |  3 +-
 controllers/dsmaster_controller.go       | 59 ++++++++++++++++----------------
 controllers/dsworker_controller.go       | 25 +++++++++++---
 controllers/master_reconcile.go          | 34 +++++++++---------
 main.go                                  |  8 +++--
 10 files changed, 106 insertions(+), 61 deletions(-)

diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml
index dd1052d..c2c4746 100644
--- a/config/manager/kustomization.yaml
+++ b/config/manager/kustomization.yaml
@@ -12,5 +12,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1
 kind: Kustomization
 images:
 - name: controller
-  newName: kezhenxu94/controller
-  newTag: latest
+  newName: nobolity/ds-operator
+  newTag: v1alpha1
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index b8bf865..35f12b4 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -5,6 +5,16 @@ metadata:
   creationTimestamp: null
   name: manager-role
 rules:
+- apiGroups:
+  - ""
+  resources:
+  - persistentvolumeclaims
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - watch
 - apiGroups:
   - ""
   resources:
@@ -41,6 +51,18 @@ rules:
   - patch
   - update
   - watch
+- apiGroups:
+  - autoscaling
+  resources:
+  - horizontalpodautoscalers
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - patch
+  - update
+  - watch
 - apiGroups:
   - ds.apache.dolphinscheduler.dev
   resources:
diff --git a/config/samples/ds_v1alpha1_dsapi.yaml b/config/samples/ds_v1alpha1_dsapi.yaml
index 809c853..fef6f69 100644
--- a/config/samples/ds_v1alpha1_dsapi.yaml
+++ b/config/samples/ds_v1alpha1_dsapi.yaml
@@ -10,6 +10,7 @@ spec:
   version: 3.0.0-alpha
   zookeeper_connect: "172.17.0.5:2181"
   repository: apache/dolphinscheduler-api
+  node_port: 30002
   datasource:
     drive_name: "org.postgresql.Driver"
     url: "jdbc:postgresql://172.17.0.4:5432/dolphinscheduler"
diff --git a/config/samples/ds_v1alpha1_dsmaster.yaml b/config/samples/ds_v1alpha1_dsmaster.yaml
index 8d0bbae..de71b27 100644
--- a/config/samples/ds_v1alpha1_dsmaster.yaml
+++ b/config/samples/ds_v1alpha1_dsmaster.yaml
@@ -15,9 +15,9 @@ spec:
     url: "jdbc:postgresql://172.17.0.4:5432/dolphinscheduler"
     username: "postgresadmin"
     password: "admin12345"
-  hpa:
-    min_replicas: 1
-    max_replicas: 5
-    mem_average_utilization: 85
+#  hpa:
+#    min_replicas: 1
+#    max_replicas: 5
+#    mem_average_utilization: 85
 
 
diff --git a/controllers/dsalert_controller.go b/controllers/dsalert_controller.go
index f3179a0..4a57bdc 100644
--- a/controllers/dsalert_controller.go
+++ b/controllers/dsalert_controller.go
@@ -150,6 +150,7 @@ func (r *DSAlertReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 
 // SetupWithManager sets up the controller with the Manager.
 func (r *DSAlertReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	r.Recorder = mgr.GetEventRecorderFor("alert-controller")
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&dsv1alpha1.DSAlert{}).
 		Owns(&v1.Deployment{}).
@@ -158,7 +159,7 @@ func (r *DSAlertReconciler) SetupWithManager(mgr ctrl.Manager) error {
 }
 
 func (r *DSAlertReconciler) ensureDSAlertDeleted(ctx context.Context, DSAlert *dsv1alpha1.DSAlert) error {
-	if err := r.Client.Delete(ctx, DSAlert, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil {
+	if err := r.Client.Delete(ctx, DSAlert, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil {
 		return err
 	}
 	return nil
diff --git a/controllers/dsapi_controller.go b/controllers/dsapi_controller.go
index 3612326..939ebd1 100644
--- a/controllers/dsapi_controller.go
+++ b/controllers/dsapi_controller.go
@@ -148,6 +148,7 @@ func (r *DSApiReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
 
 // SetupWithManager sets up the controller with the Manager.
 func (r *DSApiReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	r.Recorder = mgr.GetEventRecorderFor("api-controller")
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&dsv1alpha1.DSApi{}).
 		Owns(&v1.Deployment{}).
@@ -156,7 +157,7 @@ func (r *DSApiReconciler) SetupWithManager(mgr ctrl.Manager) error {
 }
 
 func (r *DSApiReconciler) ensureDSApiDeleted(ctx context.Context, DSApi *dsv1alpha1.DSApi) error {
-	if err := r.Client.Delete(ctx, DSApi, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil {
+	if err := r.Client.Delete(ctx, DSApi, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil {
 		return err
 	}
 	return nil
diff --git a/controllers/dsmaster_controller.go b/controllers/dsmaster_controller.go
index 01ca5b7..c5013fa 100644
--- a/controllers/dsmaster_controller.go
+++ b/controllers/dsmaster_controller.go
@@ -18,12 +18,11 @@ package controllers
 
 import (
 	"context"
-	"sync"
+	"k8s.io/api/autoscaling/v2beta2"
 	"time"
 
-	v2 "k8s.io/api/autoscaling/v2"
+	dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
 	corev1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/errors"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -36,10 +35,6 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 	"sigs.k8s.io/controller-runtime/pkg/event"
-	"sigs.k8s.io/controller-runtime/pkg/handler"
-	"sigs.k8s.io/controller-runtime/pkg/source"
-
-	dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
 )
 
 var (
@@ -51,18 +46,14 @@ type DSMasterReconciler struct {
 	client.Client
 	Scheme   *runtime.Scheme
 	recorder record.EventRecorder
-	clusters sync.Map
-	resyncCh chan event.GenericEvent
 }
 
 //+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsmasters,verbs=get;list;watch;create;update;patch;delete
 //+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsmasters/status,verbs=get;update;patch
 //+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsmasters/finalizers,verbs=update
-// +kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;create;update;patch;delete
-// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
-// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete
-//+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;create;delete;list;watch
+//+kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;create;update;patch;delete
 //+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
+//+kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;create;delete;list;watch
 //+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete
 //+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
 
@@ -81,11 +72,7 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 	cluster := &dsv1alpha1.DSMaster{}
 
 	if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
-		if errors.IsNotFound(err) {
-			r.recorder.Event(cluster, corev1.EventTypeWarning, "dsMaster is not Found", "dsMaster is not Found")
-			return ctrl.Result{}, nil
-		}
-		return ctrl.Result{}, err
+		return ctrl.Result{}, client.IgnoreNotFound(err)
 	}
 	desired := cluster.DeepCopy()
 
@@ -125,9 +112,14 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 		masterLogger.Info("ds-master control has been paused: ", "ds-master-name", cluster.Name)
 		desired.Status.ControlPaused = true
 		if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
-			return ctrl.Result{}, err
+			if apierrors.IsConflict(err) {
+				return ctrl.Result{Requeue: true}, nil
+			} else {
+				masterLogger.Error(err, "unexpected error when master update status in paused")
+				return ctrl.Result{}, err
+			}
 		}
-		r.recorder.Event(cluster, corev1.EventTypeNormal, "the spec status is paused", "do nothing")
+		r.recorder.Event(cluster, corev1.EventTypeNormal, "the master spec status is paused", "do nothing")
 		return ctrl.Result{}, nil
 	}
 
@@ -135,8 +127,15 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 	if cluster.Status.Phase == dsv1alpha1.DsPhaseNone {
 		desired.Status.Phase = dsv1alpha1.DsPhaseCreating
 		masterLogger.Info("phase had been changed from  none ---> creating")
-		err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster))
-		return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err
+		if err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
+
+			if apierrors.IsConflict(err) {
+				return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err
+			} else {
+				masterLogger.Error(err, "unexpected error when master update status in creating")
+				return ctrl.Result{}, err
+			}
+		}
 	}
 
 	//2 ensure the headless service
@@ -173,7 +172,12 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 
 	desired.Status.Phase = dsv1alpha1.DsPhaseFinished
 	if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
-		return ctrl.Result{}, err
+		if apierrors.IsConflict(err) {
+			return ctrl.Result{Requeue: true}, nil
+		} else {
+			masterLogger.Error(err, "unexpected error when master update status in finished")
+			return ctrl.Result{}, err
+		}
 	}
 
 	masterLogger.Info("******************************************************")
@@ -186,8 +190,6 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 
 // SetupWithManager sets up the controller with the Manager.
 func (r *DSMasterReconciler) SetupWithManager(mgr ctrl.Manager) error {
-	r.clusters = sync.Map{}
-	r.resyncCh = make(chan event.GenericEvent)
 	r.recorder = mgr.GetEventRecorderFor("master-controller")
 
 	filter := &Predicate{}
@@ -195,8 +197,7 @@ func (r *DSMasterReconciler) SetupWithManager(mgr ctrl.Manager) error {
 		For(&dsv1alpha1.DSMaster{}).
 		Owns(&corev1.Pod{}).
 		Owns(&corev1.Service{}).
-		Owns(&v2.HorizontalPodAutoscaler{}).
-		Watches(&source.Channel{Source: r.resyncCh}, &handler.EnqueueRequestForObject{}).
+		Owns(&v2beta2.HorizontalPodAutoscaler{}).
 		// or use WithEventFilter()
 		WithEventFilter(filter).
 		Complete(r)
@@ -407,7 +408,7 @@ func (r *DSMasterReconciler) predicateUpdate(member *Member, cluster *dsv1alpha1
 }
 
 func (r *DSMasterReconciler) ensureHPA(ctx context.Context, cluster *dsv1alpha1.DSMaster) error {
-	hpa := &v2.HorizontalPodAutoscaler{}
+	hpa := &v2beta2.HorizontalPodAutoscaler{}
 	namespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsWorkerHpa}
 	if err := r.Client.Get(ctx, namespacedName, hpa); err != nil {
 		// Local cache not found
@@ -425,7 +426,7 @@ func (r *DSMasterReconciler) ensureHPA(ctx context.Context, cluster *dsv1alpha1.
 		}
 	}
 
-	if &hpa != nil && cluster.Spec.HpaPolicy == nil {
+	if hpa.Kind != "" && cluster.Spec.HpaPolicy == nil {
 		if err := r.deleteHPA(ctx, hpa); err != nil {
 			masterLogger.Info("delete hpa error")
 			return err
diff --git a/controllers/dsworker_controller.go b/controllers/dsworker_controller.go
index dc3ddeb..3d6dafb 100644
--- a/controllers/dsworker_controller.go
+++ b/controllers/dsworker_controller.go
@@ -49,6 +49,7 @@ var (
 //+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsworkers/status,verbs=get;update;patch
 //+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsworkers/finalizers,verbs=update
 //+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
+//+kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;create;delete;list;watch
 //+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete
 //+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
 
@@ -109,7 +110,12 @@ func (r *DSWorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 		workerLogger.Info("ds-worker control has been paused: ", "ds-worker-name", cluster.Name)
 		desired.Status.ControlPaused = true
 		if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
-			return ctrl.Result{}, err
+			if apierrors.IsConflict(err) {
+				return ctrl.Result{Requeue: true}, nil
+			} else {
+				masterLogger.Error(err, "unexpected error when worker update status in paused")
+				return ctrl.Result{}, err
+			}
 		}
 		r.recorder.Event(cluster, corev1.EventTypeNormal, "the spec status is paused", "do nothing")
 		return ctrl.Result{}, nil
@@ -127,8 +133,14 @@ func (r *DSWorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 		}
 		desired.Status.Phase = dsv1alpha1.DsPhaseCreating
 		workerLogger.Info("phase had been changed from  none ---> creating")
-		err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster))
-		return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err
+		if err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
+			if apierrors.IsConflict(err) {
+				return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err
+			} else {
+				masterLogger.Error(err, "unexpected error when worker update status in creating")
+				return ctrl.Result{}, err
+			}
+		}
 	}
 
 	// 3. Ensure bootstrapped, we will block here util cluster is up and healthy
@@ -151,7 +163,12 @@ func (r *DSWorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 
 	desired.Status.Phase = dsv1alpha1.DsPhaseFinished
 	if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
-		return ctrl.Result{}, err
+		if apierrors.IsConflict(err) {
+			return ctrl.Result{Requeue: true}, nil
+		} else {
+			masterLogger.Error(err, "unexpected error when worker update status in finished")
+			return ctrl.Result{}, err
+		}
 	}
 
 	workerLogger.Info("******************************************************")
diff --git a/controllers/master_reconcile.go b/controllers/master_reconcile.go
index 7a3fb4d..f11a464 100644
--- a/controllers/master_reconcile.go
+++ b/controllers/master_reconcile.go
@@ -19,8 +19,8 @@ package controllers
 import (
 	"context"
 	dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
+	"k8s.io/api/autoscaling/v2beta2"
 
-	v2 "k8s.io/api/autoscaling/v2"
 	_ "k8s.io/api/core/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -106,7 +106,7 @@ func newDSMasterPod(cr *dsv1alpha1.DSMaster) *corev1.Pod {
 }
 
 func (r *DSMasterReconciler) ensureDSMasterDeleted(ctx context.Context, DSMaster *dsv1alpha1.DSMaster) error {
-	if err := r.Client.Delete(ctx, DSMaster, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil {
+	if err := r.Client.Delete(ctx, DSMaster, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil {
 		return err
 	}
 	return nil
@@ -140,15 +140,15 @@ func createMasterService(cluster *dsv1alpha1.DSMaster) *corev1.Service {
 	return &service
 }
 
-func (r *DSMasterReconciler) createHPA(cluster *dsv1alpha1.DSMaster) *v2.HorizontalPodAutoscaler {
-	hpa := v2.HorizontalPodAutoscaler{
+func (r *DSMasterReconciler) createHPA(cluster *dsv1alpha1.DSMaster) *v2beta2.HorizontalPodAutoscaler {
+	hpa := v2beta2.HorizontalPodAutoscaler{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:            dsv1alpha1.DsWorkerHpa,
 			Namespace:       cluster.Namespace,
 			ResourceVersion: dsv1alpha1.DSVersion,
 		},
-		Spec: v2.HorizontalPodAutoscalerSpec{
-			ScaleTargetRef: v2.CrossVersionObjectReference{
+		Spec: v2beta2.HorizontalPodAutoscalerSpec{
+			ScaleTargetRef: v2beta2.CrossVersionObjectReference{
 				Kind:       dsv1alpha1.DsWorkerKind,
 				Name:       dsv1alpha1.DsWorkerLabel,
 				APIVersion: dsv1alpha1.APIVersion,
@@ -159,12 +159,12 @@ func (r *DSMasterReconciler) createHPA(cluster *dsv1alpha1.DSMaster) *v2.Horizon
 	}
 
 	if cluster.Spec.HpaPolicy.CPUAverageUtilization > 0 {
-		hpa.Spec.Metrics = append(hpa.Spec.Metrics, v2.MetricSpec{
-			Type: v2.ResourceMetricSourceType,
-			Resource: &v2.ResourceMetricSource{
+		hpa.Spec.Metrics = append(hpa.Spec.Metrics, v2beta2.MetricSpec{
+			Type: v2beta2.ResourceMetricSourceType,
+			Resource: &v2beta2.ResourceMetricSource{
 				Name: corev1.ResourceCPU,
-				Target: v2.MetricTarget{
-					Type:               v2.UtilizationMetricType,
+				Target: v2beta2.MetricTarget{
+					Type:               v2beta2.UtilizationMetricType,
 					AverageUtilization: &cluster.Spec.HpaPolicy.CPUAverageUtilization,
 				},
 			},
@@ -172,12 +172,12 @@ func (r *DSMasterReconciler) createHPA(cluster *dsv1alpha1.DSMaster) *v2.Horizon
 	}
 
 	if cluster.Spec.HpaPolicy.MEMAverageUtilization > 0 {
-		hpa.Spec.Metrics = append(hpa.Spec.Metrics, v2.MetricSpec{
-			Type: v2.ResourceMetricSourceType,
-			Resource: &v2.ResourceMetricSource{
+		hpa.Spec.Metrics = append(hpa.Spec.Metrics, v2beta2.MetricSpec{
+			Type: v2beta2.ResourceMetricSourceType,
+			Resource: &v2beta2.ResourceMetricSource{
 				Name: corev1.ResourceMemory,
-				Target: v2.MetricTarget{
-					Type:               v2.UtilizationMetricType,
+				Target: v2beta2.MetricTarget{
+					Type:               v2beta2.UtilizationMetricType,
 					AverageUtilization: &cluster.Spec.HpaPolicy.MEMAverageUtilization,
 				},
 			},
@@ -187,7 +187,7 @@ func (r *DSMasterReconciler) createHPA(cluster *dsv1alpha1.DSMaster) *v2.Horizon
 	return &hpa
 }
 
-func (r *DSMasterReconciler) deleteHPA(ctx context.Context, hpa *v2.HorizontalPodAutoscaler) error {
+func (r *DSMasterReconciler) deleteHPA(ctx context.Context, hpa *v2beta2.HorizontalPodAutoscaler) error {
 	if err := r.Client.Delete(ctx, hpa, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil {
 		return err
 	}
diff --git a/main.go b/main.go
index 1661e0c..d7f2398 100644
--- a/main.go
+++ b/main.go
@@ -106,9 +106,11 @@ func main() {
 		setupLog.Error(err, "unable to create controller", "controller", "DSApi")
 		os.Exit(1)
 	}
-	if err = (&dsv1alpha1.DSMaster{}).SetupWebhookWithManager(mgr); err != nil {
-		setupLog.Error(err, "unable to create webhook", "webhook", "DSMaster")
-		os.Exit(1)
+	if os.Getenv("ENABLE_WEBHOOKS") != "false" {
+		if err = (&dsv1alpha1.DSMaster{}).SetupWebhookWithManager(mgr); err != nil {
+			setupLog.Error(err, "unable to create webhook", "webhook", "DSMaster")
+			os.Exit(1)
+		}
 	}
 	//+kubebuilder:scaffold:builder