You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/04/01 12:54:42 UTC

[skywalking-kubernetes-event-exporter] 01/01: feature: implement the real "filter" logic, and cache template and regexps

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch feature/filter-cache
in repository https://gitbox.apache.org/repos/asf/skywalking-kubernetes-event-exporter.git

commit b17081f56784efd106b65e0ed93e644954023fc5
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Thu Apr 1 20:54:24 2021 +0800

    feature: implement the real "filter" logic, and cache template and regexps
    
    - Implement the real "filter" logic.
    - Cache RegExp in the filter configs.
    - Cache Template in the exporters.
---
 assets/default-config.yaml            |  2 +-
 cmd/start.go                          |  3 +-
 configs/config.go                     | 59 +++++++++++++++++++++++++++++------
 pkg/exporter/exporter.go              | 48 +++++++++++++++++++++++++---
 pkg/exporter/skywalking.go            | 51 +++++++++---------------------
 pkg/exporter/{exporter.go => util.go} | 32 +++++--------------
 pkg/pipe/pipe.go                      | 44 ++++++++++++++------------
 7 files changed, 141 insertions(+), 98 deletions(-)

diff --git a/assets/default-config.yaml b/assets/default-config.yaml
index 0038981..dce9b85 100644
--- a/assets/default-config.yaml
+++ b/assets/default-config.yaml
@@ -17,7 +17,7 @@
 #
 
 filters:
-  - namespace: default
+  - namespace: istio-system
     exporter: skywalking
 
 exporters:
diff --git a/cmd/start.go b/cmd/start.go
index b9a54c6..cf7e624 100644
--- a/cmd/start.go
+++ b/cmd/start.go
@@ -25,6 +25,7 @@ import (
 	"syscall"
 
 	"github.com/spf13/cobra"
+	v1 "k8s.io/api/core/v1"
 	_ "k8s.io/client-go/plugin/pkg/client/auth"
 
 	"github.com/apache/skywalking-kubernetes-event-exporter/pkg/k8s"
@@ -39,7 +40,7 @@ var startCmd = &cobra.Command{
 	Use:   "start",
 	Short: "Start skywalking-kubernetes-event-exporter",
 	RunE: func(cmd *cobra.Command, args []string) error {
-		watcher, err := k8s.WatchEvents("default")
+		watcher, err := k8s.WatchEvents(v1.NamespaceAll)
 		if err != nil {
 			return err
 		}
diff --git a/configs/config.go b/configs/config.go
index 1a93b2a..5397e76 100644
--- a/configs/config.go
+++ b/configs/config.go
@@ -20,6 +20,8 @@
 package configs
 
 import (
+	"regexp"
+
 	"gopkg.in/yaml.v3"
 	v1 "k8s.io/api/core/v1"
 
@@ -27,23 +29,62 @@ import (
 )
 
 type FilterConfig struct {
-	Reason   string `yaml:"reason"`
-	Message  string `yaml:"message"`
-	MinCount int32  `yaml:"min-count"`
-	Type     string `yaml:"type"`
-	Action   string `yaml:"action"`
+	Reason          string `yaml:"reason"`
+	reasonRegExp    *regexp.Regexp
+	Message         string `yaml:"message"`
+	messageRegExp   *regexp.Regexp
+	MinCount        int32  `yaml:"min-count"`
+	Type            string `yaml:"type"`
+	typeRegExp      *regexp.Regexp
+	Action          string `yaml:"action"`
+	actionRegExp    *regexp.Regexp
+	Kind            string `yaml:"kind"`
+	kindRegExp      *regexp.Regexp
+	Namespace       string `yaml:"namespace"`
+	namespaceRegExp *regexp.Regexp
+	Name            string `yaml:"name"`
+	nameRegExp      *regexp.Regexp
 
-	Kind      string `yaml:"kind"`
-	Namespace string `yaml:"namespace"`
-	Name      string `yaml:"name"`
+	Exporters []string `yaml:"exporters"`
+}
 
-	Exporter string `yaml:"exporter"`
+func (filter *FilterConfig) Init() {
+	filter.reasonRegExp = regexp.MustCompile(filter.Reason)
+	filter.messageRegExp = regexp.MustCompile(filter.Message)
+	filter.typeRegExp = regexp.MustCompile(filter.Type)
+	filter.actionRegExp = regexp.MustCompile(filter.Action)
+	filter.kindRegExp = regexp.MustCompile(filter.Kind)
+	filter.namespaceRegExp = regexp.MustCompile(filter.Namespace)
+	filter.nameRegExp = regexp.MustCompile(filter.Name)
 }
 
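+// Filter checks the given event against the patterns configured on this filter instance.
+// It returns true (the event is filtered) when any non-empty pattern fails to match the corresponding event field, and false otherwise, including for the stopper event.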
 func (filter *FilterConfig) Filter(event *v1.Event) bool {
 	if event == evnt.Stopper {
 		return false
 	}
+	if filter.Reason != "" && !filter.reasonRegExp.MatchString(event.Reason) {
+		return true
+	}
+	if filter.Message != "" && !filter.messageRegExp.MatchString(event.Message) {
+		return true
+	}
+	if filter.Type != "" && !filter.typeRegExp.MatchString(event.Type) {
+		return true
+	}
+	if filter.Action != "" && !filter.actionRegExp.MatchString(event.Action) {
+		return true
+	}
+	if filter.Kind != "" && !filter.kindRegExp.MatchString(event.Kind) {
+		return true
+	}
+	if filter.Namespace != "" && !filter.namespaceRegExp.MatchString(event.Namespace) {
+		return true
+	}
+	if filter.Name != "" && !filter.nameRegExp.MatchString(event.Name) {
+		return true
+	}
 	return false
 }
 
diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go
index a3f2284..7ec53e3 100644
--- a/pkg/exporter/exporter.go
+++ b/pkg/exporter/exporter.go
@@ -20,6 +20,8 @@
 package exporter
 
 import (
+	"html/template"
+
 	v1 "k8s.io/api/core/v1"
 
 	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
@@ -33,11 +35,6 @@ type Exporter interface {
 	Stop()
 }
 
-type MessageTemplate struct {
-	Source  *event.Source `mapstructure:"source"`
-	Message string        `mapstructure:"message"`
-}
-
 var exporters = map[string]Exporter{}
 
 func RegisterExporter(name string, exporter Exporter) {
@@ -51,3 +48,44 @@ func RegisterExporter(name string, exporter Exporter) {
 func GetExporter(name string) Exporter {
 	return exporters[name]
 }
+
+type SourceTemplate struct {
+	serviceTemplate         *template.Template
+	serviceInstanceTemplate *template.Template
+	endpointTemplate        *template.Template
+}
+
+type EventTemplate struct {
+	Source          *event.Source `mapstructure:"source"`
+	sourceTemplate  *SourceTemplate
+	Message         string `mapstructure:"message"`
+	messageTemplate *template.Template
+}
+
+func (tmplt *EventTemplate) Init() (err error) {
+	if tmplt.Message != "" {
+		if tmplt.messageTemplate, err = template.New("EventMessageTemplate").Parse(tmplt.Message); err != nil {
+			return err
+		}
+	}
+
+	if tmplt.Source != nil {
+		if tmplt.Source.Service != "" {
+			if tmplt.sourceTemplate.serviceTemplate, err = template.New("EventSourceServiceTemplate").Parse(tmplt.Source.Service); err != nil {
+				return err
+			}
+		}
+		if tmplt.Source.ServiceInstance != "" {
+			if tmplt.sourceTemplate.serviceInstanceTemplate, err = template.New("EventServiceInstanceTemplate").Parse(tmplt.Source.ServiceInstance); err != nil {
+				return err
+			}
+		}
+		if tmplt.Source.Endpoint != "" {
+			if tmplt.sourceTemplate.endpointTemplate, err = template.New("EventEndpointTemplate").Parse(tmplt.Source.Endpoint); err != nil {
+				return err
+			}
+		}
+	}
+
+	return err
+}
diff --git a/pkg/exporter/skywalking.go b/pkg/exporter/skywalking.go
index 75cabab..178fb16 100644
--- a/pkg/exporter/skywalking.go
+++ b/pkg/exporter/skywalking.go
@@ -24,7 +24,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"html/template"
 	"time"
 
 	sw "skywalking.apache.org/repo/goapi/collect/event/v3"
@@ -45,8 +44,8 @@ type SkyWalking struct {
 }
 
 type SkyWalkingConfig struct {
-	Address  string           `mapstructure:"address"`
-	Template *MessageTemplate `mapstructure:"template"`
+	Address  string         `mapstructure:"address"`
+	Template *EventTemplate `mapstructure:"template"`
 }
 
 func init() {
@@ -67,6 +66,10 @@ func (exporter *SkyWalking) Init() error {
 		return err
 	}
 
+	if err := config.Template.Init(); err != nil {
+		return err
+	}
+
 	conn, err := grpc.Dial(config.Address, grpc.WithInsecure())
 	if err != nil {
 		return err
@@ -89,12 +92,7 @@ func (exporter *SkyWalking) Export(events chan *k8score.Event) {
 	for err != nil {
 		select {
 		case <-exporter.stopper:
-			logger.Log.Debugf("draining event channel")
-			for e := range events {
-				if e == event.Stopper {
-					break
-				}
-			}
+			drain(events)
 			return
 		default:
 			logger.Log.Errorf("failed to connect to SkyWalking server. %+v", err)
@@ -113,12 +111,7 @@ func (exporter *SkyWalking) Export(events chan *k8score.Event) {
 		for {
 			select {
 			case <-exporter.stopper:
-				logger.Log.Debugf("draining event channel")
-				for e := range events {
-					if e == event.Stopper {
-						break
-					}
-				}
+				drain(events)
 				return
 			case kEvent := <-events:
 				if kEvent == event.Stopper {
@@ -152,16 +145,12 @@ func (exporter *SkyWalking) Export(events chan *k8score.Event) {
 	}()
 }
 
-func (tmplt *MessageTemplate) Render(event *sw.Event, data *k8score.Event) error {
+func (tmplt *EventTemplate) Render(event *sw.Event, data *k8score.Event) error {
 	var buf bytes.Buffer
 
 	// Render Event Message
-	if tmplt.Message != "" {
+	if t := tmplt.messageTemplate; t != nil {
 		buf.Reset()
-		t, err := template.New("EventMsg").Parse(tmplt.Message)
-		if err != nil {
-			return err
-		}
 		if err := t.Execute(&buf, data); err != nil {
 			return err
 		}
@@ -171,14 +160,10 @@ func (tmplt *MessageTemplate) Render(event *sw.Event, data *k8score.Event) error
 	}
 
 	// Render Event Source
-	if tmplt.Source != nil {
+	if tmplt.sourceTemplate != nil {
 		// Render Event Source Service
-		if tmplt.Source.Service != "" {
+		if t := tmplt.sourceTemplate.serviceTemplate; t != nil {
 			buf.Reset()
-			t, err := template.New("EventSourceService").Parse(tmplt.Source.Service)
-			if err != nil {
-				return err
-			}
 			if err := t.Execute(&buf, data); err != nil {
 				return err
 			}
@@ -187,12 +172,8 @@ func (tmplt *MessageTemplate) Render(event *sw.Event, data *k8score.Event) error
 			}
 		}
 		// Render Event Source Service
-		if tmplt.Source.ServiceInstance != "" {
+		if t := tmplt.sourceTemplate.serviceInstanceTemplate; t != nil {
 			buf.Reset()
-			t, err := template.New("EventSourceServiceInstance").Parse(tmplt.Source.ServiceInstance)
-			if err != nil {
-				return err
-			}
 			if err := t.Execute(&buf, data); err != nil {
 				return err
 			}
@@ -201,12 +182,8 @@ func (tmplt *MessageTemplate) Render(event *sw.Event, data *k8score.Event) error
 			}
 		}
 		// Render Event Source Endpoint
-		if tmplt.Source.Endpoint != "" {
+		if t := tmplt.sourceTemplate.endpointTemplate; t != nil {
 			buf.Reset()
-			t, err := template.New("EventSourceEndpoint").Parse(tmplt.Source.Endpoint)
-			if err != nil {
-				return err
-			}
 			if err := t.Execute(&buf, data); err != nil {
 				return err
 			}
diff --git a/pkg/exporter/exporter.go b/pkg/exporter/util.go
similarity index 64%
copy from pkg/exporter/exporter.go
copy to pkg/exporter/util.go
index a3f2284..d02859e 100644
--- a/pkg/exporter/exporter.go
+++ b/pkg/exporter/util.go
@@ -20,34 +20,16 @@
 package exporter
 
 import (
-	v1 "k8s.io/api/core/v1"
-
 	"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
 	"github.com/apache/skywalking-kubernetes-event-exporter/pkg/event"
+	k8score "k8s.io/api/core/v1"
 )
 
-type Exporter interface {
-	Name() string
-	Init() error
-	Export(events chan *v1.Event)
-	Stop()
-}
-
-type MessageTemplate struct {
-	Source  *event.Source `mapstructure:"source"`
-	Message string        `mapstructure:"message"`
-}
-
-var exporters = map[string]Exporter{}
-
-func RegisterExporter(name string, exporter Exporter) {
-	if _, ok := exporters[name]; ok {
-		logger.Log.Panicf("exporter with name %v has already existed", name)
+func drain(events chan *k8score.Event) {
+	logger.Log.Debugf("draining event channel")
+	for e := range events {
+		if e == event.Stopper {
+			break
+		}
 	}
-
-	exporters[name] = exporter
-}
-
-func GetExporter(name string) Exporter {
-	return exporters[name]
 }
diff --git a/pkg/pipe/pipe.go b/pkg/pipe/pipe.go
index 8ff7d02..e27aa14 100644
--- a/pkg/pipe/pipe.go
+++ b/pkg/pipe/pipe.go
@@ -49,28 +49,32 @@ func (p *Pipe) Init() error {
 
 	initialized := map[string]bool{}
 	for _, filter := range configs.GlobalConfig.Filters {
-		if _, ok := configs.GlobalConfig.Exporters[filter.Exporter]; !ok {
-			return fmt.Errorf("exporter %v is not defined", filter.Exporter)
-		}
-		exporter := exp.GetExporter(filter.Exporter)
-		if exporter == nil {
-			return fmt.Errorf("exporter %v is not defined", filter.Exporter)
-		}
-		if initialized[filter.Exporter] {
-			continue
-		}
-		if err := exporter.Init(); err != nil {
-			return err
-		}
-		initialized[filter.Exporter] = true
+		filter.Init()
+
+		for _, name := range filter.Exporters {
+			if _, ok := configs.GlobalConfig.Exporters[name]; !ok {
+				return fmt.Errorf("exporter %v is not defined", filter.Exporters)
+			}
+			exporter := exp.GetExporter(name)
+			if exporter == nil {
+				return fmt.Errorf("exporter %v is not defined", filter.Exporters)
+			}
+			if initialized[name] {
+				continue
+			}
+			if err := exporter.Init(); err != nil {
+				return err
+			}
+			initialized[name] = true
 
-		events := make(chan *v1.Event)
+			events := make(chan *v1.Event)
 
-		p.workflows = append(p.workflows, workflow{
-			filter:   filter,
-			exporter: exporter,
-			events:   events,
-		})
+			p.workflows = append(p.workflows, workflow{
+				filter:   filter,
+				exporter: exporter,
+				events:   events,
+			})
+		}
 	}
 
 	return nil