You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/04/03 02:13:37 UTC

[skywalking-kubernetes-event-exporter] branch feature/more-ctx updated (a508171 -> d03d435)

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a change to branch feature/more-ctx
in repository https://gitbox.apache.org/repos/asf/skywalking-kubernetes-event-exporter.git.


 discard a508171  feature: provide more template context objects
     new d03d435  feature: provide more template context objects

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and produces
a repository history like this:

 * -- * -- B -- O -- O -- O   (a508171)
            \
             N -- N -- N   refs/heads/feature/more-ctx (d03d435)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 assets/default-config.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

[skywalking-kubernetes-event-exporter] 01/01: feature: provide more template context objects

Posted by ke...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch feature/more-ctx
in repository https://gitbox.apache.org/repos/asf/skywalking-kubernetes-event-exporter.git

commit d03d435c31f5684df6f5fd84d5b214ed42647878
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Fri Apr 2 21:26:38 2021 +0800

    feature: provide more template context objects
---
 .github/workflows/build-and-test.yaml |   2 +-
 assets/default-config.yaml            |   6 +-
 configs/config.go                     |   5 +-
 configs/config_test.go                |   2 +-
 pkg/event/event.go                    |   6 +-
 pkg/exporter/exporter.go              |  28 +++---
 pkg/exporter/skywalking.go            |  75 +++++++--------
 pkg/exporter/util.go                  |   3 +-
 pkg/filter/filter.go                  |  34 -------
 pkg/k8s/event.go                      |   3 +-
 pkg/k8s/registry.go                   | 176 ++++++++++++++++++++++++++++++++++
 pkg/pipe/pipe.go                      |   8 ++
 12 files changed, 244 insertions(+), 104 deletions(-)

diff --git a/.github/workflows/build-and-test.yaml b/.github/workflows/build-and-test.yaml
index fb60450..7f2913f 100644
--- a/.github/workflows/build-and-test.yaml
+++ b/.github/workflows/build-and-test.yaml
@@ -43,7 +43,7 @@ jobs:
         run: make test
 
       - name: Build
-        run: make build
+        run: make
 
   gateway:
     name: Gateway
diff --git a/assets/default-config.yaml b/assets/default-config.yaml
index 556ba43..fa8fa32 100644
--- a/assets/default-config.yaml
+++ b/assets/default-config.yaml
@@ -25,8 +25,8 @@ exporters:
   skywalking:
     template:
       source:
-        service: ""
-        service-instance: ""
+        service: "{{ .Service.Name }}"
+        serviceInstance: "{{ .Pod.Name }}"
         endpoint: ""
-      message: ""
+      message: "{{ .Event.Message }}" # this is default, just to demonstrate the context
     address: "127.0.0.1:11800"
diff --git a/configs/config.go b/configs/config.go
index 4a7c262..13a69f4 100644
--- a/configs/config.go
+++ b/configs/config.go
@@ -22,10 +22,11 @@ package configs
 import (
 	"regexp"
 
-	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
 	"gopkg.in/yaml.v3"
 	v1 "k8s.io/api/core/v1"
 
+	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
+
 	evnt "github.com/apache/skywalking-kubernetes-event-exporter/pkg/event"
 )
 
@@ -50,7 +51,7 @@ type FilterConfig struct {
 }
 
 func (filter *FilterConfig) Init() {
-	logger.Log.Debugf("initalizing filter config")
+	logger.Log.Debugf("initializing filter config")
 
 	filter.reasonRegExp = regexp.MustCompile(filter.Reason)
 	filter.messageRegExp = regexp.MustCompile(filter.Message)
diff --git a/configs/config_test.go b/configs/config_test.go
index 2b9d467..288edf3 100644
--- a/configs/config_test.go
+++ b/configs/config_test.go
@@ -22,7 +22,7 @@ package configs
 import (
 	"testing"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 )
 
 func TestFilterConfig_Filter(t *testing.T) {
diff --git a/pkg/event/event.go b/pkg/event/event.go
index 7720ce9..b461205 100644
--- a/pkg/event/event.go
+++ b/pkg/event/event.go
@@ -22,9 +22,9 @@ package event
 import v1 "k8s.io/api/core/v1"
 
 type Source struct {
-	Service         string `json:"service"`
-	ServiceInstance string `json:"serviceInstance"`
-	Endpoint        string `json:"endpoint"`
+	Service         string `mapstructure:"service"`
+	ServiceInstance string `mapstructure:"serviceInstance"`
+	Endpoint        string `mapstructure:"endpoint"`
 }
 
 var Stopper = (*v1.Event)(nil)
diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go
index 7ec53e3..bc3ba15 100644
--- a/pkg/exporter/exporter.go
+++ b/pkg/exporter/exporter.go
@@ -56,8 +56,8 @@ type SourceTemplate struct {
 }
 
 type EventTemplate struct {
-	Source          *event.Source `mapstructure:"source"`
-	sourceTemplate  *SourceTemplate
+	Source          event.Source `mapstructure:"source"`
+	sourceTemplate  SourceTemplate
 	Message         string `mapstructure:"message"`
 	messageTemplate *template.Template
 }
@@ -69,21 +69,19 @@ func (tmplt *EventTemplate) Init() (err error) {
 		}
 	}
 
-	if tmplt.Source != nil {
-		if tmplt.Source.Service != "" {
-			if tmplt.sourceTemplate.serviceTemplate, err = template.New("EventSourceServiceTemplate").Parse(tmplt.Source.Service); err != nil {
-				return err
-			}
+	if t := tmplt.Source.Service; t != "" {
+		if tmplt.sourceTemplate.serviceTemplate, err = template.New("EventSourceServiceTemplate").Parse(t); err != nil {
+			return err
 		}
-		if tmplt.Source.ServiceInstance != "" {
-			if tmplt.sourceTemplate.serviceInstanceTemplate, err = template.New("EventServiceInstanceTemplate").Parse(tmplt.Source.ServiceInstance); err != nil {
-				return err
-			}
+	}
+	if t := tmplt.Source.ServiceInstance; t != "" {
+		if tmplt.sourceTemplate.serviceInstanceTemplate, err = template.New("EventServiceInstanceTemplate").Parse(t); err != nil {
+			return err
 		}
-		if tmplt.Source.Endpoint != "" {
-			if tmplt.sourceTemplate.endpointTemplate, err = template.New("EventEndpointTemplate").Parse(tmplt.Source.Endpoint); err != nil {
-				return err
-			}
+	}
+	if t := tmplt.Source.Endpoint; t != "" {
+		if tmplt.sourceTemplate.endpointTemplate, err = template.New("EventEndpointTemplate").Parse(t); err != nil {
+			return err
 		}
 	}
 
diff --git a/pkg/exporter/skywalking.go b/pkg/exporter/skywalking.go
index f713a14..9fcd392 100644
--- a/pkg/exporter/skywalking.go
+++ b/pkg/exporter/skywalking.go
@@ -24,10 +24,13 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"html/template"
 	"time"
 
 	sw "skywalking.apache.org/repo/goapi/collect/event/v3"
 
+	"github.com/apache/skywalking-kubernetes-event-exporter/pkg/k8s"
+
 	"google.golang.org/grpc"
 	k8score "k8s.io/api/core/v1"
 
@@ -135,9 +138,8 @@ func (exporter *SkyWalking) Export(events chan *k8score.Event) {
 					EndTime:   kEvent.LastTimestamp.Unix() / 1000000,
 				}
 				if exporter.config.Template != nil {
-					if err := exporter.config.Template.Render(swEvent, kEvent); err != nil {
-						logger.Log.Warnf("failed to render the template, using the default event content. %+v", err)
-					}
+					exporter.config.Template.Render(swEvent, kEvent)
+					logger.Log.Debugf("rendered event is: %+v", swEvent)
 				}
 				if err := stream.Send(swEvent); err != nil {
 					logger.Log.Errorf("failed to send event to %+v. %+v", exporter.Name(), err)
@@ -147,55 +149,42 @@ func (exporter *SkyWalking) Export(events chan *k8score.Event) {
 	}()
 }
 
-func (tmplt *EventTemplate) Render(event *sw.Event, data *k8score.Event) error {
-	var buf bytes.Buffer
+func (tmplt *EventTemplate) Render(swEvent *sw.Event, kEvent *k8score.Event) {
+	templateCtx := k8s.Registry.GetContext(kEvent)
+
+	logger.Log.Debugf("template context %+v", templateCtx)
+
+	render := func(t *template.Template, destination *string) error {
+		if t == nil {
+			return nil
+		}
+
+		var buf bytes.Buffer
 
-	// Render Event Message
-	if t := tmplt.messageTemplate; t != nil {
-		buf.Reset()
-		if err := t.Execute(&buf, data); err != nil {
+		if err := t.Execute(&buf, templateCtx); err != nil {
 			return err
 		}
+
 		if buf.Len() > 0 {
-			event.Message = buf.String()
+			*destination = buf.String()
 		}
+
+		return nil
 	}
 
-	// Render Event Source
-	if tmplt.sourceTemplate != nil {
-		// Render Event Source Service
-		if t := tmplt.sourceTemplate.serviceTemplate; t != nil {
-			buf.Reset()
-			if err := t.Execute(&buf, data); err != nil {
-				return err
-			}
-			if buf.Len() > 0 {
-				event.Source.Service = buf.String()
-			}
-		}
-		// Render Event Source Service
-		if t := tmplt.sourceTemplate.serviceInstanceTemplate; t != nil {
-			buf.Reset()
-			if err := t.Execute(&buf, data); err != nil {
-				return err
-			}
-			if buf.Len() > 0 {
-				event.Source.ServiceInstance = buf.String()
-			}
-		}
-		// Render Event Source Endpoint
-		if t := tmplt.sourceTemplate.endpointTemplate; t != nil {
-			buf.Reset()
-			if err := t.Execute(&buf, data); err != nil {
-				return err
-			}
-			if buf.Len() > 0 {
-				event.Source.Endpoint = buf.String()
-			}
-		}
+	if err := render(tmplt.messageTemplate, &swEvent.Message); err != nil {
+		logger.Log.Warnf("failed to render the template, using the default event content. %+v", err)
 	}
 
-	return nil
+	if err := render(tmplt.sourceTemplate.serviceTemplate, &swEvent.Source.Service); err != nil {
+		logger.Log.Warnf("failed to render service template, using the default event content. %+v", err)
+	}
+	if err := render(tmplt.sourceTemplate.serviceInstanceTemplate, &swEvent.Source.ServiceInstance); err != nil {
+		logger.Log.Warnf("failed to render service instance template, using the default event content. %+v", err)
+	}
+	if err := render(tmplt.sourceTemplate.endpointTemplate, &swEvent.Source.Endpoint); err != nil {
+		logger.Log.Warnf("failed to render endpoin template, using the default event content. %+v", err)
+	}
 }
 
 func (exporter *SkyWalking) Stop() {
diff --git a/pkg/exporter/util.go b/pkg/exporter/util.go
index d02859e..b389f2d 100644
--- a/pkg/exporter/util.go
+++ b/pkg/exporter/util.go
@@ -20,9 +20,10 @@
 package exporter
 
 import (
+	k8score "k8s.io/api/core/v1"
+
 	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
 	"github.com/apache/skywalking-kubernetes-event-exporter/pkg/event"
-	k8score "k8s.io/api/core/v1"
 )
 
 func drain(events chan *k8score.Event) {
diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go
deleted file mode 100644
index e23e6fa..0000000
--- a/pkg/filter/filter.go
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to Apache Software Foundation (ASF) under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Apache Software Foundation (ASF) licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package filter
-
-type Filter struct {
-	Reason   string `yaml:"reason"`
-	Message  string `yaml:"message"`
-	MinCount int32  `yaml:"min-count"`
-	Type     string `yaml:"type"`
-	Action   string `yaml:"action"`
-
-	Kind      string `yaml:"kind"`
-	Namespace string `yaml:"namespace"`
-	Name      string `yaml:"name"`
-
-	Exporter string `yaml:"exporter"`
-}
diff --git a/pkg/k8s/event.go b/pkg/k8s/event.go
index 4d54502..32a2b55 100644
--- a/pkg/k8s/event.go
+++ b/pkg/k8s/event.go
@@ -20,11 +20,12 @@
 package k8s
 
 import (
-	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
+
+	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
 )
 
 type EventWatcher struct {
diff --git a/pkg/k8s/registry.go b/pkg/k8s/registry.go
new file mode 100644
index 0000000..a1caea5
--- /dev/null
+++ b/pkg/k8s/registry.go
@@ -0,0 +1,176 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package k8s
+
+import (
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+
+	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
+)
+
+type id struct {
+	namespace string
+	name      string
+}
+
+type registry struct {
+	informers []cache.SharedIndexInformer
+	stopCh    chan struct{}
+
+	podIDIpMap map[id]string
+	idSvcMap   map[id]*corev1.Service
+	idPodMap   map[id]*corev1.Pod
+	ipSvcIDMap map[string]id
+}
+
+func (r registry) OnAdd(obj interface{}) {
+	switch o := obj.(type) {
+	case *corev1.Pod:
+		podID := id{namespace: o.Namespace, name: o.Name}
+		r.podIDIpMap[podID] = o.Status.PodIP
+		r.idPodMap[podID] = o
+	case *corev1.Service:
+		r.idSvcMap[id{namespace: o.Namespace, name: o.Name}] = o
+	case *corev1.Endpoints:
+		for _, subset := range o.Subsets {
+			for _, address := range subset.Addresses {
+				r.ipSvcIDMap[address.IP] = id{
+					namespace: o.ObjectMeta.Namespace,
+					name:      o.ObjectMeta.Name,
+				}
+			}
+		}
+	}
+}
+
+func (r registry) OnUpdate(oldObj, newObj interface{}) {
+	r.OnDelete(oldObj)
+	r.OnAdd(newObj)
+}
+
+func (r registry) OnDelete(obj interface{}) {
+	switch o := obj.(type) {
+	case *corev1.Pod:
+		podID := id{namespace: o.Namespace, name: o.Name}
+		go func() {
+			time.Sleep(3 * time.Second)
+			delete(r.podIDIpMap, podID)
+			delete(r.idPodMap, podID)
+		}()
+	case *corev1.Service:
+		go func() {
+			time.Sleep(3 * time.Second)
+			delete(r.idSvcMap, id{namespace: o.Namespace, name: o.Name})
+		}()
+	case *corev1.Endpoints:
+		go func() {
+			for _, subset := range o.Subsets {
+				for _, address := range subset.Addresses {
+					time.Sleep(3 * time.Second)
+					delete(r.ipSvcIDMap, address.IP)
+				}
+			}
+		}()
+	}
+}
+
+func (r *registry) Start() {
+	logger.Log.Debugf("starting registry")
+
+	for _, informer := range r.informers {
+		go informer.Run(r.stopCh)
+	}
+}
+
+func (r *registry) Stop() {
+	logger.Log.Debugf("stopping registry")
+
+	r.stopCh <- struct{}{}
+	close(r.stopCh)
+}
+
+type TemplateContext struct {
+	Service *corev1.Service
+	Pod     *corev1.Pod
+	Event   *corev1.Event
+}
+
+func (r *registry) GetContext(e *corev1.Event) TemplateContext {
+	result := TemplateContext{Event: e}
+
+	if obj := e.InvolvedObject; obj.Kind == "Pod" {
+		podID := id{
+			namespace: obj.Namespace,
+			name:      obj.Name,
+		}
+		podIP := r.podIDIpMap[podID]
+		svcID := r.ipSvcIDMap[podIP]
+
+		result.Pod = r.idPodMap[podID]
+		result.Service = r.idSvcMap[svcID]
+	}
+
+	if obj := e.InvolvedObject; obj.Kind == "Service" {
+		svcID := id{
+			namespace: obj.Namespace,
+			name:      obj.Name,
+		}
+		result.Service = r.idSvcMap[svcID]
+	}
+
+	return result
+}
+
+var Registry = &registry{
+	stopCh: make(chan struct{}),
+
+	podIDIpMap: make(map[id]string),
+	idSvcMap:   make(map[id]*corev1.Service),
+	idPodMap:   make(map[id]*corev1.Pod),
+	ipSvcIDMap: make(map[string]id),
+}
+
+func (r *registry) Init() error {
+	logger.Log.Debugf("initializing template context registry")
+
+	config, err := GetConfig()
+	if err != nil {
+		return err
+	}
+	client := kubernetes.NewForConfigOrDie(config)
+	factory := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithNamespace(corev1.NamespaceAll))
+
+	r.informers = []cache.SharedIndexInformer{
+		factory.Core().V1().Endpoints().Informer(),
+		factory.Core().V1().Services().Informer(),
+		factory.Core().V1().Pods().Informer(),
+	}
+
+	for _, informer := range Registry.informers {
+		informer.AddEventHandler(Registry)
+	}
+
+	return nil
+}
diff --git a/pkg/pipe/pipe.go b/pkg/pipe/pipe.go
index 4b258e0..5f32c00 100644
--- a/pkg/pipe/pipe.go
+++ b/pkg/pipe/pipe.go
@@ -80,6 +80,10 @@ func (p *Pipe) Init() error {
 		}
 	}
 
+	if err := k8s.Registry.Init(); err != nil {
+		return err
+	}
+
 	logger.Log.Debugf("pipe has been initialized")
 
 	return nil
@@ -88,6 +92,8 @@ func (p *Pipe) Init() error {
 func (p *Pipe) Start() error {
 	p.Watcher.Start()
 
+	k8s.Registry.Start()
+
 	for _, wkfl := range p.workflows {
 		go wkfl.exporter.Export(wkfl.events)
 	}
@@ -120,6 +126,8 @@ func (p *Pipe) Stop() {
 		w.exporter.Stop()
 	}
 
+	k8s.Registry.Stop()
+
 	p.stopper <- struct{}{}
 	close(p.stopper)
 }