You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ch...@apache.org on 2022/09/20 03:43:50 UTC

[dolphinscheduler-operator] 13/44: feat(operator): add api

This is an automated email from the ASF dual-hosted git repository.

chufenggao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-operator.git

commit c87b0c0a0e64815fa4706845d7db0091733b816e
Author: nobolity <no...@gmail.com>
AuthorDate: Sun May 29 21:52:19 2022 +0800

    feat(operator): add api
---
 api/v1alpha1/dsapi_types.go                        | 120 +++++++++++
 .../ds.apache.dolphinscheduler.dev_dsapis.yaml     |  56 ++++++
 config/crd/patches/cainjection_in_dsapis.yaml      |   7 +
 config/crd/patches/webhook_in_dsapis.yaml          |  16 ++
 config/rbac/dsapi_editor_role.yaml                 |  24 +++
 config/rbac/dsapi_viewer_role.yaml                 |  20 ++
 config/samples/ds_v1alpha1_dsapi.yaml              |  17 ++
 controllers/dsapi_controller.go                    | 221 +++++++++++++++++++++
 8 files changed, 481 insertions(+)

diff --git a/api/v1alpha1/dsapi_types.go b/api/v1alpha1/dsapi_types.go
new file mode 100644
index 0000000..01f11f9
--- /dev/null
+++ b/api/v1alpha1/dsapi_types.go
@@ -0,0 +1,120 @@
+/*
+Copyright 2022.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1alpha1
+
+import (
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// EDIT THIS FILE!  THIS IS SCAFFOLDING FOR YOU TO OWN!
+// NOTE: json tags are required.  Any new fields you add must have json tags for the fields to be serialized.
+
+// DSApiSpec defines the desired state of DSApi
+type DSApiSpec struct {
+	Datasource *DateSourceTemplate `json:"datasource"`
+	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
+	// Important: Run "make" to regenerate code after modifying this file
+
+	// Version is the expected version of the ds cluster.
+	// The ds-operator will eventually make the ds cluster version
+	// equal to the expected version.
+	// If version is not set, default is "3.0.0-alpha".
+	// +kubebuilder:default="3.0.0-alpha"
+	Version string `json:"version,omitempty"`
+
+	// Repository is the name of the repository that hosts
+	// ds container images. It should be direct clone of the repository in official
+	// By default, it is `apache/dolphinscheduler-master`.
+	// +kubebuilder:default=apache/dolphinscheduler-master
+	Repository string `json:"repository,omitempty"`
+
+	// Replicas is the expected size of the ms-master.
+	// The ds-master-operator will eventually make the size of the running
+	//  equal to the expected size.
+	// The vaild range of the size is from 1 to 7.
+	// +kubebuilder:default=3
+	// +kubebuilder:validation:Minimum=1
+	// +kubebuilder:validation:Maximum=7
+	Replicas int `json:"replicas"`
+
+	// Pod defines the policy to create pod for the dm-master pod.
+	// Updating Pod does not take effect on any existing dm-master pods.
+	Pod *PodPolicy `json:"pod,omitempty"`
+
+	// Paused is to pause the control of the operator for the ds-master .
+	// +kubebuilder:default=false
+	Paused bool `json:"paused,omitempty"`
+
+	//LogPvcName defines the  log capacity of application ,the position is /opt/dolphinscheduler/logs eg 20Gi
+	LogPvcName string `json:"log_pvc_name,omitempty"`
+
+	//ReGenerate defines if delete the old_deployment and create a new deployment
+	// +kubebuilder:default=false
+	ReGenerate bool `json:"re_generate,omitempty"`
+
+	//NodePort is the port node exposed
+	NodePort int32 `json:"node_port"`
+}
+
+// DSApiStatus defines the observed state of DSApi
+type DSApiStatus struct {
+	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
+	// Important: Run "make" to regenerate code after modifying this file
+	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
+	// Important: Run "make" to regenerate code after modifying this file
+	// Phase is the cluster running phase
+	// +kubebuilder:validation:Enum="";Creating;Running;Failed;Finished
+	Phase DsPhase `json:"phase,omitempty"`
+	// ControlPaused indicates the operator pauses the control of the cluster.
+	// +kubebuilder:default=false
+	ControlPaused bool `json:"controlPaused,omitempty"`
+
+	// Condition keeps track of all cluster conditions, if they exist.
+	Conditions []DsCondition `json:"conditions,omitempty"`
+
+	// Replicas is the current size of the cluster
+	// +kubebuilder:default=0
+	Replicas int `json:"replicas,omitempty"`
+
+	// Members are the dsMaster members in the cluster
+	Members MembersStatus `json:"members,omitempty"`
+}
+
+//+kubebuilder:object:root=true
+//+kubebuilder:subresource:status
+
+// DSApi is the Schema for the dsapis API
+type DSApi struct {
+	metav1.TypeMeta   `json:",inline"`
+	metav1.ObjectMeta `json:"metadata,omitempty"`
+
+	Spec   DSApiSpec   `json:"spec,omitempty"`
+	Status DSApiStatus `json:"status,omitempty"`
+}
+
+//+kubebuilder:object:root=true
+
+// DSApiList contains a list of DSApi
+type DSApiList struct {
+	metav1.TypeMeta `json:",inline"`
+	metav1.ListMeta `json:"metadata,omitempty"`
+	Items           []DSApi `json:"items"`
+}
+
+func init() {
+	SchemeBuilder.Register(&DSApi{}, &DSApiList{})
+}
diff --git a/config/crd/bases/ds.apache.dolphinscheduler.dev_dsapis.yaml b/config/crd/bases/ds.apache.dolphinscheduler.dev_dsapis.yaml
new file mode 100644
index 0000000..5f98244
--- /dev/null
+++ b/config/crd/bases/ds.apache.dolphinscheduler.dev_dsapis.yaml
@@ -0,0 +1,56 @@
+---
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+  annotations:
+    controller-gen.kubebuilder.io/version: v0.8.0
+  creationTimestamp: null
+  name: dsapis.ds.apache.dolphinscheduler.dev
+spec:
+  group: ds.apache.dolphinscheduler.dev
+  names:
+    kind: DSApi
+    listKind: DSApiList
+    plural: dsapis
+    singular: dsapi
+  scope: Namespaced
+  versions:
+  - name: v1alpha1
+    schema:
+      openAPIV3Schema:
+        description: DSApi is the Schema for the dsapis API
+        properties:
+          apiVersion:
+            description: 'APIVersion defines the versioned schema of this representation
+              of an object. Servers should convert recognized schemas to the latest
+              internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
+            type: string
+          kind:
+            description: 'Kind is a string value representing the REST resource this
+              object represents. Servers may infer this from the endpoint the client
+              submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
+            type: string
+          metadata:
+            type: object
+          spec:
+            description: DSApiSpec defines the desired state of DSApi
+            properties:
+              foo:
+                description: Foo is an example field of DSApi. Edit dsapi_types.go
+                  to remove/update
+                type: string
+            type: object
+          status:
+            description: DSApiStatus defines the observed state of DSApi
+            type: object
+        type: object
+    served: true
+    storage: true
+    subresources:
+      status: {}
+status:
+  acceptedNames:
+    kind: ""
+    plural: ""
+  conditions: []
+  storedVersions: []
diff --git a/config/crd/patches/cainjection_in_dsapis.yaml b/config/crd/patches/cainjection_in_dsapis.yaml
new file mode 100644
index 0000000..c11123f
--- /dev/null
+++ b/config/crd/patches/cainjection_in_dsapis.yaml
@@ -0,0 +1,7 @@
+# The following patch adds a directive for certmanager to inject CA into the CRD
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+  annotations:
+    cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME)
+  name: dsapis.ds.apache.dolphinscheduler.dev
diff --git a/config/crd/patches/webhook_in_dsapis.yaml b/config/crd/patches/webhook_in_dsapis.yaml
new file mode 100644
index 0000000..937789c
--- /dev/null
+++ b/config/crd/patches/webhook_in_dsapis.yaml
@@ -0,0 +1,16 @@
+# The following patch enables a conversion webhook for the CRD
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+  name: dsapis.ds.apache.dolphinscheduler.dev
+spec:
+  conversion:
+    strategy: Webhook
+    webhook:
+      clientConfig:
+        service:
+          namespace: system
+          name: webhook-service
+          path: /convert
+      conversionReviewVersions:
+      - v1
diff --git a/config/rbac/dsapi_editor_role.yaml b/config/rbac/dsapi_editor_role.yaml
new file mode 100644
index 0000000..c7c0e68
--- /dev/null
+++ b/config/rbac/dsapi_editor_role.yaml
@@ -0,0 +1,24 @@
+# permissions for end users to edit dsapis.
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: dsapi-editor-role
+rules:
+- apiGroups:
+  - ds.apache.dolphinscheduler.dev
+  resources:
+  - dsapis
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - patch
+  - update
+  - watch
+- apiGroups:
+  - ds.apache.dolphinscheduler.dev
+  resources:
+  - dsapis/status
+  verbs:
+  - get
diff --git a/config/rbac/dsapi_viewer_role.yaml b/config/rbac/dsapi_viewer_role.yaml
new file mode 100644
index 0000000..acf5226
--- /dev/null
+++ b/config/rbac/dsapi_viewer_role.yaml
@@ -0,0 +1,20 @@
+# permissions for end users to view dsapis.
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: dsapi-viewer-role
+rules:
+- apiGroups:
+  - ds.apache.dolphinscheduler.dev
+  resources:
+  - dsapis
+  verbs:
+  - get
+  - list
+  - watch
+- apiGroups:
+  - ds.apache.dolphinscheduler.dev
+  resources:
+  - dsapis/status
+  verbs:
+  - get
diff --git a/config/samples/ds_v1alpha1_dsapi.yaml b/config/samples/ds_v1alpha1_dsapi.yaml
new file mode 100644
index 0000000..e3264d7
--- /dev/null
+++ b/config/samples/ds_v1alpha1_dsapi.yaml
@@ -0,0 +1,17 @@
+apiVersion: ds.apache.dolphinscheduler.dev/v1alpha1
+kind: DSApi
+metadata:
+  name: ds-api
+  namespace: ds
+  labels:
+    app: ds-api
+spec:
+  replicas: 1
+  version: 3.0.0-alpha
+  repository: apache/dolphinscheduler-api
+  node_port:30002
+  datasource:
+    drive_name: "org.postgresql.Driver"
+    url: "jdbc:postgresql://172.17.0.4:5432/dolphinscheduler"
+    username: "postgresadmin"
+    password: "admin12345"
\ No newline at end of file
diff --git a/controllers/dsapi_controller.go b/controllers/dsapi_controller.go
new file mode 100644
index 0000000..8d74913
--- /dev/null
+++ b/controllers/dsapi_controller.go
@@ -0,0 +1,221 @@
+/*
+Copyright 2022.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controllers
+
+import (
+	"context"
+	dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
+	v1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/tools/record"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+	"time"
+)
+
+var (
+	apiLogger = ctrl.Log.WithName("DSApi-controller")
+)
+
+// DSApiReconciler reconciles a DSApi object
+type DSApiReconciler struct {
+	client.Client
+	Scheme   *runtime.Scheme
+	Recorder record.EventRecorder
+}
+
+//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsapis,verbs=get;list;watch;create;update;patch;delete
+//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsapis/status,verbs=get;update;patch
+//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsapis/finalizers,verbs=update
+
+// Reconcile is part of the main kubernetes reconciliation loop which aims to
+// move the current state of the cluster closer to the desired state.
+// TODO(user): Modify the Reconcile function to compare the state specified by
+// the DSApi object against the actual cluster state, and then
+// perform operations to make the cluster state reflect the state specified by
+// the user.
+//
+// For more details, check Reconcile and its Result here:
+// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/reconcile
+func (r *DSApiReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+	apiLogger.Info("dmApi start reconcile logic")
+	defer apiLogger.Info("dmApi Reconcile end ---------------------------------------------")
+
+	cluster := &dsv1alpha1.DSApi{}
+
+	if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
+		if errors.IsNotFound(err) {
+			r.Recorder.Event(cluster, corev1.EventTypeWarning, "dsApi is not Found", "dsApi is not Found")
+			return ctrl.Result{}, nil
+		}
+		return ctrl.Result{}, err
+	}
+	desired := cluster.DeepCopy()
+
+	// Handler finalizer
+	// examine DeletionTimestamp to determine if object is under deletion
+	if cluster.ObjectMeta.DeletionTimestamp.IsZero() {
+		// The object is not being deleted, so if it does not have our finalizer,
+		// then lets add the finalizer and update the object. This is equivalent
+		// registering our finalizer.
+		if !controllerutil.ContainsFinalizer(desired, dsv1alpha1.FinalizerName) {
+			controllerutil.AddFinalizer(desired, dsv1alpha1.FinalizerName)
+			if err := r.Update(ctx, desired); err != nil {
+				return ctrl.Result{}, err
+			}
+		}
+	} else {
+		// The object is being deleted
+
+		if controllerutil.ContainsFinalizer(desired, dsv1alpha1.FinalizerName) {
+			// our finalizer is present, so lets handle any external dependency
+			if err := r.ensureDSApiDeleted(ctx, cluster); err != nil {
+				return ctrl.Result{}, err
+			}
+
+			// remove our finalizer from the list and update it.
+			controllerutil.RemoveFinalizer(desired, dsv1alpha1.FinalizerName)
+			if err := r.Update(ctx, desired); err != nil {
+				return ctrl.Result{}, err
+			}
+		}
+		// Stop reconciliation as the item is being deleted
+		return ctrl.Result{}, nil
+	}
+
+	if cluster.Spec.Paused {
+		apiLogger.Info("ds-Api control has been paused: ", "ds-Api-name", cluster.Name)
+		desired.Status.ControlPaused = true
+		if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
+			return ctrl.Result{}, err
+		}
+		r.Recorder.Event(cluster, corev1.EventTypeNormal, "the spec status is paused", "do nothing")
+		return ctrl.Result{}, nil
+	}
+
+	// 1. First time we see the ds-master-cluster, initialize it
+	if cluster.Status.Phase == dsv1alpha1.DsPhaseNone {
+		desired.Status.Phase = dsv1alpha1.DsPhaseCreating
+		apiLogger.Info("phase had been changed from  none ---> creating")
+		err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster))
+		return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err
+	}
+
+	//2 ensure the Api service
+	apiLogger.Info("Ensuring Api service")
+
+	if err := r.ensureApiService(ctx, cluster); err != nil {
+		return ctrl.Result{Requeue: true}, nil
+	}
+
+	if requeue, err := r.ensureApiDeployment(ctx, cluster); err != nil {
+		return ctrl.Result{Requeue: false}, err
+	} else {
+		if !requeue {
+			return ctrl.Result{Requeue: false}, nil
+		}
+	}
+
+	apiLogger.Info("******************************************************")
+	desired.Status.Phase = dsv1alpha1.DsPhaseNone
+	if err := r.Update(ctx, desired); err != nil {
+		return ctrl.Result{}, err
+	}
+	return ctrl.Result{Requeue: false}, nil
+}
+
+// SetupWithManager sets up the controller with the Manager.
+func (r *DSApiReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	return ctrl.NewControllerManagedBy(mgr).
+		For(&dsv1alpha1.DSApi{}).
+		Owns(&v1.Deployment{}).
+		Owns(&corev1.Service{}).
+		Owns(&corev1.Pod{}).
+		Complete(r)
+}
+
+func (r *DSApiReconciler) ensureDSApiDeleted(ctx context.Context, DSApi *dsv1alpha1.DSApi) error {
+	if err := r.Client.Delete(ctx, DSApi, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (r *DSApiReconciler) ensureApiService(ctx context.Context, cluster *dsv1alpha1.DSApi) error {
+	// 1. Client service
+	service := &corev1.Service{}
+	namespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsApiServiceValue}
+	if err := r.Client.Get(ctx, namespacedName, service); err != nil {
+		// Local cache not found
+		logger.Info("get service error")
+		if apierrors.IsNotFound(err) {
+			service = createApiService(cluster)
+			if err := controllerutil.SetControllerReference(cluster, service, r.Scheme); err != nil {
+				logger.Info("create Api service error")
+				return err
+			}
+			// Remote may already exist, so we will return err, for the next time, this code will not execute
+			if err := r.Client.Create(ctx, service); err != nil {
+				logger.Info("create Api service error1")
+				return err
+			}
+			logger.Info("the Api service had been created")
+		}
+	}
+	return nil
+}
+
+func (r *DSApiReconciler) ensureApiDeployment(ctx context.Context, cluster *dsv1alpha1.DSApi) (bool, error) {
+	deployment := &v1.Deployment{}
+	deploymentNamespaceName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsApiDeploymentValue}
+	if err := r.Client.Get(ctx, deploymentNamespaceName, deployment); err != nil {
+		if apierrors.IsNotFound(err) {
+			deployment = createApiDeployment(cluster)
+		}
+		if err := controllerutil.SetControllerReference(cluster, deployment, r.Scheme); err != nil {
+			return true, err
+		}
+		if err := r.Client.Create(ctx, deployment); err == nil {
+			return false, nil
+		} else {
+			return true, err
+		}
+	} else {
+		err := r.updateApiDeployment(ctx, deployment, cluster)
+		if err != nil {
+			return false, err
+		}
+	}
+
+	return true, nil
+}
+
+//only notice the property of replicas  and image and version
+func (r *DSApiReconciler) updateApiDeployment(ctx context.Context, deployment *v1.Deployment, cluster *dsv1alpha1.DSApi) error {
+	deployment.Spec.Replicas = int32Ptr(int32(cluster.Spec.Replicas))
+	deployment.Spec.Template.Spec.Containers[0].Image = ImageName(cluster.Spec.Repository, cluster.Spec.Version)
+	if err := r.Client.Update(ctx, deployment); err != nil {
+		return err
+	}
+	return nil
+}