You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2020/12/04 23:01:12 UTC
[skywalking-swck] branch master updated: Operator templates
enhancement (#14)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-swck.git
The following commit(s) were added to refs/heads/master by this push:
new b86e3e9 Operator templates enhancement (#14)
b86e3e9 is described below
commit b86e3e95dbd4c06a25ebf20782e8f6eeee09e562
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Dec 5 07:01:04 2020 +0800
Operator templates enhancement (#14)
* Introduce templates
Signed-off-by: Gao Hongtao <ha...@gmail.com>
* Update docs
Signed-off-by: Gao Hongtao <ha...@gmail.com>
* Polish codes
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
cmd/manager/manager.go | 4 +-
controllers/operator/oapserver_controller.go | 108 ++++++++++-----------------
pkg/kubernetes/apply.go | 82 ++++++++++----------
pkg/kubernetes/object.go | 95 +++++++++++++++++++++++
pkg/operator/repo/repo.go | 91 +++++++++++++++++++++-
5 files changed, 265 insertions(+), 115 deletions(-)
diff --git a/cmd/manager/manager.go b/cmd/manager/manager.go
index 18ae64e..3fe8046 100644
--- a/cmd/manager/manager.go
+++ b/cmd/manager/manager.go
@@ -48,14 +48,16 @@ func init() {
func main() {
var metricsAddr string
var enableLeaderElection bool
+ var dev bool
flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
+ flag.BoolVar(&dev, "dev", false, "Enable development mode")
flag.Parse()
ctrl.SetLogger(zap.New(func(o *zap.Options) {
- o.Development = true
+ o.Development = dev
}))
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
diff --git a/controllers/operator/oapserver_controller.go b/controllers/operator/oapserver_controller.go
index ab91f6a..76d2d26 100644
--- a/controllers/operator/oapserver_controller.go
+++ b/controllers/operator/oapserver_controller.go
@@ -26,7 +26,6 @@ import (
"github.com/go-logr/logr"
apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
- rbac "k8s.io/api/rbac/v1"
apiequal "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
@@ -63,84 +62,59 @@ func (r *OAPServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if err := r.Client.Get(ctx, req.NamespacedName, &oapServer); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
+ ff, err := r.FileRepo.GetFilesRecursive("templates")
+ if err != nil {
+ log.Error(err, "failed to load resource templates")
+ return ctrl.Result{}, err
+ }
app := kubernetes.Application{
- Log: r.Log,
Client: r.Client,
FileRepo: r.FileRepo,
CR: &oapServer,
GVK: operatorv1alpha1.GroupVersion.WithKind("OAPServer"),
+ TmplFunc: tmplFunc(&oapServer),
}
- if err := app.Apply(ctx, kubernetes.K8SObj{
- Name: "service_account",
- Key: client.ObjectKey{Namespace: oapServer.Namespace, Name: oapServer.Name + "-oap"},
- Prototype: &core.ServiceAccount{},
- }); err != nil {
- return ctrl.Result{}, err
- }
- if err := app.Apply(ctx, kubernetes.K8SObj{
- Name: "cluster_role",
- Key: client.ObjectKey{Name: "swck:oapserver"},
- Prototype: &rbac.ClusterRole{},
- }); err != nil {
- return ctrl.Result{}, err
- }
- if err := app.Apply(ctx, kubernetes.K8SObj{
- Name: "cluster_role_binding",
- Key: client.ObjectKey{Name: "swck:oapserver"},
- Prototype: &rbac.ClusterRoleBinding{},
- }); err != nil {
- return ctrl.Result{}, err
- }
- if err := app.Apply(ctx, kubernetes.K8SObj{
- Name: "service",
- Key: client.ObjectKey{Namespace: oapServer.Namespace, Name: oapServer.Name},
- Prototype: &core.Service{},
- Extract: func(obj client.Object) interface{} {
- return obj.(*core.Service).Spec
- },
- }); err != nil {
- return ctrl.Result{}, err
+ for _, f := range ff {
+ l := log.WithName(f)
+ if err := app.Apply(ctx, f, l); err != nil {
+ l.Error(err, "failed to apply resource")
+ return ctrl.Result{}, err
+ }
}
- if err := app.Apply(ctx, kubernetes.K8SObj{
- Name: "deployment",
- Key: client.ObjectKey{Namespace: oapServer.Namespace, Name: oapServer.Name},
- Prototype: &apps.Deployment{},
- TmplFunc: template.FuncMap{
- "generateImage": func() string {
- image := oapServer.Spec.Image
- if image == "" {
- v := oapServer.Spec.Version
- vTmpl := "apache/skywalking-oap-server:%s-%s"
- vES := "es6"
- for _, e := range oapServer.Spec.Config {
- if e.Name != "SW_STORAGE" {
- continue
- }
- if e.Value == "elasticsearch7" {
- vES = "es7"
- }
+
+ r.istio(ctx, log, oapServer.Name, &oapServer)
+
+ return ctrl.Result{RequeueAfter: r.checkState(ctx, log, &oapServer)}, nil
+}
+
+func tmplFunc(oapServer *operatorv1alpha1.OAPServer) template.FuncMap {
+ return template.FuncMap{
+ "generateImage": func() string {
+ image := oapServer.Spec.Image
+ if image == "" {
+ v := oapServer.Spec.Version
+ vTmpl := "apache/skywalking-oap-server:%s-%s"
+ vES := "es6"
+ for _, e := range oapServer.Spec.Config {
+ if e.Name != "SW_STORAGE" {
+ continue
+ }
+ if e.Value == "elasticsearch7" {
+ vES = "es7"
}
- image = fmt.Sprintf(vTmpl, v, vES)
}
- return image
- },
- },
- Extract: func(obj client.Object) interface{} {
- return obj.(*apps.Deployment).Spec
+ image = fmt.Sprintf(vTmpl, v, vES)
+ }
+ return image
},
- }); err != nil {
- return ctrl.Result{}, err
}
- r.istio(ctx, log, oapServer.Name, &oapServer)
-
- return ctrl.Result{RequeueAfter: r.checkState(ctx, log, &oapServer, oapServer.Name)}, nil
}
-func (r *OAPServerReconciler) checkState(ctx context.Context, log logr.Logger, oapServer *operatorv1alpha1.OAPServer, name string) time.Duration {
+func (r *OAPServerReconciler) checkState(ctx context.Context, log logr.Logger, oapServer *operatorv1alpha1.OAPServer) time.Duration {
overlay := operatorv1alpha1.OAPServerStatus{}
deployment := apps.Deployment{}
nextSchedule := schedDuration
- if err := r.Client.Get(ctx, client.ObjectKey{Namespace: oapServer.Namespace, Name: name}, &deployment); err != nil {
+ if err := r.Client.Get(ctx, client.ObjectKey{Namespace: oapServer.Namespace, Name: oapServer.Name}, &deployment); err != nil {
nextSchedule = rushModeSchedDuration
} else {
overlay.Conditions = deployment.Status.Conditions
@@ -148,17 +122,17 @@ func (r *OAPServerReconciler) checkState(ctx context.Context, log logr.Logger, o
if oapServer.Spec.Instances != overlay.AvailableReplicas {
nextSchedule = rushModeSchedDuration
}
- if oapServer.Spec.Image == "" {
+ if oapServer.Spec.Image != deployment.Spec.Template.Spec.Containers[0].Image {
oapServer.Spec.Image = deployment.Spec.Template.Spec.Containers[0].Image
if err := r.Update(ctx, oapServer); err != nil {
log.Error(err, "failed to update OAPServer Image field")
}
- log.Info("updated OAPServer Image field")
- return rushModeSchedDuration
+ log.Info("updated OAPServer Image")
+ nextSchedule = rushModeSchedDuration
}
}
service := core.Service{}
- if err := r.Client.Get(ctx, client.ObjectKey{Namespace: oapServer.Namespace, Name: name}, &service); err != nil {
+ if err := r.Client.Get(ctx, client.ObjectKey{Namespace: oapServer.Namespace, Name: oapServer.Name}, &service); err != nil {
nextSchedule = rushModeSchedDuration
} else {
overlay.Address = fmt.Sprintf("%s.%s", service.Name, service.Namespace)
diff --git a/pkg/kubernetes/apply.go b/pkg/kubernetes/apply.go
index ad73220..5ec48fb 100644
--- a/pkg/kubernetes/apply.go
+++ b/pkg/kubernetes/apply.go
@@ -23,9 +23,9 @@ import (
"text/template"
"github.com/go-logr/logr"
- apiequal "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
)
@@ -33,89 +33,85 @@ import (
// Repo provides tools to access templates
type Repo interface {
ReadFile(path string) ([]byte, error)
+ GetFilesRecursive(path string) ([]string, error)
}
// Application contains the resource of one single component which is applied to api server
type Application struct {
client.Client
CR client.Object
- Log logr.Logger
FileRepo Repo
GVK schema.GroupVersionKind
+ TmplFunc template.FuncMap
}
-// K8SObj is a sub template of an Application
-type K8SObj struct {
- Name string
- Key client.ObjectKey
- Prototype client.Object
- TmplFunc template.FuncMap
- Extract ExtractFunc
-}
-
-type ExtractFunc func(obj client.Object) interface{}
-
// Apply a template represents a component to api server
-func (a *Application) Apply(ctx context.Context, input K8SObj) error {
- log := a.Log.WithName(input.Name)
- manifests, err := a.FileRepo.ReadFile(fmt.Sprintf("%s.yaml", input.Name))
+func (a *Application) Apply(ctx context.Context, manifest string, log logr.Logger) error {
+ manifests, err := a.FileRepo.ReadFile(manifest)
if err != nil {
return err
}
- overlay := input.Prototype.DeepCopyObject().(client.Object)
- current := input.Prototype.DeepCopyObject().(client.Object)
- err = a.Get(ctx, input.Key, current)
+ proto := &unstructured.Unstructured{}
+ err = LoadTemplate(string(manifests), a.CR, a.TmplFunc, proto)
+ if err != nil {
+ return fmt.Errorf("failed to load template: %w", err)
+ }
+ key := client.ObjectKeyFromObject(proto)
+ current := &unstructured.Unstructured{}
+ current.SetGroupVersionKind(proto.GetObjectKind().GroupVersionKind())
+ err = a.Get(ctx, key, current)
if apierrors.IsNotFound(err) {
log.Info("could not find existing resource, creating one...")
-
- curr, errComp := compose(string(manifests), current, overlay, a.CR, a.GVK, input.TmplFunc)
+ curr, errComp := a.compose(proto)
if errComp != nil {
- return errComp
+ return fmt.Errorf("failed to compose: %w", errComp)
}
if err = a.Create(ctx, curr); err != nil {
- return err
+ return fmt.Errorf("failed to create: %w", errComp)
}
- log.Info("created resource")
+ log.Info("created")
return nil
}
if err != nil {
- return err
- }
- if input.Extract == nil {
- return nil
+ return fmt.Errorf("failed to get %v : %w", key, err)
}
- object, err := compose(string(manifests), current, overlay, a.CR, a.GVK, input.TmplFunc)
+ object, err := a.compose(proto)
if err != nil {
- return err
+ return fmt.Errorf("failed to compose: %w", err)
}
- if apiequal.Semantic.DeepDerivative(input.Extract(object), input.Extract(current)) {
+ if getVersion(current, a.versionKey()) == getVersion(object, a.versionKey()) {
log.Info("resource keeps the same as before")
return nil
}
if err := a.Update(ctx, object); err != nil {
- return err
+ return fmt.Errorf("failed to update: %w", err)
}
- log.Info("updated resource")
+ log.Info("updated")
return nil
}
-func compose(manifests string, base, overlay, cr client.Object,
- gvk schema.GroupVersionKind, tmplFunc template.FuncMap) (client.Object, error) {
- err := LoadTemplate(manifests, cr, tmplFunc, overlay)
+func (a *Application) setVersionAnnotation(o *unstructured.Unstructured) error {
+ h, err := hash(o)
if err != nil {
- return nil, err
+ return err
}
+ setVersion(o, a.versionKey(), h)
+ return nil
+}
- overlay.SetOwnerReferences([]metav1.OwnerReference{*metav1.NewControllerRef(cr, gvk)})
- if base == nil {
- return overlay, nil
- }
- if err := ApplyOverlay(base, overlay); err != nil {
+func (a *Application) versionKey() string {
+ return a.GVK.Group + "/version"
+}
+
+func (a *Application) compose(object *unstructured.Unstructured) (*unstructured.Unstructured, error) {
+ object.SetOwnerReferences([]metav1.OwnerReference{*metav1.NewControllerRef(a.CR, a.GVK)})
+ err := a.setVersionAnnotation(object)
+ if err != nil {
return nil, err
}
- return base, nil
+ return object, nil
}
diff --git a/pkg/kubernetes/object.go b/pkg/kubernetes/object.go
new file mode 100644
index 0000000..0a1a5b9
--- /dev/null
+++ b/pkg/kubernetes/object.go
@@ -0,0 +1,95 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package kubernetes
+
+import (
+ "encoding/hex"
+ "encoding/json"
+ "hash/fnv"
+ "strings"
+
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+)
+
+func getVersion(o *unstructured.Unstructured, key string) string {
+ ann := o.GetAnnotations()
+ if ann == nil {
+ return ""
+ }
+ return ann[key]
+}
+
+func setVersion(o *unstructured.Unstructured, key string, version string) {
+ ann := make(map[string]string)
+ for k, v := range o.GetAnnotations() {
+ ann[k] = v
+ }
+ ann[key] = version
+ o.SetAnnotations(ann)
+}
+
+func hash(o *unstructured.Unstructured) (string, error) {
+ forHashing := make(map[string]interface{})
+ for field, contents := range o.Object {
+ if !isMeta(field) {
+ forHashing[field] = contents
+ }
+ }
+ if len(forHashing) == 0 {
+ forHashing = map[string]interface{}{"no-contents-use-meta": metaHash(o)}
+ }
+ bytes, err := json.Marshal(forHashing)
+ if err != nil {
+ return "", err
+ }
+ return alphanumeric(bytes), nil
+}
+
+func metaHash(o *unstructured.Unstructured) string {
+ if o == nil {
+ return "-"
+ }
+ kind := o.GetObjectKind().GroupVersionKind().Kind
+ namespace := o.GetNamespace()
+ switch kind {
+ case "ClusterRole", "ClusterRoleBinding":
+ namespace = ""
+ }
+ return strings.Join([]string{kind, namespace, o.GetName()}, ":")
+}
+
+func isMeta(name string) bool {
+ switch name {
+ case "kind", "apiVersion", "metadata":
+ return true
+
+ default:
+ return false
+ }
+}
+
+func alphanumeric(in []byte) string {
+ if in == nil {
+ return ""
+ }
+
+ hash := fnv.New64a()
+ _, _ = hash.Write(in)
+ out := hash.Sum(make([]byte, 0, 8))
+ return hex.EncodeToString(out)
+}
diff --git a/pkg/operator/repo/repo.go b/pkg/operator/repo/repo.go
index ffba7d5..5c300f4 100644
--- a/pkg/operator/repo/repo.go
+++ b/pkg/operator/repo/repo.go
@@ -17,7 +17,16 @@
package repo
-import "fmt"
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+ "time"
+
+ "github.com/apache/skywalking-swck/pkg/kubernetes"
+)
+
+var _ kubernetes.Repo = &AssetsRepo{}
// AssetsRepo provides templates through assets
type AssetsRepo struct {
@@ -25,10 +34,84 @@ type AssetsRepo struct {
}
func NewRepo(component string) *AssetsRepo {
- return &AssetsRepo{Root: fmt.Sprintf("%s/templates", component)}
+ return &AssetsRepo{Root: component}
}
// ReadFile reads the content of compiled in files at path and returns a buffer with the data.
-func (v *AssetsRepo) ReadFile(path string) ([]byte, error) {
- return Asset(fmt.Sprintf("%s/%s", v.Root, path))
+func (a *AssetsRepo) ReadFile(path string) ([]byte, error) {
+ return Asset(path)
+}
+
+func (a *AssetsRepo) GetFilesRecursive(path string) ([]string, error) {
+ ap := fmt.Sprintf("%s/%s", a.Root, path)
+ rootFI, err := Stat(ap)
+ if err != nil {
+ return nil, err
+ }
+ return getFilesRecursive(filepath.Dir(ap), rootFI)
+}
+
+func getFilesRecursive(prefix string, root os.FileInfo) ([]string, error) {
+ if !root.IsDir() {
+ return nil, fmt.Errorf("not a dir: %s", root.Name())
+ }
+ prefix = filepath.Join(prefix, root.Name())
+ fs, _ := AssetDir(prefix)
+ out := make([]string, 0)
+ for _, f := range fs {
+ info, err := Stat(filepath.Join(prefix, f))
+ if err != nil {
+ return nil, err
+ }
+ if !info.IsDir() {
+ out = append(out, filepath.Join(prefix, filepath.Base(info.Name())))
+ continue
+ }
+ nfs, err := getFilesRecursive(prefix, info)
+ if err != nil {
+ return nil, err
+ }
+ out = append(out, nfs...)
+ }
+ return out, nil
+}
+
+// Stat returns a FileInfo object for the given path.
+func Stat(path string) (os.FileInfo, error) {
+ info, err := AssetInfo(path)
+ if err != nil {
+ // try it as a directory instead
+ _, err = AssetDir(path)
+ if err == nil {
+ info = &dirInfo{name: filepath.Base(path)}
+ }
+ } else {
+ fi := info.(bindataFileInfo)
+ fi.name = filepath.Base(fi.name)
+ }
+
+ return info, err
+}
+
+type dirInfo struct {
+ name string
+}
+
+func (di dirInfo) Name() string {
+ return di.name
+}
+func (di dirInfo) Size() int64 {
+ return 0
+}
+func (di dirInfo) Mode() os.FileMode {
+ return os.FileMode(0)
+}
+func (di dirInfo) ModTime() time.Time {
+ return time.Unix(0, 0)
+}
+func (di dirInfo) IsDir() bool {
+ return true
+}
+func (di dirInfo) Sys() interface{} {
+ return nil
}