You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/04/02 13:26:51 UTC

[skywalking-kubernetes-event-exporter] branch feature/more-ctx created (now 9a2d18e)

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a change to branch feature/more-ctx
in repository https://gitbox.apache.org/repos/asf/skywalking-kubernetes-event-exporter.git.


      at 9a2d18e  feature: provide more template context objects

This branch includes the following new commits:

     new 9a2d18e  feature: provide more template context objects

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[skywalking-kubernetes-event-exporter] 01/01: feature: provide more template context objects

Posted by ke...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch feature/more-ctx
in repository https://gitbox.apache.org/repos/asf/skywalking-kubernetes-event-exporter.git

commit 9a2d18e8535c384400a0b11cd00cd864e048cade
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Fri Apr 2 21:26:38 2021 +0800

    feature: provide more template context objects
---
 pkg/exporter/skywalking.go |  61 ++++++++----------
 pkg/filter/filter.go       |  34 ----------
 pkg/k8s/registry.go        | 153 +++++++++++++++++++++++++++++++++++++++++++++
 pkg/pipe/pipe.go           |   8 +++
 4 files changed, 187 insertions(+), 69 deletions(-)

diff --git a/pkg/exporter/skywalking.go b/pkg/exporter/skywalking.go
index f713a14..41aa11b 100644
--- a/pkg/exporter/skywalking.go
+++ b/pkg/exporter/skywalking.go
@@ -24,8 +24,10 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"html/template"
 	"time"
 
+	"github.com/apache/skywalking-kubernetes-event-exporter/pkg/k8s"
 	sw "skywalking.apache.org/repo/goapi/collect/event/v3"
 
 	"google.golang.org/grpc"
@@ -147,51 +149,40 @@ func (exporter *SkyWalking) Export(events chan *k8score.Event) {
 	}()
 }
 
-func (tmplt *EventTemplate) Render(event *sw.Event, data *k8score.Event) error {
-	var buf bytes.Buffer
+func (tmplt *EventTemplate) Render(event *sw.Event, kEvent *k8score.Event) error {
+	templateCtx := k8s.Registry.GetContext(kEvent)
 
-	// Render Event Message
-	if t := tmplt.messageTemplate; t != nil {
-		buf.Reset()
-		if err := t.Execute(&buf, data); err != nil {
+	render := func(t *template.Template, destination *string) error {
+		if t == nil {
+			return nil
+		}
+
+		var buf bytes.Buffer
+
+		if err := t.Execute(&buf, templateCtx); err != nil {
 			return err
 		}
+
 		if buf.Len() > 0 {
-			event.Message = buf.String()
+			*destination = buf.String()
 		}
+
+		return nil
+	}
+
+	if err := render(tmplt.messageTemplate, &event.Message); err != nil {
+		return err
 	}
 
-	// Render Event Source
 	if tmplt.sourceTemplate != nil {
-		// Render Event Source Service
-		if t := tmplt.sourceTemplate.serviceTemplate; t != nil {
-			buf.Reset()
-			if err := t.Execute(&buf, data); err != nil {
-				return err
-			}
-			if buf.Len() > 0 {
-				event.Source.Service = buf.String()
-			}
+		if err := render(tmplt.sourceTemplate.serviceTemplate, &event.Source.Service); err != nil {
+			return err
 		}
-		// Render Event Source Service
-		if t := tmplt.sourceTemplate.serviceInstanceTemplate; t != nil {
-			buf.Reset()
-			if err := t.Execute(&buf, data); err != nil {
-				return err
-			}
-			if buf.Len() > 0 {
-				event.Source.ServiceInstance = buf.String()
-			}
+		if err := render(tmplt.sourceTemplate.serviceInstanceTemplate, &event.Source.ServiceInstance); err != nil {
+			return err
 		}
-		// Render Event Source Endpoint
-		if t := tmplt.sourceTemplate.endpointTemplate; t != nil {
-			buf.Reset()
-			if err := t.Execute(&buf, data); err != nil {
-				return err
-			}
-			if buf.Len() > 0 {
-				event.Source.Endpoint = buf.String()
-			}
+		if err := render(tmplt.sourceTemplate.endpointTemplate, &event.Source.Endpoint); err != nil {
+			return err
 		}
 	}
 
diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go
deleted file mode 100644
index e23e6fa..0000000
--- a/pkg/filter/filter.go
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to Apache Software Foundation (ASF) under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Apache Software Foundation (ASF) licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package filter
-
-type Filter struct {
-	Reason   string `yaml:"reason"`
-	Message  string `yaml:"message"`
-	MinCount int32  `yaml:"min-count"`
-	Type     string `yaml:"type"`
-	Action   string `yaml:"action"`
-
-	Kind      string `yaml:"kind"`
-	Namespace string `yaml:"namespace"`
-	Name      string `yaml:"name"`
-
-	Exporter string `yaml:"exporter"`
-}
diff --git a/pkg/k8s/registry.go b/pkg/k8s/registry.go
new file mode 100644
index 0000000..99857e8
--- /dev/null
+++ b/pkg/k8s/registry.go
@@ -0,0 +1,153 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package k8s
+
+import (
+	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+)
+
+type id struct {
+	namespace string
+	name      string
+}
+
+type registry struct {
+	informers []cache.SharedIndexInformer
+	stopCh    chan struct{}
+
+	podIdIpMap map[id]string
+	idSvcMap   map[id]*corev1.Service
+	idPodMap   map[id]*corev1.Pod
+	ipSvcIdMap map[string]id
+}
+
+func (r registry) OnAdd(obj interface{}) {
+	if pod, ok := obj.(*corev1.Pod); ok {
+		podID := id{namespace: pod.Namespace, name: pod.Name}
+		r.podIdIpMap[podID] = pod.Status.PodIP
+		r.idPodMap[podID] = pod
+	} else if svc, ok := obj.(*corev1.Service); ok {
+		r.idSvcMap[id{namespace: svc.Namespace, name: svc.Name}] = svc
+	} else if endpoints, ok := obj.(*corev1.Endpoints); ok {
+		for _, subset := range endpoints.Subsets {
+			for _, address := range subset.Addresses {
+				r.ipSvcIdMap[address.IP] = id{
+					namespace: endpoints.ObjectMeta.Namespace,
+					name:      endpoints.ObjectMeta.Name,
+				}
+			}
+		}
+	}
+}
+
+func (r registry) OnUpdate(oldObj, newObj interface{}) {
+	r.OnDelete(oldObj)
+	r.OnAdd(newObj)
+}
+
+func (r registry) OnDelete(obj interface{}) {
+	if pod, ok := obj.(*corev1.Pod); ok {
+		podID := id{namespace: pod.Namespace, name: pod.Name}
+		delete(r.podIdIpMap, podID)
+		delete(r.idPodMap, podID)
+	} else if svc, ok := obj.(*corev1.Service); ok {
+		delete(r.idSvcMap, id{namespace: svc.Namespace, name: svc.Name})
+	} else if endpoints, ok := obj.(*corev1.Endpoints); ok {
+		for _, subset := range endpoints.Subsets {
+			for _, address := range subset.Addresses {
+				delete(r.ipSvcIdMap, address.IP)
+			}
+		}
+	}
+}
+
+func (r *registry) Start() {
+	logger.Log.Debugf("starting registry")
+
+	for _, informer := range r.informers {
+		go informer.Run(r.stopCh)
+	}
+}
+
+func (r *registry) Stop() {
+	logger.Log.Debugf("stopping registry")
+
+	r.stopCh <- struct{}{}
+	close(r.stopCh)
+}
+
+type TemplateContext struct {
+	Service *corev1.Service
+	Pod     *corev1.Pod
+	Event   *corev1.Event
+}
+
+func (r *registry) GetContext(e *corev1.Event) TemplateContext {
+	result := TemplateContext{Event: e}
+
+	if obj := e.InvolvedObject; obj.Kind == "Pod" {
+		podID := id{
+			namespace: obj.Namespace,
+			name:      obj.Name,
+		}
+		result.Pod = r.idPodMap[podID]
+	}
+
+	if obj := e.InvolvedObject; obj.Kind == "Service" {
+		svcID := id{
+			namespace: obj.Namespace,
+			name:      obj.Name,
+		}
+		result.Service = r.idSvcMap[svcID]
+	}
+
+	return result
+}
+
+var Registry = &registry{
+	stopCh: make(chan struct{}),
+}
+
+func (r *registry) Init() error {
+	logger.Log.Debugf("initializing template context registry")
+
+	config, err := GetConfig()
+	if err != nil {
+		return err
+	}
+	client := kubernetes.NewForConfigOrDie(config)
+	factory := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithNamespace(corev1.NamespaceAll))
+
+	r.informers = []cache.SharedIndexInformer{
+		factory.Core().V1().Endpoints().Informer(),
+		factory.Core().V1().Services().Informer(),
+		factory.Core().V1().Pods().Informer(),
+	}
+
+	for _, informer := range Registry.informers {
+		informer.AddEventHandler(Registry)
+	}
+
+	return nil
+}
diff --git a/pkg/pipe/pipe.go b/pkg/pipe/pipe.go
index 4b258e0..5f32c00 100644
--- a/pkg/pipe/pipe.go
+++ b/pkg/pipe/pipe.go
@@ -80,6 +80,10 @@ func (p *Pipe) Init() error {
 		}
 	}
 
+	if err := k8s.Registry.Init(); err != nil {
+		return err
+	}
+
 	logger.Log.Debugf("pipe has been initialized")
 
 	return nil
@@ -88,6 +92,8 @@ func (p *Pipe) Init() error {
 func (p *Pipe) Start() error {
 	p.Watcher.Start()
 
+	k8s.Registry.Start()
+
 	for _, wkfl := range p.workflows {
 		go wkfl.exporter.Export(wkfl.events)
 	}
@@ -120,6 +126,8 @@ func (p *Pipe) Stop() {
 		w.exporter.Stop()
 	}
 
+	k8s.Registry.Stop()
+
 	p.stopper <- struct{}{}
 	close(p.stopper)
 }