You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/04/02 13:26:52 UTC

[skywalking-kubernetes-event-exporter] 01/01: feature: provide more template context objects

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch feature/more-ctx
in repository https://gitbox.apache.org/repos/asf/skywalking-kubernetes-event-exporter.git

commit 9a2d18e8535c384400a0b11cd00cd864e048cade
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Fri Apr 2 21:26:38 2021 +0800

    feature: provide more template context objects
---
 pkg/exporter/skywalking.go |  61 ++++++++----------
 pkg/filter/filter.go       |  34 ----------
 pkg/k8s/registry.go        | 153 +++++++++++++++++++++++++++++++++++++++++++++
 pkg/pipe/pipe.go           |   8 +++
 4 files changed, 187 insertions(+), 69 deletions(-)

diff --git a/pkg/exporter/skywalking.go b/pkg/exporter/skywalking.go
index f713a14..41aa11b 100644
--- a/pkg/exporter/skywalking.go
+++ b/pkg/exporter/skywalking.go
@@ -24,8 +24,10 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"html/template"
 	"time"
 
+	"github.com/apache/skywalking-kubernetes-event-exporter/pkg/k8s"
 	sw "skywalking.apache.org/repo/goapi/collect/event/v3"
 
 	"google.golang.org/grpc"
@@ -147,51 +149,40 @@ func (exporter *SkyWalking) Export(events chan *k8score.Event) {
 	}()
 }
 
-func (tmplt *EventTemplate) Render(event *sw.Event, data *k8score.Event) error {
-	var buf bytes.Buffer
+func (tmplt *EventTemplate) Render(event *sw.Event, kEvent *k8score.Event) error {
+	templateCtx := k8s.Registry.GetContext(kEvent)
 
-	// Render Event Message
-	if t := tmplt.messageTemplate; t != nil {
-		buf.Reset()
-		if err := t.Execute(&buf, data); err != nil {
+	render := func(t *template.Template, destination *string) error {
+		if t == nil {
+			return nil
+		}
+
+		var buf bytes.Buffer
+
+		if err := t.Execute(&buf, templateCtx); err != nil {
 			return err
 		}
+
 		if buf.Len() > 0 {
-			event.Message = buf.String()
+			*destination = buf.String()
 		}
+
+		return nil
+	}
+
+	if err := render(tmplt.messageTemplate, &event.Message); err != nil {
+		return err
 	}
 
-	// Render Event Source
 	if tmplt.sourceTemplate != nil {
-		// Render Event Source Service
-		if t := tmplt.sourceTemplate.serviceTemplate; t != nil {
-			buf.Reset()
-			if err := t.Execute(&buf, data); err != nil {
-				return err
-			}
-			if buf.Len() > 0 {
-				event.Source.Service = buf.String()
-			}
+		if err := render(tmplt.sourceTemplate.serviceTemplate, &event.Source.Service); err != nil {
+			return err
 		}
-		// Render Event Source Service
-		if t := tmplt.sourceTemplate.serviceInstanceTemplate; t != nil {
-			buf.Reset()
-			if err := t.Execute(&buf, data); err != nil {
-				return err
-			}
-			if buf.Len() > 0 {
-				event.Source.ServiceInstance = buf.String()
-			}
+		if err := render(tmplt.sourceTemplate.serviceInstanceTemplate, &event.Source.ServiceInstance); err != nil {
+			return err
 		}
-		// Render Event Source Endpoint
-		if t := tmplt.sourceTemplate.endpointTemplate; t != nil {
-			buf.Reset()
-			if err := t.Execute(&buf, data); err != nil {
-				return err
-			}
-			if buf.Len() > 0 {
-				event.Source.Endpoint = buf.String()
-			}
+		if err := render(tmplt.sourceTemplate.endpointTemplate, &event.Source.Endpoint); err != nil {
+			return err
 		}
 	}
 
diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go
deleted file mode 100644
index e23e6fa..0000000
--- a/pkg/filter/filter.go
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to Apache Software Foundation (ASF) under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Apache Software Foundation (ASF) licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package filter
-
-type Filter struct {
-	Reason   string `yaml:"reason"`
-	Message  string `yaml:"message"`
-	MinCount int32  `yaml:"min-count"`
-	Type     string `yaml:"type"`
-	Action   string `yaml:"action"`
-
-	Kind      string `yaml:"kind"`
-	Namespace string `yaml:"namespace"`
-	Name      string `yaml:"name"`
-
-	Exporter string `yaml:"exporter"`
-}
diff --git a/pkg/k8s/registry.go b/pkg/k8s/registry.go
new file mode 100644
index 0000000..99857e8
--- /dev/null
+++ b/pkg/k8s/registry.go
@@ -0,0 +1,153 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package k8s
+
+import (
+	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+)
+
+// id identifies a namespaced object by its namespace and name. It is the key
+// type of the registry's pod and service maps and the value type of its
+// IP-to-service map.
+type id struct {
+	namespace string
+	name      string
+}
+
+// registry caches the Pods, Services and Endpoints addresses delivered to its
+// OnAdd / OnUpdate / OnDelete handlers so that they can be looked up when a
+// template context is built for an event.
+type registry struct {
+	// informers are the shared informers launched by Start.
+	informers []cache.SharedIndexInformer
+	// stopCh is handed to every informer's Run and closed by Stop.
+	stopCh    chan struct{}
+
+	// podIdIpMap maps a pod's namespace/name to its Status.PodIP.
+	// NOTE(review): written by OnAdd/OnDelete but not read by any code visible here.
+	podIdIpMap map[id]string
+	// idSvcMap maps a service's namespace/name to the Service object.
+	idSvcMap   map[id]*corev1.Service
+	// idPodMap maps a pod's namespace/name to the Pod object.
+	idPodMap   map[id]*corev1.Pod
+	// ipSvcIdMap maps an Endpoints address IP to the namespace/name of the
+	// Endpoints object that lists it.
+	// NOTE(review): written by OnAdd/OnDelete but not read by any code visible here.
+	ipSvcIdMap map[string]id
+}
+
+// OnAdd records a newly observed object. A Pod is indexed by namespace/name
+// (both the object itself and its pod IP), a Service is indexed by
+// namespace/name, and every address of an Endpoints object is mapped to that
+// Endpoints' namespace/name. Objects of any other type are ignored.
+func (r registry) OnAdd(obj interface{}) {
+	switch o := obj.(type) {
+	case *corev1.Pod:
+		key := id{namespace: o.Namespace, name: o.Name}
+		r.podIdIpMap[key] = o.Status.PodIP
+		r.idPodMap[key] = o
+	case *corev1.Service:
+		key := id{namespace: o.Namespace, name: o.Name}
+		r.idSvcMap[key] = o
+	case *corev1.Endpoints:
+		// Every address of this Endpoints object points at the same owner.
+		owner := id{namespace: o.ObjectMeta.Namespace, name: o.ObjectMeta.Name}
+		for _, subset := range o.Subsets {
+			for _, addr := range subset.Addresses {
+				r.ipSvcIdMap[addr.IP] = owner
+			}
+		}
+	}
+}
+
+// OnUpdate replaces the cached state of an object: whatever was recorded for
+// oldObj is removed first, then newObj is recorded.
+func (r registry) OnUpdate(oldObj, newObj interface{}) {
+	r.OnDelete(oldObj)
+	r.OnAdd(newObj)
+}
+
+// OnDelete removes everything OnAdd recorded for obj: the pod entries for a
+// Pod, the service entry for a Service, and the per-address entries for an
+// Endpoints object. Objects of any other type are ignored.
+func (r registry) OnDelete(obj interface{}) {
+	// When an informer misses the actual delete event it delivers a
+	// DeletedFinalStateUnknown tombstone wrapping the last known object.
+	// Unwrap it so the stale entries are still removed.
+	if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
+		obj = tombstone.Obj
+	}
+
+	switch o := obj.(type) {
+	case *corev1.Pod:
+		key := id{namespace: o.Namespace, name: o.Name}
+		delete(r.podIdIpMap, key)
+		delete(r.idPodMap, key)
+	case *corev1.Service:
+		delete(r.idSvcMap, id{namespace: o.Namespace, name: o.Name})
+	case *corev1.Endpoints:
+		for _, subset := range o.Subsets {
+			for _, addr := range subset.Addresses {
+				delete(r.ipSvcIdMap, addr.IP)
+			}
+		}
+	}
+}
+
+// Start launches every informer held by the registry in its own goroutine.
+// All of them receive r.stopCh as their stop channel.
+func (r *registry) Start() {
+	logger.Log.Debugf("starting registry")
+
+	for i := range r.informers {
+		go r.informers[i].Run(r.stopCh)
+	}
+}
+
+// Stop signals the informers launched by Start to terminate by closing the
+// stop channel they share. It must be called at most once, because closing an
+// already closed channel panics.
+func (r *registry) Stop() {
+	logger.Log.Debugf("stopping registry")
+
+	// Closing the channel is observed by every receiver. The unbuffered send
+	// that used to precede the close was redundant and blocked forever when
+	// nothing was receiving (for example when Stop ran without a prior Start).
+	close(r.stopCh)
+}
+
+// TemplateContext is the value returned by GetContext for an event. Event is
+// always the event itself. Pod is filled in only when the event's involved
+// object is of kind "Pod", and Service only when it is of kind "Service".
+// Either stays nil when the kind differs or the object is not in the registry.
+type TemplateContext struct {
+	Service *corev1.Service
+	Pod     *corev1.Pod
+	Event   *corev1.Event
+}
+
+// GetContext builds the template context for event e. The event is always
+// included; the Pod or Service it involves is looked up by namespace and name
+// and attached when the involved object's kind is "Pod" or "Service". A missing
+// entry leaves the corresponding field nil. e must not be nil.
+//
+// NOTE(review): the maps are read here without any visible synchronization
+// while the informers started by Start run in their own goroutines; confirm
+// whether a lock is needed.
+func (r *registry) GetContext(e *corev1.Event) TemplateContext {
+	tc := TemplateContext{Event: e}
+
+	involved := e.InvolvedObject
+	key := id{namespace: involved.Namespace, name: involved.Name}
+
+	switch involved.Kind {
+	case "Pod":
+		tc.Pod = r.idPodMap[key]
+	case "Service":
+		tc.Service = r.idSvcMap[key]
+	}
+
+	return tc
+}
+
+// Registry is the package-level registry instance. It has to be prepared with
+// Init and launched with Start before GetContext can return pods or services.
+var Registry = &registry{
+	stopCh: make(chan struct{}),
+
+	// The lookup maps must be non-nil: OnAdd assigns into them, and assigning
+	// into a nil map panics.
+	podIdIpMap: make(map[id]string),
+	idSvcMap:   make(map[id]*corev1.Service),
+	idPodMap:   make(map[id]*corev1.Pod),
+	ipSvcIdMap: make(map[string]id),
+}
+
+// Init prepares the registry: it obtains the cluster configuration through
+// GetConfig, creates a Kubernetes client, builds shared informers for
+// Endpoints, Services and Pods across all namespaces, and registers r as their
+// event handler. The informers do not run until Start is called.
+//
+// It returns an error when the configuration cannot be obtained or the client
+// cannot be created.
+func (r *registry) Init() error {
+	logger.Log.Debugf("initializing template context registry")
+
+	config, err := GetConfig()
+	if err != nil {
+		return err
+	}
+
+	// Report a bad configuration to the caller instead of panicking, as
+	// NewForConfigOrDie would; Init already has an error return for this.
+	client, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		return err
+	}
+
+	factory := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithNamespace(corev1.NamespaceAll))
+
+	r.informers = []cache.SharedIndexInformer{
+		factory.Core().V1().Endpoints().Informer(),
+		factory.Core().V1().Services().Informer(),
+		factory.Core().V1().Pods().Informer(),
+	}
+
+	// Register the receiver itself rather than the package-level Registry so
+	// the method behaves the same for any registry value.
+	for _, informer := range r.informers {
+		informer.AddEventHandler(r)
+	}
+
+	return nil
+}
diff --git a/pkg/pipe/pipe.go b/pkg/pipe/pipe.go
index 4b258e0..5f32c00 100644
--- a/pkg/pipe/pipe.go
+++ b/pkg/pipe/pipe.go
@@ -80,6 +80,10 @@ func (p *Pipe) Init() error {
 		}
 	}
 
+	if err := k8s.Registry.Init(); err != nil {
+		return err
+	}
+
 	logger.Log.Debugf("pipe has been initialized")
 
 	return nil
@@ -88,6 +92,8 @@ func (p *Pipe) Init() error {
 func (p *Pipe) Start() error {
 	p.Watcher.Start()
 
+	k8s.Registry.Start()
+
 	for _, wkfl := range p.workflows {
 		go wkfl.exporter.Export(wkfl.events)
 	}
@@ -120,6 +126,8 @@ func (p *Pipe) Stop() {
 		w.exporter.Stop()
 	}
 
+	k8s.Registry.Stop()
+
 	p.stopper <- struct{}{}
 	close(p.stopper)
 }