You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2020/12/04 23:01:12 UTC

[skywalking-swck] branch master updated: Operator templates enhancement (#14)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-swck.git


The following commit(s) were added to refs/heads/master by this push:
     new b86e3e9  Operator templates enhancement (#14)
b86e3e9 is described below

commit b86e3e95dbd4c06a25ebf20782e8f6eeee09e562
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Dec 5 07:01:04 2020 +0800

    Operator templates enhancement (#14)
    
    * Introduce templates
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
    
    * Update docs
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
    
    * Polish codes
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 cmd/manager/manager.go                       |   4 +-
 controllers/operator/oapserver_controller.go | 108 ++++++++++-----------------
 pkg/kubernetes/apply.go                      |  82 ++++++++++----------
 pkg/kubernetes/object.go                     |  95 +++++++++++++++++++++++
 pkg/operator/repo/repo.go                    |  91 +++++++++++++++++++++-
 5 files changed, 265 insertions(+), 115 deletions(-)

diff --git a/cmd/manager/manager.go b/cmd/manager/manager.go
index 18ae64e..3fe8046 100644
--- a/cmd/manager/manager.go
+++ b/cmd/manager/manager.go
@@ -48,14 +48,16 @@ func init() {
 func main() {
 	var metricsAddr string
 	var enableLeaderElection bool
+	var dev bool
 	flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
 	flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
 		"Enable leader election for controller manager. "+
 			"Enabling this will ensure there is only one active controller manager.")
+	flag.BoolVar(&dev, "dev", false, "Enable development mode")
 	flag.Parse()
 
 	ctrl.SetLogger(zap.New(func(o *zap.Options) {
-		o.Development = true
+		o.Development = dev
 	}))
 
 	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
diff --git a/controllers/operator/oapserver_controller.go b/controllers/operator/oapserver_controller.go
index ab91f6a..76d2d26 100644
--- a/controllers/operator/oapserver_controller.go
+++ b/controllers/operator/oapserver_controller.go
@@ -26,7 +26,6 @@ import (
 	"github.com/go-logr/logr"
 	apps "k8s.io/api/apps/v1"
 	core "k8s.io/api/core/v1"
-	rbac "k8s.io/api/rbac/v1"
 	apiequal "k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/runtime"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -63,84 +62,59 @@ func (r *OAPServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 	if err := r.Client.Get(ctx, req.NamespacedName, &oapServer); err != nil {
 		return ctrl.Result{}, client.IgnoreNotFound(err)
 	}
+	ff, err := r.FileRepo.GetFilesRecursive("templates")
+	if err != nil {
+		log.Error(err, "failed to load resource templates")
+		return ctrl.Result{}, err
+	}
 	app := kubernetes.Application{
-		Log:      r.Log,
 		Client:   r.Client,
 		FileRepo: r.FileRepo,
 		CR:       &oapServer,
 		GVK:      operatorv1alpha1.GroupVersion.WithKind("OAPServer"),
+		TmplFunc: tmplFunc(&oapServer),
 	}
-	if err := app.Apply(ctx, kubernetes.K8SObj{
-		Name:      "service_account",
-		Key:       client.ObjectKey{Namespace: oapServer.Namespace, Name: oapServer.Name + "-oap"},
-		Prototype: &core.ServiceAccount{},
-	}); err != nil {
-		return ctrl.Result{}, err
-	}
-	if err := app.Apply(ctx, kubernetes.K8SObj{
-		Name:      "cluster_role",
-		Key:       client.ObjectKey{Name: "swck:oapserver"},
-		Prototype: &rbac.ClusterRole{},
-	}); err != nil {
-		return ctrl.Result{}, err
-	}
-	if err := app.Apply(ctx, kubernetes.K8SObj{
-		Name:      "cluster_role_binding",
-		Key:       client.ObjectKey{Name: "swck:oapserver"},
-		Prototype: &rbac.ClusterRoleBinding{},
-	}); err != nil {
-		return ctrl.Result{}, err
-	}
-	if err := app.Apply(ctx, kubernetes.K8SObj{
-		Name:      "service",
-		Key:       client.ObjectKey{Namespace: oapServer.Namespace, Name: oapServer.Name},
-		Prototype: &core.Service{},
-		Extract: func(obj client.Object) interface{} {
-			return obj.(*core.Service).Spec
-		},
-	}); err != nil {
-		return ctrl.Result{}, err
+	for _, f := range ff {
+		l := log.WithName(f)
+		if err := app.Apply(ctx, f, l); err != nil {
+			l.Error(err, "failed to apply resource")
+			return ctrl.Result{}, err
+		}
 	}
-	if err := app.Apply(ctx, kubernetes.K8SObj{
-		Name:      "deployment",
-		Key:       client.ObjectKey{Namespace: oapServer.Namespace, Name: oapServer.Name},
-		Prototype: &apps.Deployment{},
-		TmplFunc: template.FuncMap{
-			"generateImage": func() string {
-				image := oapServer.Spec.Image
-				if image == "" {
-					v := oapServer.Spec.Version
-					vTmpl := "apache/skywalking-oap-server:%s-%s"
-					vES := "es6"
-					for _, e := range oapServer.Spec.Config {
-						if e.Name != "SW_STORAGE" {
-							continue
-						}
-						if e.Value == "elasticsearch7" {
-							vES = "es7"
-						}
+
+	r.istio(ctx, log, oapServer.Name, &oapServer)
+
+	return ctrl.Result{RequeueAfter: r.checkState(ctx, log, &oapServer)}, nil
+}
+
+func tmplFunc(oapServer *operatorv1alpha1.OAPServer) template.FuncMap {
+	return template.FuncMap{
+		"generateImage": func() string {
+			image := oapServer.Spec.Image
+			if image == "" {
+				v := oapServer.Spec.Version
+				vTmpl := "apache/skywalking-oap-server:%s-%s"
+				vES := "es6"
+				for _, e := range oapServer.Spec.Config {
+					if e.Name != "SW_STORAGE" {
+						continue
+					}
+					if e.Value == "elasticsearch7" {
+						vES = "es7"
 					}
-					image = fmt.Sprintf(vTmpl, v, vES)
 				}
-				return image
-			},
-		},
-		Extract: func(obj client.Object) interface{} {
-			return obj.(*apps.Deployment).Spec
+				image = fmt.Sprintf(vTmpl, v, vES)
+			}
+			return image
 		},
-	}); err != nil {
-		return ctrl.Result{}, err
 	}
-	r.istio(ctx, log, oapServer.Name, &oapServer)
-
-	return ctrl.Result{RequeueAfter: r.checkState(ctx, log, &oapServer, oapServer.Name)}, nil
 }
 
-func (r *OAPServerReconciler) checkState(ctx context.Context, log logr.Logger, oapServer *operatorv1alpha1.OAPServer, name string) time.Duration {
+func (r *OAPServerReconciler) checkState(ctx context.Context, log logr.Logger, oapServer *operatorv1alpha1.OAPServer) time.Duration {
 	overlay := operatorv1alpha1.OAPServerStatus{}
 	deployment := apps.Deployment{}
 	nextSchedule := schedDuration
-	if err := r.Client.Get(ctx, client.ObjectKey{Namespace: oapServer.Namespace, Name: name}, &deployment); err != nil {
+	if err := r.Client.Get(ctx, client.ObjectKey{Namespace: oapServer.Namespace, Name: oapServer.Name}, &deployment); err != nil {
 		nextSchedule = rushModeSchedDuration
 	} else {
 		overlay.Conditions = deployment.Status.Conditions
@@ -148,17 +122,17 @@ func (r *OAPServerReconciler) checkState(ctx context.Context, log logr.Logger, o
 		if oapServer.Spec.Instances != overlay.AvailableReplicas {
 			nextSchedule = rushModeSchedDuration
 		}
-		if oapServer.Spec.Image == "" {
+		if oapServer.Spec.Image != deployment.Spec.Template.Spec.Containers[0].Image {
 			oapServer.Spec.Image = deployment.Spec.Template.Spec.Containers[0].Image
 			if err := r.Update(ctx, oapServer); err != nil {
 				log.Error(err, "failed to update OAPServer Image field")
 			}
-			log.Info("updated OAPServer Image field")
-			return rushModeSchedDuration
+			log.Info("updated OAPServer Image")
+			nextSchedule = rushModeSchedDuration
 		}
 	}
 	service := core.Service{}
-	if err := r.Client.Get(ctx, client.ObjectKey{Namespace: oapServer.Namespace, Name: name}, &service); err != nil {
+	if err := r.Client.Get(ctx, client.ObjectKey{Namespace: oapServer.Namespace, Name: oapServer.Name}, &service); err != nil {
 		nextSchedule = rushModeSchedDuration
 	} else {
 		overlay.Address = fmt.Sprintf("%s.%s", service.Name, service.Namespace)
diff --git a/pkg/kubernetes/apply.go b/pkg/kubernetes/apply.go
index ad73220..5ec48fb 100644
--- a/pkg/kubernetes/apply.go
+++ b/pkg/kubernetes/apply.go
@@ -23,9 +23,9 @@ import (
 	"text/template"
 
 	"github.com/go-logr/logr"
-	apiequal "k8s.io/apimachinery/pkg/api/equality"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
@@ -33,89 +33,85 @@ import (
 // Repo provides tools to access templates
 type Repo interface {
 	ReadFile(path string) ([]byte, error)
+	GetFilesRecursive(path string) ([]string, error)
 }
 
 // Application contains the resource of one single component which is applied to api server
 type Application struct {
 	client.Client
 	CR       client.Object
-	Log      logr.Logger
 	FileRepo Repo
 	GVK      schema.GroupVersionKind
+	TmplFunc template.FuncMap
 }
 
-// K8SObj is a sub template of an Application
-type K8SObj struct {
-	Name      string
-	Key       client.ObjectKey
-	Prototype client.Object
-	TmplFunc  template.FuncMap
-	Extract   ExtractFunc
-}
-
-type ExtractFunc func(obj client.Object) interface{}
-
 // Apply a template represents a component to api server
-func (a *Application) Apply(ctx context.Context, input K8SObj) error {
-	log := a.Log.WithName(input.Name)
-	manifests, err := a.FileRepo.ReadFile(fmt.Sprintf("%s.yaml", input.Name))
+func (a *Application) Apply(ctx context.Context, manifest string, log logr.Logger) error {
+	manifests, err := a.FileRepo.ReadFile(manifest)
 	if err != nil {
 		return err
 	}
-	overlay := input.Prototype.DeepCopyObject().(client.Object)
-	current := input.Prototype.DeepCopyObject().(client.Object)
-	err = a.Get(ctx, input.Key, current)
+	proto := &unstructured.Unstructured{}
+	err = LoadTemplate(string(manifests), a.CR, a.TmplFunc, proto)
+	if err != nil {
+		return fmt.Errorf("failed to load template: %w", err)
+	}
+	key := client.ObjectKeyFromObject(proto)
+	current := &unstructured.Unstructured{}
+	current.SetGroupVersionKind(proto.GetObjectKind().GroupVersionKind())
+	err = a.Get(ctx, key, current)
 	if apierrors.IsNotFound(err) {
 		log.Info("could not find existing resource, creating one...")
-
-		curr, errComp := compose(string(manifests), current, overlay, a.CR, a.GVK, input.TmplFunc)
+		curr, errComp := a.compose(proto)
 		if errComp != nil {
-			return errComp
+			return fmt.Errorf("failed to compose: %w", errComp)
 		}
 
 		if err = a.Create(ctx, curr); err != nil {
-			return err
+			return fmt.Errorf("failed to create: %w", errComp)
 		}
 
-		log.Info("created resource")
+		log.Info("created")
 		return nil
 	}
 	if err != nil {
-		return err
-	}
-	if input.Extract == nil {
-		return nil
+		return fmt.Errorf("failed to get %v : %w", key, err)
 	}
 
-	object, err := compose(string(manifests), current, overlay, a.CR, a.GVK, input.TmplFunc)
+	object, err := a.compose(proto)
 	if err != nil {
-		return err
+		return fmt.Errorf("failed to compose: %w", err)
 	}
 
-	if apiequal.Semantic.DeepDerivative(input.Extract(object), input.Extract(current)) {
+	if getVersion(current, a.versionKey()) == getVersion(object, a.versionKey()) {
 		log.Info("resource keeps the same as before")
 		return nil
 	}
 	if err := a.Update(ctx, object); err != nil {
-		return err
+		return fmt.Errorf("failed to update: %w", err)
 	}
-	log.Info("updated resource")
+	log.Info("updated")
 	return nil
 }
 
-func compose(manifests string, base, overlay, cr client.Object,
-	gvk schema.GroupVersionKind, tmplFunc template.FuncMap) (client.Object, error) {
-	err := LoadTemplate(manifests, cr, tmplFunc, overlay)
+func (a *Application) setVersionAnnotation(o *unstructured.Unstructured) error {
+	h, err := hash(o)
 	if err != nil {
-		return nil, err
+		return err
 	}
+	setVersion(o, a.versionKey(), h)
+	return nil
+}
 
-	overlay.SetOwnerReferences([]metav1.OwnerReference{*metav1.NewControllerRef(cr, gvk)})
-	if base == nil {
-		return overlay, nil
-	}
-	if err := ApplyOverlay(base, overlay); err != nil {
+func (a *Application) versionKey() string {
+	return a.GVK.Group + "/version"
+}
+
+func (a *Application) compose(object *unstructured.Unstructured) (*unstructured.Unstructured, error) {
+	object.SetOwnerReferences([]metav1.OwnerReference{*metav1.NewControllerRef(a.CR, a.GVK)})
+	err := a.setVersionAnnotation(object)
+	if err != nil {
 		return nil, err
 	}
-	return base, nil
+	return object, nil
 }
diff --git a/pkg/kubernetes/object.go b/pkg/kubernetes/object.go
new file mode 100644
index 0000000..0a1a5b9
--- /dev/null
+++ b/pkg/kubernetes/object.go
@@ -0,0 +1,95 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package kubernetes
+
+import (
+	"encoding/hex"
+	"encoding/json"
+	"hash/fnv"
+	"strings"
+
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+)
+
+func getVersion(o *unstructured.Unstructured, key string) string {
+	ann := o.GetAnnotations()
+	if ann == nil {
+		return ""
+	}
+	return ann[key]
+}
+
+func setVersion(o *unstructured.Unstructured, key string, version string) {
+	ann := make(map[string]string)
+	for k, v := range o.GetAnnotations() {
+		ann[k] = v
+	}
+	ann[key] = version
+	o.SetAnnotations(ann)
+}
+
+func hash(o *unstructured.Unstructured) (string, error) {
+	forHashing := make(map[string]interface{})
+	for field, contents := range o.Object {
+		if !isMeta(field) {
+			forHashing[field] = contents
+		}
+	}
+	if len(forHashing) == 0 {
+		forHashing = map[string]interface{}{"no-contents-use-meta": metaHash(o)}
+	}
+	bytes, err := json.Marshal(forHashing)
+	if err != nil {
+		return "", err
+	}
+	return alphanumeric(bytes), nil
+}
+
+func metaHash(o *unstructured.Unstructured) string {
+	if o == nil {
+		return "-"
+	}
+	kind := o.GetObjectKind().GroupVersionKind().Kind
+	namespace := o.GetNamespace()
+	switch kind {
+	case "ClusterRole", "ClusterRoleBinding":
+		namespace = ""
+	}
+	return strings.Join([]string{kind, namespace, o.GetName()}, ":")
+}
+
+func isMeta(name string) bool {
+	switch name {
+	case "kind", "apiVersion", "metadata":
+		return true
+
+	default:
+		return false
+	}
+}
+
+func alphanumeric(in []byte) string {
+	if in == nil {
+		return ""
+	}
+
+	hash := fnv.New64a()
+	_, _ = hash.Write(in)
+	out := hash.Sum(make([]byte, 0, 8))
+	return hex.EncodeToString(out)
+}
diff --git a/pkg/operator/repo/repo.go b/pkg/operator/repo/repo.go
index ffba7d5..5c300f4 100644
--- a/pkg/operator/repo/repo.go
+++ b/pkg/operator/repo/repo.go
@@ -17,7 +17,16 @@
 
 package repo
 
-import "fmt"
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"time"
+
+	"github.com/apache/skywalking-swck/pkg/kubernetes"
+)
+
+var _ kubernetes.Repo = &AssetsRepo{}
 
 // AssetsRepo provides templates through assets
 type AssetsRepo struct {
@@ -25,10 +34,84 @@ type AssetsRepo struct {
 }
 
 func NewRepo(component string) *AssetsRepo {
-	return &AssetsRepo{Root: fmt.Sprintf("%s/templates", component)}
+	return &AssetsRepo{Root: component}
 }
 
 // ReadFile reads the content of compiled in files at path and returns a buffer with the data.
-func (v *AssetsRepo) ReadFile(path string) ([]byte, error) {
-	return Asset(fmt.Sprintf("%s/%s", v.Root, path))
+func (a *AssetsRepo) ReadFile(path string) ([]byte, error) {
+	return Asset(path)
+}
+
+func (a *AssetsRepo) GetFilesRecursive(path string) ([]string, error) {
+	ap := fmt.Sprintf("%s/%s", a.Root, path)
+	rootFI, err := Stat(ap)
+	if err != nil {
+		return nil, err
+	}
+	return getFilesRecursive(filepath.Dir(ap), rootFI)
+}
+
+func getFilesRecursive(prefix string, root os.FileInfo) ([]string, error) {
+	if !root.IsDir() {
+		return nil, fmt.Errorf("not a dir: %s", root.Name())
+	}
+	prefix = filepath.Join(prefix, root.Name())
+	fs, _ := AssetDir(prefix)
+	out := make([]string, 0)
+	for _, f := range fs {
+		info, err := Stat(filepath.Join(prefix, f))
+		if err != nil {
+			return nil, err
+		}
+		if !info.IsDir() {
+			out = append(out, filepath.Join(prefix, filepath.Base(info.Name())))
+			continue
+		}
+		nfs, err := getFilesRecursive(prefix, info)
+		if err != nil {
+			return nil, err
+		}
+		out = append(out, nfs...)
+	}
+	return out, nil
+}
+
+// Stat returns a FileInfo object for the given path.
+func Stat(path string) (os.FileInfo, error) {
+	info, err := AssetInfo(path)
+	if err != nil {
+		// try it as a directory instead
+		_, err = AssetDir(path)
+		if err == nil {
+			info = &dirInfo{name: filepath.Base(path)}
+		}
+	} else {
+		fi := info.(bindataFileInfo)
+		fi.name = filepath.Base(fi.name)
+	}
+
+	return info, err
+}
+
+type dirInfo struct {
+	name string
+}
+
+func (di dirInfo) Name() string {
+	return di.name
+}
+func (di dirInfo) Size() int64 {
+	return 0
+}
+func (di dirInfo) Mode() os.FileMode {
+	return os.FileMode(0)
+}
+func (di dirInfo) ModTime() time.Time {
+	return time.Unix(0, 0)
+}
+func (di dirInfo) IsDir() bool {
+	return true
+}
+func (di dirInfo) Sys() interface{} {
+	return nil
 }