You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ch...@apache.org on 2022/09/20 03:43:50 UTC
[dolphinscheduler-operator] 13/44: feat(operator): add api
This is an automated email from the ASF dual-hosted git repository.
chufenggao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-operator.git
commit c87b0c0a0e64815fa4706845d7db0091733b816e
Author: nobolity <no...@gmail.com>
AuthorDate: Sun May 29 21:52:19 2022 +0800
feat(operator): add api
---
api/v1alpha1/dsapi_types.go | 120 +++++++++++
.../ds.apache.dolphinscheduler.dev_dsapis.yaml | 56 ++++++
config/crd/patches/cainjection_in_dsapis.yaml | 7 +
config/crd/patches/webhook_in_dsapis.yaml | 16 ++
config/rbac/dsapi_editor_role.yaml | 24 +++
config/rbac/dsapi_viewer_role.yaml | 20 ++
config/samples/ds_v1alpha1_dsapi.yaml | 17 ++
controllers/dsapi_controller.go | 221 +++++++++++++++++++++
8 files changed, 481 insertions(+)
diff --git a/api/v1alpha1/dsapi_types.go b/api/v1alpha1/dsapi_types.go
new file mode 100644
index 0000000..01f11f9
--- /dev/null
+++ b/api/v1alpha1/dsapi_types.go
@@ -0,0 +1,120 @@
+/*
+Copyright 2022.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1alpha1
+
+import (
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
+// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
+
+// DSApiSpec defines the desired state of DSApi.
+type DSApiSpec struct {
+	// Datasource holds the database connection settings (see DateSourceTemplate).
+	Datasource *DateSourceTemplate `json:"datasource"`
+
+	// Important: Run "make" to regenerate code after modifying this file
+
+	// Version is the expected version of the ds cluster.
+	// The ds-operator will eventually make the ds cluster version
+	// equal to the expected version.
+	// If version is not set, default is "3.0.0-alpha".
+	// +kubebuilder:default="3.0.0-alpha"
+	Version string `json:"version,omitempty"`
+
+	// Repository is the name of the repository that hosts
+	// the ds-api container images. It should be a direct clone of the official repository.
+	// By default, it is `apache/dolphinscheduler-api`.
+	// +kubebuilder:default=apache/dolphinscheduler-api
+	Repository string `json:"repository,omitempty"`
+
+	// Replicas is the expected number of ds-api replicas.
+	// The operator copies this value to the replica count of the api Deployment.
+	// The valid range of the size is from 1 to 7.
+	// +kubebuilder:default=3
+	// +kubebuilder:validation:Minimum=1
+	// +kubebuilder:validation:Maximum=7
+	Replicas int `json:"replicas"`
+
+	// Pod defines the policy to create pods for the ds-api.
+	// Updating Pod does not take effect on any existing ds-api pods.
+	Pod *PodPolicy `json:"pod,omitempty"`
+
+	// Paused is to pause the control of the operator for the ds-api.
+	// While it is set, Reconcile only marks Status.ControlPaused and returns.
+	// +kubebuilder:default=false
+	Paused bool `json:"paused,omitempty"`
+
+	// NOTE(review): the field name suggests a PVC name, while the description
+	// below talks about a capacity ("20Gi") — confirm which one is expected.
+
+	// LogPvcName defines the log capacity of the application; the logs are
+	// located at /opt/dolphinscheduler/logs, e.g. 20Gi.
+	LogPvcName string `json:"log_pvc_name,omitempty"`
+
+	// ReGenerate defines whether to delete the old deployment and create a new deployment.
+	// +kubebuilder:default=false
+	ReGenerate bool `json:"re_generate,omitempty"`
+
+	// NodePort is the port exposed on the node.
+	NodePort int32 `json:"node_port"`
+}
+
+// DSApiStatus defines the observed state of DSApi
+type DSApiStatus struct {
+ // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
+ // Important: Run "make" to regenerate code after modifying this file
+
+ // Phase is the cluster running phase.
+ // Reconcile moves an object from DsPhaseNone to DsPhaseCreating the first time it sees it.
+ // +kubebuilder:validation:Enum="";Creating;Running;Failed;Finished
+ Phase DsPhase `json:"phase,omitempty"`
+ // ControlPaused indicates the operator pauses the control of the cluster.
+ // +kubebuilder:default=false
+ ControlPaused bool `json:"controlPaused,omitempty"`
+
+ // Conditions keeps track of all cluster conditions, if they exist.
+ Conditions []DsCondition `json:"conditions,omitempty"`
+
+ // Replicas is the current size of the cluster
+ // +kubebuilder:default=0
+ Replicas int `json:"replicas,omitempty"`
+
+ // Members is the status of the members in the cluster.
+ Members MembersStatus `json:"members,omitempty"`
+}
+
+//+kubebuilder:object:root=true
+//+kubebuilder:subresource:status
+
+// DSApi is the Schema for the dsapis API; it pairs a DSApiSpec with a DSApiStatus.
+type DSApi struct {
+ metav1.TypeMeta `json:",inline"`
+ metav1.ObjectMeta `json:"metadata,omitempty"`
+
+ Spec DSApiSpec `json:"spec,omitempty"` // desired state
+ Status DSApiStatus `json:"status,omitempty"` // observed state; the status subresource is enabled by the marker above
+}
+
+//+kubebuilder:object:root=true
+
+// DSApiList contains a list of DSApi objects.
+type DSApiList struct {
+ metav1.TypeMeta `json:",inline"`
+ metav1.ListMeta `json:"metadata,omitempty"`
+ Items []DSApi `json:"items"` // the DSApi objects in the list
+}
+// init registers the DSApi and DSApiList types with the package's SchemeBuilder.
+func init() {
+ SchemeBuilder.Register(&DSApi{}, &DSApiList{})
+}
diff --git a/config/crd/bases/ds.apache.dolphinscheduler.dev_dsapis.yaml b/config/crd/bases/ds.apache.dolphinscheduler.dev_dsapis.yaml
new file mode 100644
index 0000000..5f98244
--- /dev/null
+++ b/config/crd/bases/ds.apache.dolphinscheduler.dev_dsapis.yaml
@@ -0,0 +1,56 @@
+---
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+ annotations:
+ controller-gen.kubebuilder.io/version: v0.8.0
+ creationTimestamp: null
+ name: dsapis.ds.apache.dolphinscheduler.dev
+spec:
+ group: ds.apache.dolphinscheduler.dev
+ names:
+ kind: DSApi
+ listKind: DSApiList
+ plural: dsapis
+ singular: dsapi
+ scope: Namespaced
+ versions:
+ - name: v1alpha1
+ schema:
+ openAPIV3Schema:
+ description: DSApi is the Schema for the dsapis API
+ properties:
+ apiVersion:
+ description: 'APIVersion defines the versioned schema of this representation
+ of an object. Servers should convert recognized schemas to the latest
+ internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
+ type: string
+ kind:
+ description: 'Kind is a string value representing the REST resource this
+ object represents. Servers may infer this from the endpoint the client
+ submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
+ type: string
+ metadata:
+ type: object
+ spec:
+ description: DSApiSpec defines the desired state of DSApi
+ properties:
+ foo:
+ description: Foo is an example field of DSApi. Edit dsapi_types.go
+ to remove/update
+ type: string
+ type: object
+ status:
+ description: DSApiStatus defines the observed state of DSApi
+ type: object
+ type: object
+ served: true
+ storage: true
+ subresources:
+ status: {}
+status:
+ acceptedNames:
+ kind: ""
+ plural: ""
+ conditions: []
+ storedVersions: []
diff --git a/config/crd/patches/cainjection_in_dsapis.yaml b/config/crd/patches/cainjection_in_dsapis.yaml
new file mode 100644
index 0000000..c11123f
--- /dev/null
+++ b/config/crd/patches/cainjection_in_dsapis.yaml
@@ -0,0 +1,7 @@
+# The following patch adds a directive for certmanager to inject CA into the CRD
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+ annotations:
+ cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME)
+ name: dsapis.ds.apache.dolphinscheduler.dev
diff --git a/config/crd/patches/webhook_in_dsapis.yaml b/config/crd/patches/webhook_in_dsapis.yaml
new file mode 100644
index 0000000..937789c
--- /dev/null
+++ b/config/crd/patches/webhook_in_dsapis.yaml
@@ -0,0 +1,16 @@
+# The following patch enables a conversion webhook for the CRD
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+ name: dsapis.ds.apache.dolphinscheduler.dev
+spec:
+ conversion:
+ strategy: Webhook
+ webhook:
+ clientConfig:
+ service:
+ namespace: system
+ name: webhook-service
+ path: /convert
+ conversionReviewVersions:
+ - v1
diff --git a/config/rbac/dsapi_editor_role.yaml b/config/rbac/dsapi_editor_role.yaml
new file mode 100644
index 0000000..c7c0e68
--- /dev/null
+++ b/config/rbac/dsapi_editor_role.yaml
@@ -0,0 +1,24 @@
+# permissions for end users to edit dsapis.
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+ name: dsapi-editor-role
+rules:
+- apiGroups:
+ - ds.apache.dolphinscheduler.dev
+ resources:
+ - dsapis
+ verbs:
+ - create
+ - delete
+ - get
+ - list
+ - patch
+ - update
+ - watch
+- apiGroups:
+ - ds.apache.dolphinscheduler.dev
+ resources:
+ - dsapis/status
+ verbs:
+ - get
diff --git a/config/rbac/dsapi_viewer_role.yaml b/config/rbac/dsapi_viewer_role.yaml
new file mode 100644
index 0000000..acf5226
--- /dev/null
+++ b/config/rbac/dsapi_viewer_role.yaml
@@ -0,0 +1,20 @@
+# permissions for end users to view dsapis.
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+ name: dsapi-viewer-role
+rules:
+- apiGroups:
+ - ds.apache.dolphinscheduler.dev
+ resources:
+ - dsapis
+ verbs:
+ - get
+ - list
+ - watch
+- apiGroups:
+ - ds.apache.dolphinscheduler.dev
+ resources:
+ - dsapis/status
+ verbs:
+ - get
diff --git a/config/samples/ds_v1alpha1_dsapi.yaml b/config/samples/ds_v1alpha1_dsapi.yaml
new file mode 100644
index 0000000..e3264d7
--- /dev/null
+++ b/config/samples/ds_v1alpha1_dsapi.yaml
@@ -0,0 +1,17 @@
+apiVersion: ds.apache.dolphinscheduler.dev/v1alpha1
+kind: DSApi
+metadata:
+  name: ds-api
+  namespace: ds
+  labels:
+    app: ds-api
+spec:
+  replicas: 1
+  version: 3.0.0-alpha
+  repository: apache/dolphinscheduler-api
+  node_port: 30002 # port exposed on the node for the api
+  datasource:
+    drive_name: "org.postgresql.Driver"
+    url: "jdbc:postgresql://172.17.0.4:5432/dolphinscheduler"
+    username: "postgresadmin"
+    password: "admin12345"
\ No newline at end of file
diff --git a/controllers/dsapi_controller.go b/controllers/dsapi_controller.go
new file mode 100644
index 0000000..8d74913
--- /dev/null
+++ b/controllers/dsapi_controller.go
@@ -0,0 +1,221 @@
+/*
+Copyright 2022.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controllers
+
+import (
+ "context"
+ dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
+ v1 "k8s.io/api/apps/v1"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/errors"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/client-go/tools/record"
+ ctrl "sigs.k8s.io/controller-runtime"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+ "time"
+)
+// apiLogger is the named logger used by the DSApi controller in this file.
+var (
+ apiLogger = ctrl.Log.WithName("DSApi-controller")
+)
+
+// DSApiReconciler reconciles a DSApi object
+type DSApiReconciler struct {
+ client.Client // embedded API client, used as r.Client and through promoted methods such as r.Update
+ Scheme *runtime.Scheme // passed to controllerutil.SetControllerReference for created objects
+ Recorder record.EventRecorder // used by Reconcile to emit events
+}
+
+//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsapis,verbs=get;list;watch;create;update;patch;delete
+//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsapis/status,verbs=get;update;patch
+//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsapis/finalizers,verbs=update
+
+// Reconcile is part of the main kubernetes reconciliation loop which aims to
+// move the current state of the cluster closer to the desired state.
+//
+// For the DSApi named by req it:
+//  1. returns quietly when the object no longer exists;
+//  2. registers the finalizer, or, when the object is being deleted, runs the
+//     finalizer cleanup and removes the finalizer;
+//  3. when Spec.Paused is set, marks Status.ControlPaused and stops;
+//  4. moves an object in DsPhaseNone to DsPhaseCreating and requeues;
+//  5. ensures the api Service and the api Deployment.
+//
+// Errors from any step are returned so that controller-runtime logs them and
+// retries with back-off.
+//
+// For more details, check Reconcile and its Result here:
+// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/reconcile
+func (r *DSApiReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+	apiLogger.Info("dmApi start reconcile logic")
+	defer apiLogger.Info("dmApi Reconcile end ---------------------------------------------")
+
+	cluster := &dsv1alpha1.DSApi{}
+
+	if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
+		if errors.IsNotFound(err) {
+			// The object is gone, so there is nothing to reconcile. No event is
+			// recorded here: cluster is still empty (no name or namespace) and
+			// cannot serve as the object an event refers to.
+			apiLogger.Info("dsApi is not Found", "namespace", req.Namespace, "name", req.Name)
+			return ctrl.Result{}, nil
+		}
+		return ctrl.Result{}, err
+	}
+	desired := cluster.DeepCopy()
+
+	// Handle the finalizer:
+	// examine DeletionTimestamp to determine if the object is under deletion.
+	if cluster.ObjectMeta.DeletionTimestamp.IsZero() {
+		// The object is not being deleted, so if it does not have our finalizer,
+		// add the finalizer and update the object. This is equivalent to
+		// registering our finalizer.
+		if !controllerutil.ContainsFinalizer(desired, dsv1alpha1.FinalizerName) {
+			controllerutil.AddFinalizer(desired, dsv1alpha1.FinalizerName)
+			if err := r.Update(ctx, desired); err != nil {
+				return ctrl.Result{}, err
+			}
+		}
+	} else {
+		// The object is being deleted.
+		if controllerutil.ContainsFinalizer(desired, dsv1alpha1.FinalizerName) {
+			// Our finalizer is present, so handle any external dependency.
+			if err := r.ensureDSApiDeleted(ctx, cluster); err != nil {
+				return ctrl.Result{}, err
+			}
+
+			// Remove our finalizer from the list and update the object.
+			controllerutil.RemoveFinalizer(desired, dsv1alpha1.FinalizerName)
+			if err := r.Update(ctx, desired); err != nil {
+				return ctrl.Result{}, err
+			}
+		}
+		// Stop reconciliation as the item is being deleted.
+		return ctrl.Result{}, nil
+	}
+
+	if cluster.Spec.Paused {
+		apiLogger.Info("ds-Api control has been paused: ", "ds-Api-name", cluster.Name)
+		desired.Status.ControlPaused = true
+		if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
+			return ctrl.Result{}, err
+		}
+		r.Recorder.Event(cluster, corev1.EventTypeNormal, "the spec status is paused", "do nothing")
+		return ctrl.Result{}, nil
+	}
+
+	// 1. First time we see the DSApi: initialize its phase and come back shortly.
+	if cluster.Status.Phase == dsv1alpha1.DsPhaseNone {
+		desired.Status.Phase = dsv1alpha1.DsPhaseCreating
+		apiLogger.Info("phase had been changed from none ---> creating")
+		err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster))
+		return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err
+	}
+
+	// 2. Ensure the api service. The error is returned (not swallowed) so the
+	// failure is visible in the controller log; returning it also requeues.
+	apiLogger.Info("Ensuring Api service")
+	if err := r.ensureApiService(ctx, cluster); err != nil {
+		return ctrl.Result{}, err
+	}
+
+	// 3. Ensure the api deployment. A false result means there is nothing more
+	// to do in this pass.
+	requeue, err := r.ensureApiDeployment(ctx, cluster)
+	if err != nil {
+		return ctrl.Result{}, err
+	}
+	if !requeue {
+		return ctrl.Result{}, nil
+	}
+
+	apiLogger.Info("******************************************************")
+	// NOTE(review): DSApi enables the status subresource, so a plain Update
+	// presumably does not persist this Status change — confirm whether
+	// Status().Update/Patch (and DsPhaseNone as the target phase) was intended.
+	desired.Status.Phase = dsv1alpha1.DsPhaseNone
+	if err := r.Update(ctx, desired); err != nil {
+		return ctrl.Result{}, err
+	}
+	return ctrl.Result{}, nil
+}
+
+// SetupWithManager registers this reconciler with mgr as the controller for
+// DSApi objects and has it watch the Deployments, Services and Pods they own.
+func (r *DSApiReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	bldr := ctrl.NewControllerManagedBy(mgr)
+	bldr = bldr.For(&dsv1alpha1.DSApi{})
+	bldr = bldr.Owns(&v1.Deployment{})
+	bldr = bldr.Owns(&corev1.Service{})
+	bldr = bldr.Owns(&corev1.Pod{})
+	return bldr.Complete(r)
+}
+
+// ensureDSApiDeleted issues a delete for the given DSApi with the orphan
+// propagation policy and returns whatever error the client reports.
+func (r *DSApiReconciler) ensureDSApiDeleted(ctx context.Context, DSApi *dsv1alpha1.DSApi) error {
+	orphan := client.PropagationPolicy(metav1.DeletePropagationOrphan)
+	return r.Client.Delete(ctx, DSApi, orphan)
+}
+
+// ensureApiService makes sure the Service named dsv1alpha1.DsApiServiceValue
+// exists in the namespace of cluster. When the lookup reports not-found, the
+// Service is built with createApiService, owned by cluster and created.
+//
+// It returns nil when the Service already exists or was created, and the
+// underlying error when the lookup fails for any other reason, when the owner
+// reference cannot be set, or when the creation fails.
+func (r *DSApiReconciler) ensureApiService(ctx context.Context, cluster *dsv1alpha1.DSApi) error {
+	service := &corev1.Service{}
+	namespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsApiServiceValue}
+
+	err := r.Client.Get(ctx, namespacedName, service)
+	if err == nil {
+		// Already present, nothing to do.
+		return nil
+	}
+	if !apierrors.IsNotFound(err) {
+		// A real lookup failure must not be mistaken for "service is fine".
+		return err
+	}
+
+	// Not found in the local cache: create it.
+	service = createApiService(cluster)
+	if err := controllerutil.SetControllerReference(cluster, service, r.Scheme); err != nil {
+		return err
+	}
+	// The Service may already exist remotely; in that case the error is returned
+	// and the next reconcile will find it through the lookup above.
+	if err := r.Client.Create(ctx, service); err != nil {
+		return err
+	}
+	apiLogger.Info("the Api service had been created")
+	return nil
+}
+
+// ensureApiDeployment makes sure the Deployment named
+// dsv1alpha1.DsApiDeploymentValue exists in the namespace of cluster.
+//
+// When the Deployment exists it is synchronized through updateApiDeployment
+// and (true, nil) is returned, which lets Reconcile continue. When it is not
+// found it is built with createApiDeployment, owned by cluster and created,
+// and (false, nil) is returned, which makes Reconcile stop for this pass.
+// On failure the error is returned; Reconcile ignores the boolean in that case.
+func (r *DSApiReconciler) ensureApiDeployment(ctx context.Context, cluster *dsv1alpha1.DSApi) (bool, error) {
+	deployment := &v1.Deployment{}
+	deploymentNamespaceName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsApiDeploymentValue}
+
+	err := r.Client.Get(ctx, deploymentNamespaceName, deployment)
+	if err == nil {
+		if err := r.updateApiDeployment(ctx, deployment, cluster); err != nil {
+			return false, err
+		}
+		return true, nil
+	}
+	if !apierrors.IsNotFound(err) {
+		// Do not try to create an empty Deployment when the lookup itself failed.
+		return true, err
+	}
+
+	deployment = createApiDeployment(cluster)
+	if err := controllerutil.SetControllerReference(cluster, deployment, r.Scheme); err != nil {
+		return true, err
+	}
+	if err := r.Client.Create(ctx, deployment); err != nil {
+		return true, err
+	}
+	return false, nil
+}
+
+// updateApiDeployment copies the replica count and the image (built from the
+// repository and version of cluster) onto the first container of deployment
+// and pushes the result with the client. No other property is touched.
+func (r *DSApiReconciler) updateApiDeployment(ctx context.Context, deployment *v1.Deployment, cluster *dsv1alpha1.DSApi) error {
+	wanted := int32(cluster.Spec.Replicas)
+	deployment.Spec.Replicas = int32Ptr(wanted)
+	deployment.Spec.Template.Spec.Containers[0].Image = ImageName(cluster.Spec.Repository, cluster.Spec.Version)
+	return r.Client.Update(ctx, deployment)
+}