You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/04/01 12:54:42 UTC
[skywalking-kubernetes-event-exporter] 01/01: feature: implement
the real "filter" logics, and cache template and regexps
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch feature/filter-cache
in repository https://gitbox.apache.org/repos/asf/skywalking-kubernetes-event-exporter.git
commit b17081f56784efd106b65e0ed93e644954023fc5
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Thu Apr 1 20:54:24 2021 +0800
feature: implement the real "filter" logics, and cache template and regexps
- Implement the real "filter" logics.
- Cache RegExp in the filter configs.
- Cache Template in the exporters.
---
assets/default-config.yaml | 2 +-
cmd/start.go | 3 +-
configs/config.go | 59 +++++++++++++++++++++++++++++------
pkg/exporter/exporter.go | 48 +++++++++++++++++++++++++---
pkg/exporter/skywalking.go | 51 +++++++++---------------------
pkg/exporter/{exporter.go => util.go} | 32 +++++--------------
pkg/pipe/pipe.go | 44 ++++++++++++++------------
7 files changed, 141 insertions(+), 98 deletions(-)
diff --git a/assets/default-config.yaml b/assets/default-config.yaml
index 0038981..dce9b85 100644
--- a/assets/default-config.yaml
+++ b/assets/default-config.yaml
@@ -17,7 +17,7 @@
#
filters:
- - namespace: default
+ - namespace: istio-system
exporter: skywalking
exporters:
diff --git a/cmd/start.go b/cmd/start.go
index b9a54c6..cf7e624 100644
--- a/cmd/start.go
+++ b/cmd/start.go
@@ -25,6 +25,7 @@ import (
"syscall"
"github.com/spf13/cobra"
+ v1 "k8s.io/api/core/v1"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"github.com/apache/skywalking-kubernetes-event-exporter/pkg/k8s"
@@ -39,7 +40,7 @@ var startCmd = &cobra.Command{
Use: "start",
Short: "Start skywalking-kubernetes-event-exporter",
RunE: func(cmd *cobra.Command, args []string) error {
- watcher, err := k8s.WatchEvents("default")
+ watcher, err := k8s.WatchEvents(v1.NamespaceAll)
if err != nil {
return err
}
diff --git a/configs/config.go b/configs/config.go
index 1a93b2a..5397e76 100644
--- a/configs/config.go
+++ b/configs/config.go
@@ -20,6 +20,8 @@
package configs
import (
+ "regexp"
+
"gopkg.in/yaml.v3"
v1 "k8s.io/api/core/v1"
@@ -27,23 +29,62 @@ import (
)
type FilterConfig struct {
- Reason string `yaml:"reason"`
- Message string `yaml:"message"`
- MinCount int32 `yaml:"min-count"`
- Type string `yaml:"type"`
- Action string `yaml:"action"`
+ Reason string `yaml:"reason"`
+ reasonRegExp *regexp.Regexp
+ Message string `yaml:"message"`
+ messageRegExp *regexp.Regexp
+ MinCount int32 `yaml:"min-count"`
+ Type string `yaml:"type"`
+ typeRegExp *regexp.Regexp
+ Action string `yaml:"action"`
+ actionRegExp *regexp.Regexp
+ Kind string `yaml:"kind"`
+ kindRegExp *regexp.Regexp
+ Namespace string `yaml:"namespace"`
+ namespaceRegExp *regexp.Regexp
+ Name string `yaml:"name"`
+ nameRegExp *regexp.Regexp
- Kind string `yaml:"kind"`
- Namespace string `yaml:"namespace"`
- Name string `yaml:"name"`
+ Exporters []string `yaml:"exporters"`
+}
- Exporter string `yaml:"exporter"`
+func (filter *FilterConfig) Init() {
+ filter.reasonRegExp = regexp.MustCompile(filter.Reason)
+ filter.messageRegExp = regexp.MustCompile(filter.Message)
+ filter.typeRegExp = regexp.MustCompile(filter.Type)
+ filter.actionRegExp = regexp.MustCompile(filter.Action)
+ filter.kindRegExp = regexp.MustCompile(filter.Kind)
+ filter.namespaceRegExp = regexp.MustCompile(filter.Namespace)
+ filter.nameRegExp = regexp.MustCompile(filter.Name)
}
+// Filter the given event with this filter instance.
+// Return true if the event is filtered, return false otherwise.
func (filter *FilterConfig) Filter(event *v1.Event) bool {
if event == evnt.Stopper {
return false
}
+ if filter.Reason != "" && !filter.reasonRegExp.MatchString(event.Reason) {
+ return true
+ }
+ if filter.Message != "" && !filter.messageRegExp.MatchString(event.Message) {
+ return true
+ }
+ if filter.Type != "" && !filter.typeRegExp.MatchString(event.Type) {
+ return true
+ }
+ if filter.Action != "" && !filter.actionRegExp.MatchString(event.Action) {
+ return true
+ }
+ if filter.Kind != "" && !filter.kindRegExp.MatchString(event.Kind) {
+ return true
+ }
+ if filter.Namespace != "" && !filter.namespaceRegExp.MatchString(event.Namespace) {
+ return true
+ }
+ if filter.Name != "" && !filter.nameRegExp.MatchString(event.Name) {
+ return true
+ }
return false
}
diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go
index a3f2284..7ec53e3 100644
--- a/pkg/exporter/exporter.go
+++ b/pkg/exporter/exporter.go
@@ -20,6 +20,8 @@
package exporter
import (
+ "html/template"
+
v1 "k8s.io/api/core/v1"
"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
@@ -33,11 +35,6 @@ type Exporter interface {
Stop()
}
-type MessageTemplate struct {
- Source *event.Source `mapstructure:"source"`
- Message string `mapstructure:"message"`
-}
-
var exporters = map[string]Exporter{}
func RegisterExporter(name string, exporter Exporter) {
@@ -51,3 +48,44 @@ func RegisterExporter(name string, exporter Exporter) {
func GetExporter(name string) Exporter {
return exporters[name]
}
+
+type SourceTemplate struct {
+ serviceTemplate *template.Template
+ serviceInstanceTemplate *template.Template
+ endpointTemplate *template.Template
+}
+
+type EventTemplate struct {
+ Source *event.Source `mapstructure:"source"`
+ sourceTemplate *SourceTemplate
+ Message string `mapstructure:"message"`
+ messageTemplate *template.Template
+}
+
+func (tmplt *EventTemplate) Init() (err error) {
+ if tmplt.Message != "" {
+ if tmplt.messageTemplate, err = template.New("EventMessageTemplate").Parse(tmplt.Message); err != nil {
+ return err
+ }
+ }
+
+ if tmplt.Source != nil {
+ if tmplt.Source.Service != "" {
+ if tmplt.sourceTemplate.serviceTemplate, err = template.New("EventSourceServiceTemplate").Parse(tmplt.Source.Service); err != nil {
+ return err
+ }
+ }
+ if tmplt.Source.ServiceInstance != "" {
+ if tmplt.sourceTemplate.serviceInstanceTemplate, err = template.New("EventServiceInstanceTemplate").Parse(tmplt.Source.ServiceInstance); err != nil {
+ return err
+ }
+ }
+ if tmplt.Source.Endpoint != "" {
+ if tmplt.sourceTemplate.endpointTemplate, err = template.New("EventEndpointTemplate").Parse(tmplt.Source.Endpoint); err != nil {
+ return err
+ }
+ }
+ }
+
+ return err
+}
diff --git a/pkg/exporter/skywalking.go b/pkg/exporter/skywalking.go
index 75cabab..178fb16 100644
--- a/pkg/exporter/skywalking.go
+++ b/pkg/exporter/skywalking.go
@@ -24,7 +24,6 @@ import (
"context"
"encoding/json"
"fmt"
- "html/template"
"time"
sw "skywalking.apache.org/repo/goapi/collect/event/v3"
@@ -45,8 +44,8 @@ type SkyWalking struct {
}
type SkyWalkingConfig struct {
- Address string `mapstructure:"address"`
- Template *MessageTemplate `mapstructure:"template"`
+ Address string `mapstructure:"address"`
+ Template *EventTemplate `mapstructure:"template"`
}
func init() {
@@ -67,6 +66,10 @@ func (exporter *SkyWalking) Init() error {
return err
}
+ if err := config.Template.Init(); err != nil {
+ return err
+ }
+
conn, err := grpc.Dial(config.Address, grpc.WithInsecure())
if err != nil {
return err
@@ -89,12 +92,7 @@ func (exporter *SkyWalking) Export(events chan *k8score.Event) {
for err != nil {
select {
case <-exporter.stopper:
- logger.Log.Debugf("draining event channel")
- for e := range events {
- if e == event.Stopper {
- break
- }
- }
+ drain(events)
return
default:
logger.Log.Errorf("failed to connect to SkyWalking server. %+v", err)
@@ -113,12 +111,7 @@ func (exporter *SkyWalking) Export(events chan *k8score.Event) {
for {
select {
case <-exporter.stopper:
- logger.Log.Debugf("draining event channel")
- for e := range events {
- if e == event.Stopper {
- break
- }
- }
+ drain(events)
return
case kEvent := <-events:
if kEvent == event.Stopper {
@@ -152,16 +145,12 @@ func (exporter *SkyWalking) Export(events chan *k8score.Event) {
}()
}
-func (tmplt *MessageTemplate) Render(event *sw.Event, data *k8score.Event) error {
+func (tmplt *EventTemplate) Render(event *sw.Event, data *k8score.Event) error {
var buf bytes.Buffer
// Render Event Message
- if tmplt.Message != "" {
+ if t := tmplt.messageTemplate; t != nil {
buf.Reset()
- t, err := template.New("EventMsg").Parse(tmplt.Message)
- if err != nil {
- return err
- }
if err := t.Execute(&buf, data); err != nil {
return err
}
@@ -171,14 +160,10 @@ func (tmplt *MessageTemplate) Render(event *sw.Event, data *k8score.Event) error
}
// Render Event Source
- if tmplt.Source != nil {
+ if tmplt.sourceTemplate != nil {
// Render Event Source Service
- if tmplt.Source.Service != "" {
+ if t := tmplt.sourceTemplate.serviceTemplate; t != nil {
buf.Reset()
- t, err := template.New("EventSourceService").Parse(tmplt.Source.Service)
- if err != nil {
- return err
- }
if err := t.Execute(&buf, data); err != nil {
return err
}
@@ -187,12 +172,8 @@ func (tmplt *MessageTemplate) Render(event *sw.Event, data *k8score.Event) error
}
}
// Render Event Source Service
- if tmplt.Source.ServiceInstance != "" {
+ if t := tmplt.sourceTemplate.serviceInstanceTemplate; t != nil {
buf.Reset()
- t, err := template.New("EventSourceServiceInstance").Parse(tmplt.Source.ServiceInstance)
- if err != nil {
- return err
- }
if err := t.Execute(&buf, data); err != nil {
return err
}
@@ -201,12 +182,8 @@ func (tmplt *MessageTemplate) Render(event *sw.Event, data *k8score.Event) error
}
}
// Render Event Source Endpoint
- if tmplt.Source.Endpoint != "" {
+ if t := tmplt.sourceTemplate.endpointTemplate; t != nil {
buf.Reset()
- t, err := template.New("EventSourceEndpoint").Parse(tmplt.Source.Endpoint)
- if err != nil {
- return err
- }
if err := t.Execute(&buf, data); err != nil {
return err
}
diff --git a/pkg/exporter/exporter.go b/pkg/exporter/util.go
similarity index 64%
copy from pkg/exporter/exporter.go
copy to pkg/exporter/util.go
index a3f2284..d02859e 100644
--- a/pkg/exporter/exporter.go
+++ b/pkg/exporter/util.go
@@ -20,34 +20,16 @@
package exporter
import (
- v1 "k8s.io/api/core/v1"
-
"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
"github.com/apache/skywalking-kubernetes-event-exporter/pkg/event"
+ k8score "k8s.io/api/core/v1"
)
-type Exporter interface {
- Name() string
- Init() error
- Export(events chan *v1.Event)
- Stop()
-}
-
-type MessageTemplate struct {
- Source *event.Source `mapstructure:"source"`
- Message string `mapstructure:"message"`
-}
-
-var exporters = map[string]Exporter{}
-
-func RegisterExporter(name string, exporter Exporter) {
- if _, ok := exporters[name]; ok {
- logger.Log.Panicf("exporter with name %v has already existed", name)
+func drain(events chan *k8score.Event) {
+ logger.Log.Debugf("draining event channel")
+ for e := range events {
+ if e == event.Stopper {
+ break
+ }
}
-
- exporters[name] = exporter
-}
-
-func GetExporter(name string) Exporter {
- return exporters[name]
}
diff --git a/pkg/pipe/pipe.go b/pkg/pipe/pipe.go
index 8ff7d02..e27aa14 100644
--- a/pkg/pipe/pipe.go
+++ b/pkg/pipe/pipe.go
@@ -49,28 +49,32 @@ func (p *Pipe) Init() error {
initialized := map[string]bool{}
for _, filter := range configs.GlobalConfig.Filters {
- if _, ok := configs.GlobalConfig.Exporters[filter.Exporter]; !ok {
- return fmt.Errorf("exporter %v is not defined", filter.Exporter)
- }
- exporter := exp.GetExporter(filter.Exporter)
- if exporter == nil {
- return fmt.Errorf("exporter %v is not defined", filter.Exporter)
- }
- if initialized[filter.Exporter] {
- continue
- }
- if err := exporter.Init(); err != nil {
- return err
- }
- initialized[filter.Exporter] = true
+ filter.Init()
+
+ for _, name := range filter.Exporters {
+ if _, ok := configs.GlobalConfig.Exporters[name]; !ok {
+ return fmt.Errorf("exporter %v is not defined", filter.Exporters)
+ }
+ exporter := exp.GetExporter(name)
+ if exporter == nil {
+ return fmt.Errorf("exporter %v is not defined", filter.Exporters)
+ }
+ if initialized[name] {
+ continue
+ }
+ if err := exporter.Init(); err != nil {
+ return err
+ }
+ initialized[name] = true
- events := make(chan *v1.Event)
+ events := make(chan *v1.Event)
- p.workflows = append(p.workflows, workflow{
- filter: filter,
- exporter: exporter,
- events: events,
- })
+ p.workflows = append(p.workflows, workflow{
+ filter: filter,
+ exporter: exporter,
+ events: events,
+ })
+ }
}
return nil