You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ke...@apache.org on 2022/07/05 05:05:05 UTC
[dolphinscheduler-operator] 27/30: fix(all): merge the version of hpa and update the type of deleteResource
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-operator.git
commit 2bcaee13795c166b3403ad8ba6b30dd2fc226725
Author: nobolity <no...@gmail.com>
AuthorDate: Tue Jun 14 14:27:56 2022 +0800
fix(all): merge the version of hpa and update the type of deleteResource
---
config/manager/kustomization.yaml | 4 +--
config/rbac/role.yaml | 22 ++++++++++++
config/samples/ds_v1alpha1_dsapi.yaml | 1 +
config/samples/ds_v1alpha1_dsmaster.yaml | 8 ++---
controllers/dsalert_controller.go | 3 +-
controllers/dsapi_controller.go | 3 +-
controllers/dsmaster_controller.go | 59 ++++++++++++++++----------------
controllers/dsworker_controller.go | 25 +++++++++++---
controllers/master_reconcile.go | 34 +++++++++---------
main.go | 8 +++--
10 files changed, 106 insertions(+), 61 deletions(-)
diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml
index dd1052d..c2c4746 100644
--- a/config/manager/kustomization.yaml
+++ b/config/manager/kustomization.yaml
@@ -12,5 +12,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: controller
- newName: kezhenxu94/controller
- newTag: latest
+ newName: nobolity/ds-operator
+ newTag: v1alpha1
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index b8bf865..35f12b4 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -5,6 +5,16 @@ metadata:
creationTimestamp: null
name: manager-role
rules:
+- apiGroups:
+ - ""
+ resources:
+ - persistentvolumeclaims
+ verbs:
+ - create
+ - delete
+ - get
+ - list
+ - watch
- apiGroups:
- ""
resources:
@@ -41,6 +51,18 @@ rules:
- patch
- update
- watch
+- apiGroups:
+ - autoscaling
+ resources:
+ - horizontalpodautoscalers
+ verbs:
+ - create
+ - delete
+ - get
+ - list
+ - patch
+ - update
+ - watch
- apiGroups:
- ds.apache.dolphinscheduler.dev
resources:
diff --git a/config/samples/ds_v1alpha1_dsapi.yaml b/config/samples/ds_v1alpha1_dsapi.yaml
index 809c853..fef6f69 100644
--- a/config/samples/ds_v1alpha1_dsapi.yaml
+++ b/config/samples/ds_v1alpha1_dsapi.yaml
@@ -10,6 +10,7 @@ spec:
version: 3.0.0-alpha
zookeeper_connect: "172.17.0.5:2181"
repository: apache/dolphinscheduler-api
+ node_port: 30002
datasource:
drive_name: "org.postgresql.Driver"
url: "jdbc:postgresql://172.17.0.4:5432/dolphinscheduler"
diff --git a/config/samples/ds_v1alpha1_dsmaster.yaml b/config/samples/ds_v1alpha1_dsmaster.yaml
index 8d0bbae..de71b27 100644
--- a/config/samples/ds_v1alpha1_dsmaster.yaml
+++ b/config/samples/ds_v1alpha1_dsmaster.yaml
@@ -15,9 +15,9 @@ spec:
url: "jdbc:postgresql://172.17.0.4:5432/dolphinscheduler"
username: "postgresadmin"
password: "admin12345"
- hpa:
- min_replicas: 1
- max_replicas: 5
- mem_average_utilization: 85
+# hpa:
+# min_replicas: 1
+# max_replicas: 5
+# mem_average_utilization: 85
diff --git a/controllers/dsalert_controller.go b/controllers/dsalert_controller.go
index f3179a0..4a57bdc 100644
--- a/controllers/dsalert_controller.go
+++ b/controllers/dsalert_controller.go
@@ -150,6 +150,7 @@ func (r *DSAlertReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
// SetupWithManager sets up the controller with the Manager.
func (r *DSAlertReconciler) SetupWithManager(mgr ctrl.Manager) error {
+ r.Recorder = mgr.GetEventRecorderFor("alert-controller")
return ctrl.NewControllerManagedBy(mgr).
For(&dsv1alpha1.DSAlert{}).
Owns(&v1.Deployment{}).
@@ -158,7 +159,7 @@ func (r *DSAlertReconciler) SetupWithManager(mgr ctrl.Manager) error {
}
func (r *DSAlertReconciler) ensureDSAlertDeleted(ctx context.Context, DSAlert *dsv1alpha1.DSAlert) error {
- if err := r.Client.Delete(ctx, DSAlert, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil {
+ if err := r.Client.Delete(ctx, DSAlert, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil {
return err
}
return nil
diff --git a/controllers/dsapi_controller.go b/controllers/dsapi_controller.go
index 3612326..939ebd1 100644
--- a/controllers/dsapi_controller.go
+++ b/controllers/dsapi_controller.go
@@ -148,6 +148,7 @@ func (r *DSApiReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
// SetupWithManager sets up the controller with the Manager.
func (r *DSApiReconciler) SetupWithManager(mgr ctrl.Manager) error {
+ r.Recorder = mgr.GetEventRecorderFor("api-controller")
return ctrl.NewControllerManagedBy(mgr).
For(&dsv1alpha1.DSApi{}).
Owns(&v1.Deployment{}).
@@ -156,7 +157,7 @@ func (r *DSApiReconciler) SetupWithManager(mgr ctrl.Manager) error {
}
func (r *DSApiReconciler) ensureDSApiDeleted(ctx context.Context, DSApi *dsv1alpha1.DSApi) error {
- if err := r.Client.Delete(ctx, DSApi, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil {
+ if err := r.Client.Delete(ctx, DSApi, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil {
return err
}
return nil
diff --git a/controllers/dsmaster_controller.go b/controllers/dsmaster_controller.go
index 01ca5b7..c5013fa 100644
--- a/controllers/dsmaster_controller.go
+++ b/controllers/dsmaster_controller.go
@@ -18,12 +18,11 @@ package controllers
import (
"context"
- "sync"
+ "k8s.io/api/autoscaling/v2beta2"
"time"
- v2 "k8s.io/api/autoscaling/v2"
+ dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
corev1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -36,10 +35,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
- "sigs.k8s.io/controller-runtime/pkg/handler"
- "sigs.k8s.io/controller-runtime/pkg/source"
-
- dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
)
var (
@@ -51,18 +46,14 @@ type DSMasterReconciler struct {
client.Client
Scheme *runtime.Scheme
recorder record.EventRecorder
- clusters sync.Map
- resyncCh chan event.GenericEvent
}
//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsmasters,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsmasters/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsmasters/finalizers,verbs=update
-// +kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;create;update;patch;delete
-// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
-// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete
-//+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;create;delete;list;watch
+//+kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
+//+kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;create;delete;list;watch
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
@@ -81,11 +72,7 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
cluster := &dsv1alpha1.DSMaster{}
if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
- if errors.IsNotFound(err) {
- r.recorder.Event(cluster, corev1.EventTypeWarning, "dsMaster is not Found", "dsMaster is not Found")
- return ctrl.Result{}, nil
- }
- return ctrl.Result{}, err
+ return ctrl.Result{}, client.IgnoreNotFound(err)
}
desired := cluster.DeepCopy()
@@ -125,9 +112,14 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
masterLogger.Info("ds-master control has been paused: ", "ds-master-name", cluster.Name)
desired.Status.ControlPaused = true
if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
- return ctrl.Result{}, err
+ if apierrors.IsConflict(err) {
+ return ctrl.Result{Requeue: true}, nil
+ } else {
+ masterLogger.Error(err, "unexpected error when master update status in paused")
+ return ctrl.Result{}, err
+ }
}
- r.recorder.Event(cluster, corev1.EventTypeNormal, "the spec status is paused", "do nothing")
+ r.recorder.Event(cluster, corev1.EventTypeNormal, "the master spec status is paused", "do nothing")
return ctrl.Result{}, nil
}
@@ -135,8 +127,15 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
if cluster.Status.Phase == dsv1alpha1.DsPhaseNone {
desired.Status.Phase = dsv1alpha1.DsPhaseCreating
masterLogger.Info("phase had been changed from none ---> creating")
- err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster))
- return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err
+ if err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
+
+ if apierrors.IsConflict(err) {
+ return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err
+ } else {
+ masterLogger.Error(err, "unexpected error when master update status in creating")
+ return ctrl.Result{}, err
+ }
+ }
}
//2 ensure the headless service
@@ -173,7 +172,12 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
desired.Status.Phase = dsv1alpha1.DsPhaseFinished
if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
- return ctrl.Result{}, err
+ if apierrors.IsConflict(err) {
+ return ctrl.Result{Requeue: true}, nil
+ } else {
+ masterLogger.Error(err, "unexpected error when master update status in finished")
+ return ctrl.Result{}, err
+ }
}
masterLogger.Info("******************************************************")
@@ -186,8 +190,6 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
// SetupWithManager sets up the controller with the Manager.
func (r *DSMasterReconciler) SetupWithManager(mgr ctrl.Manager) error {
- r.clusters = sync.Map{}
- r.resyncCh = make(chan event.GenericEvent)
r.recorder = mgr.GetEventRecorderFor("master-controller")
filter := &Predicate{}
@@ -195,8 +197,7 @@ func (r *DSMasterReconciler) SetupWithManager(mgr ctrl.Manager) error {
For(&dsv1alpha1.DSMaster{}).
Owns(&corev1.Pod{}).
Owns(&corev1.Service{}).
- Owns(&v2.HorizontalPodAutoscaler{}).
- Watches(&source.Channel{Source: r.resyncCh}, &handler.EnqueueRequestForObject{}).
+ Owns(&v2beta2.HorizontalPodAutoscaler{}).
// or use WithEventFilter()
WithEventFilter(filter).
Complete(r)
@@ -407,7 +408,7 @@ func (r *DSMasterReconciler) predicateUpdate(member *Member, cluster *dsv1alpha1
}
func (r *DSMasterReconciler) ensureHPA(ctx context.Context, cluster *dsv1alpha1.DSMaster) error {
- hpa := &v2.HorizontalPodAutoscaler{}
+ hpa := &v2beta2.HorizontalPodAutoscaler{}
namespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsWorkerHpa}
if err := r.Client.Get(ctx, namespacedName, hpa); err != nil {
// Local cache not found
@@ -425,7 +426,7 @@ func (r *DSMasterReconciler) ensureHPA(ctx context.Context, cluster *dsv1alpha1.
}
}
- if &hpa != nil && cluster.Spec.HpaPolicy == nil {
+ if hpa.Kind != "" && cluster.Spec.HpaPolicy == nil {
if err := r.deleteHPA(ctx, hpa); err != nil {
masterLogger.Info("delete hpa error")
return err
diff --git a/controllers/dsworker_controller.go b/controllers/dsworker_controller.go
index dc3ddeb..3d6dafb 100644
--- a/controllers/dsworker_controller.go
+++ b/controllers/dsworker_controller.go
@@ -49,6 +49,7 @@ var (
//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsworkers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsworkers/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
+//+kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;create;delete;list;watch
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
@@ -109,7 +110,12 @@ func (r *DSWorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
workerLogger.Info("ds-worker control has been paused: ", "ds-worker-name", cluster.Name)
desired.Status.ControlPaused = true
if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
- return ctrl.Result{}, err
+ if apierrors.IsConflict(err) {
+ return ctrl.Result{Requeue: true}, nil
+ } else {
+ masterLogger.Error(err, "unexpected error when worker update status in paused")
+ return ctrl.Result{}, err
+ }
}
r.recorder.Event(cluster, corev1.EventTypeNormal, "the spec status is paused", "do nothing")
return ctrl.Result{}, nil
@@ -127,8 +133,14 @@ func (r *DSWorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}
desired.Status.Phase = dsv1alpha1.DsPhaseCreating
workerLogger.Info("phase had been changed from none ---> creating")
- err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster))
- return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err
+ if err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
+ if apierrors.IsConflict(err) {
+ return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err
+ } else {
+ masterLogger.Error(err, "unexpected error when worker update status in creating")
+ return ctrl.Result{}, err
+ }
+ }
}
// 3. Ensure bootstrapped, we will block here util cluster is up and healthy
@@ -151,7 +163,12 @@ func (r *DSWorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
desired.Status.Phase = dsv1alpha1.DsPhaseFinished
if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
- return ctrl.Result{}, err
+ if apierrors.IsConflict(err) {
+ return ctrl.Result{Requeue: true}, nil
+ } else {
+ masterLogger.Error(err, "unexpected error when worker update status in finished")
+ return ctrl.Result{}, err
+ }
}
workerLogger.Info("******************************************************")
diff --git a/controllers/master_reconcile.go b/controllers/master_reconcile.go
index 7a3fb4d..f11a464 100644
--- a/controllers/master_reconcile.go
+++ b/controllers/master_reconcile.go
@@ -19,8 +19,8 @@ package controllers
import (
"context"
dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
+ "k8s.io/api/autoscaling/v2beta2"
- v2 "k8s.io/api/autoscaling/v2"
_ "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -106,7 +106,7 @@ func newDSMasterPod(cr *dsv1alpha1.DSMaster) *corev1.Pod {
}
func (r *DSMasterReconciler) ensureDSMasterDeleted(ctx context.Context, DSMaster *dsv1alpha1.DSMaster) error {
- if err := r.Client.Delete(ctx, DSMaster, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil {
+ if err := r.Client.Delete(ctx, DSMaster, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil {
return err
}
return nil
@@ -140,15 +140,15 @@ func createMasterService(cluster *dsv1alpha1.DSMaster) *corev1.Service {
return &service
}
-func (r *DSMasterReconciler) createHPA(cluster *dsv1alpha1.DSMaster) *v2.HorizontalPodAutoscaler {
- hpa := v2.HorizontalPodAutoscaler{
+func (r *DSMasterReconciler) createHPA(cluster *dsv1alpha1.DSMaster) *v2beta2.HorizontalPodAutoscaler {
+ hpa := v2beta2.HorizontalPodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Name: dsv1alpha1.DsWorkerHpa,
Namespace: cluster.Namespace,
ResourceVersion: dsv1alpha1.DSVersion,
},
- Spec: v2.HorizontalPodAutoscalerSpec{
- ScaleTargetRef: v2.CrossVersionObjectReference{
+ Spec: v2beta2.HorizontalPodAutoscalerSpec{
+ ScaleTargetRef: v2beta2.CrossVersionObjectReference{
Kind: dsv1alpha1.DsWorkerKind,
Name: dsv1alpha1.DsWorkerLabel,
APIVersion: dsv1alpha1.APIVersion,
@@ -159,12 +159,12 @@ func (r *DSMasterReconciler) createHPA(cluster *dsv1alpha1.DSMaster) *v2.Horizon
}
if cluster.Spec.HpaPolicy.CPUAverageUtilization > 0 {
- hpa.Spec.Metrics = append(hpa.Spec.Metrics, v2.MetricSpec{
- Type: v2.ResourceMetricSourceType,
- Resource: &v2.ResourceMetricSource{
+ hpa.Spec.Metrics = append(hpa.Spec.Metrics, v2beta2.MetricSpec{
+ Type: v2beta2.ResourceMetricSourceType,
+ Resource: &v2beta2.ResourceMetricSource{
Name: corev1.ResourceCPU,
- Target: v2.MetricTarget{
- Type: v2.UtilizationMetricType,
+ Target: v2beta2.MetricTarget{
+ Type: v2beta2.UtilizationMetricType,
AverageUtilization: &cluster.Spec.HpaPolicy.CPUAverageUtilization,
},
},
@@ -172,12 +172,12 @@ func (r *DSMasterReconciler) createHPA(cluster *dsv1alpha1.DSMaster) *v2.Horizon
}
if cluster.Spec.HpaPolicy.MEMAverageUtilization > 0 {
- hpa.Spec.Metrics = append(hpa.Spec.Metrics, v2.MetricSpec{
- Type: v2.ResourceMetricSourceType,
- Resource: &v2.ResourceMetricSource{
+ hpa.Spec.Metrics = append(hpa.Spec.Metrics, v2beta2.MetricSpec{
+ Type: v2beta2.ResourceMetricSourceType,
+ Resource: &v2beta2.ResourceMetricSource{
Name: corev1.ResourceMemory,
- Target: v2.MetricTarget{
- Type: v2.UtilizationMetricType,
+ Target: v2beta2.MetricTarget{
+ Type: v2beta2.UtilizationMetricType,
AverageUtilization: &cluster.Spec.HpaPolicy.MEMAverageUtilization,
},
},
@@ -187,7 +187,7 @@ func (r *DSMasterReconciler) createHPA(cluster *dsv1alpha1.DSMaster) *v2.Horizon
return &hpa
}
-func (r *DSMasterReconciler) deleteHPA(ctx context.Context, hpa *v2.HorizontalPodAutoscaler) error {
+func (r *DSMasterReconciler) deleteHPA(ctx context.Context, hpa *v2beta2.HorizontalPodAutoscaler) error {
if err := r.Client.Delete(ctx, hpa, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil {
return err
}
diff --git a/main.go b/main.go
index 1661e0c..d7f2398 100644
--- a/main.go
+++ b/main.go
@@ -106,9 +106,11 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "DSApi")
os.Exit(1)
}
- if err = (&dsv1alpha1.DSMaster{}).SetupWebhookWithManager(mgr); err != nil {
- setupLog.Error(err, "unable to create webhook", "webhook", "DSMaster")
- os.Exit(1)
+ if os.Getenv("ENABLE_WEBHOOKS") != "false" {
+ if err = (&dsv1alpha1.DSMaster{}).SetupWebhookWithManager(mgr); err != nil {
+ setupLog.Error(err, "unable to create webhook", "webhook", "DSMaster")
+ os.Exit(1)
+ }
}
//+kubebuilder:scaffold:builder