You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/04/02 13:26:52 UTC
[skywalking-kubernetes-event-exporter] 01/01: feature: provide more
template context objects
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch feature/more-ctx
in repository https://gitbox.apache.org/repos/asf/skywalking-kubernetes-event-exporter.git
commit 9a2d18e8535c384400a0b11cd00cd864e048cade
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Fri Apr 2 21:26:38 2021 +0800
feature: provide more template context objects
---
pkg/exporter/skywalking.go | 61 ++++++++----------
pkg/filter/filter.go | 34 ----------
pkg/k8s/registry.go | 153 +++++++++++++++++++++++++++++++++++++++++++++
pkg/pipe/pipe.go | 8 +++
4 files changed, 187 insertions(+), 69 deletions(-)
diff --git a/pkg/exporter/skywalking.go b/pkg/exporter/skywalking.go
index f713a14..41aa11b 100644
--- a/pkg/exporter/skywalking.go
+++ b/pkg/exporter/skywalking.go
@@ -24,8 +24,10 @@ import (
"context"
"encoding/json"
"fmt"
+ "html/template"
"time"
+ "github.com/apache/skywalking-kubernetes-event-exporter/pkg/k8s"
sw "skywalking.apache.org/repo/goapi/collect/event/v3"
"google.golang.org/grpc"
@@ -147,51 +149,40 @@ func (exporter *SkyWalking) Export(events chan *k8score.Event) {
}()
}
-func (tmplt *EventTemplate) Render(event *sw.Event, data *k8score.Event) error {
- var buf bytes.Buffer
+func (tmplt *EventTemplate) Render(event *sw.Event, kEvent *k8score.Event) error {
+ templateCtx := k8s.Registry.GetContext(kEvent)
- // Render Event Message
- if t := tmplt.messageTemplate; t != nil {
- buf.Reset()
- if err := t.Execute(&buf, data); err != nil {
+ render := func(t *template.Template, destination *string) error {
+ if t == nil {
+ return nil
+ }
+
+ var buf bytes.Buffer
+
+ if err := t.Execute(&buf, templateCtx); err != nil {
return err
}
+
if buf.Len() > 0 {
- event.Message = buf.String()
+ *destination = buf.String()
}
+
+ return nil
+ }
+
+ if err := render(tmplt.messageTemplate, &event.Message); err != nil {
+ return err
}
- // Render Event Source
if tmplt.sourceTemplate != nil {
- // Render Event Source Service
- if t := tmplt.sourceTemplate.serviceTemplate; t != nil {
- buf.Reset()
- if err := t.Execute(&buf, data); err != nil {
- return err
- }
- if buf.Len() > 0 {
- event.Source.Service = buf.String()
- }
+ if err := render(tmplt.sourceTemplate.serviceTemplate, &event.Source.Service); err != nil {
+ return err
}
- // Render Event Source Service
- if t := tmplt.sourceTemplate.serviceInstanceTemplate; t != nil {
- buf.Reset()
- if err := t.Execute(&buf, data); err != nil {
- return err
- }
- if buf.Len() > 0 {
- event.Source.ServiceInstance = buf.String()
- }
+ if err := render(tmplt.sourceTemplate.serviceInstanceTemplate, &event.Source.ServiceInstance); err != nil {
+ return err
}
- // Render Event Source Endpoint
- if t := tmplt.sourceTemplate.endpointTemplate; t != nil {
- buf.Reset()
- if err := t.Execute(&buf, data); err != nil {
- return err
- }
- if buf.Len() > 0 {
- event.Source.Endpoint = buf.String()
- }
+ if err := render(tmplt.sourceTemplate.endpointTemplate, &event.Source.Endpoint); err != nil {
+ return err
}
}
diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go
deleted file mode 100644
index e23e6fa..0000000
--- a/pkg/filter/filter.go
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to Apache Software Foundation (ASF) under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Apache Software Foundation (ASF) licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package filter
-
-type Filter struct {
- Reason string `yaml:"reason"`
- Message string `yaml:"message"`
- MinCount int32 `yaml:"min-count"`
- Type string `yaml:"type"`
- Action string `yaml:"action"`
-
- Kind string `yaml:"kind"`
- Namespace string `yaml:"namespace"`
- Name string `yaml:"name"`
-
- Exporter string `yaml:"exporter"`
-}
diff --git a/pkg/k8s/registry.go b/pkg/k8s/registry.go
new file mode 100644
index 0000000..99857e8
--- /dev/null
+++ b/pkg/k8s/registry.go
@@ -0,0 +1,153 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package k8s
+
+import (
+ "github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/cache"
+)
+
+// id identifies a namespaced Kubernetes object by its namespace and name. It
+// is used as the key type of the registry's lookup maps.
+type id struct {
+	namespace, name string
+}
+
+// registry bundles the informers that watch the cluster with the lookup
+// tables that the informer event handlers (OnAdd, OnUpdate, OnDelete) keep up
+// to date.
+type registry struct {
+	// informers are the informers launched by Start.
+	informers []cache.SharedIndexInformer
+	// stopCh is handed to every informer as its stop channel.
+	stopCh chan struct{}
+
+	// podIdIpMap maps a Pod's namespace/name to its Status.PodIP.
+	podIdIpMap map[id]string
+	// idSvcMap maps a Service's namespace/name to the Service object.
+	idSvcMap map[id]*corev1.Service
+	// idPodMap maps a Pod's namespace/name to the Pod object.
+	idPodMap map[id]*corev1.Pod
+	// ipSvcIdMap maps an Endpoints address IP to the namespace/name of the
+	// Endpoints object that lists it.
+	ipSvcIdMap map[string]id
+}
+
+func (r registry) OnAdd(obj interface{}) {
+ if pod, ok := obj.(*corev1.Pod); ok {
+ podID := id{namespace: pod.Namespace, name: pod.Name}
+ r.podIdIpMap[podID] = pod.Status.PodIP
+ r.idPodMap[podID] = pod
+ } else if svc, ok := obj.(*corev1.Service); ok {
+ r.idSvcMap[id{namespace: svc.Namespace, name: svc.Name}] = svc
+ } else if endpoints, ok := obj.(*corev1.Endpoints); ok {
+ for _, subset := range endpoints.Subsets {
+ for _, address := range subset.Addresses {
+ r.ipSvcIdMap[address.IP] = id{
+ namespace: endpoints.ObjectMeta.Namespace,
+ name: endpoints.ObjectMeta.Name,
+ }
+ }
+ }
+ }
+}
+
+// OnUpdate replaces whatever was recorded for oldObj with the entries derived
+// from newObj.
+func (r registry) OnUpdate(oldObj, newObj interface{}) {
+	// Drop the old state first so that keys present only in oldObj (for
+	// example an address IP no longer listed) do not linger.
+	r.OnDelete(oldObj)
+	r.OnAdd(newObj)
+}
+
+func (r registry) OnDelete(obj interface{}) {
+ if pod, ok := obj.(*corev1.Pod); ok {
+ podID := id{namespace: pod.Namespace, name: pod.Name}
+ delete(r.podIdIpMap, podID)
+ delete(r.idPodMap, podID)
+ } else if svc, ok := obj.(*corev1.Service); ok {
+ delete(r.idSvcMap, id{namespace: svc.Namespace, name: svc.Name})
+ } else if endpoints, ok := obj.(*corev1.Endpoints); ok {
+ for _, subset := range endpoints.Subsets {
+ for _, address := range subset.Addresses {
+ delete(r.ipSvcIdMap, address.IP)
+ }
+ }
+ }
+}
+
+// Start launches every informer held by the registry in its own goroutine and
+// returns without waiting for them. Each informer is given r.stopCh as its
+// stop channel.
+func (r *registry) Start() {
+	logger.Log.Debugf("starting registry")
+
+	for i := range r.informers {
+		go r.informers[i].Run(r.stopCh)
+	}
+}
+
+// Stop signals the informers started by Start to shut down by closing the
+// stop channel they were given. It must be called at most once: closing an
+// already closed channel panics.
+func (r *registry) Stop() {
+	logger.Log.Debugf("stopping registry")
+
+	// Closing the channel releases every receiver at once. A send before the
+	// close is unnecessary and would block forever on this unbuffered channel
+	// whenever no informer is currently receiving (for example if Start was
+	// never called).
+	close(r.stopCh)
+}
+
+// TemplateContext is the value built by GetContext for a Kubernetes event.
+type TemplateContext struct {
+	// Service is the recorded Service the event involves; nil when the
+	// involved object is not a Service or no such Service is recorded.
+	Service *corev1.Service
+	// Pod is the recorded Pod the event involves; nil when the involved
+	// object is not a Pod or no such Pod is recorded.
+	Pod *corev1.Pod
+	// Event is the event the context was built for.
+	Event *corev1.Event
+}
+
+// GetContext builds the TemplateContext for event e.
+//
+// Event is always set to e. When e's involved object is of kind "Pod" or
+// "Service", the matching object recorded under the involved object's
+// namespace/name is attached as Pod or Service respectively; the field stays
+// nil when nothing is recorded under that key.
+func (r *registry) GetContext(e *corev1.Event) TemplateContext {
+	tc := TemplateContext{Event: e}
+	key := id{
+		namespace: e.InvolvedObject.Namespace,
+		name:      e.InvolvedObject.Name,
+	}
+
+	switch e.InvolvedObject.Kind {
+	case "Pod":
+		tc.Pod = r.idPodMap[key]
+	case "Service":
+		tc.Service = r.idSvcMap[key]
+	}
+
+	return tc
+}
+
+// Registry is the package-level registry instance that callers reach as
+// k8s.Registry.
+var Registry = &registry{
+	stopCh: make(chan struct{}),
+
+	// The informer event handlers write to these maps, and writing to a nil
+	// map panics, so they are allocated up front.
+	podIdIpMap: make(map[id]string),
+	idSvcMap:   make(map[id]*corev1.Service),
+	idPodMap:   make(map[id]*corev1.Pod),
+	ipSvcIdMap: make(map[string]id),
+}
+
+// Init creates the Endpoints, Service and Pod informers for all namespaces
+// and registers r as their event handler. It does not start the informers;
+// that is done by Start.
+//
+// It returns an error when the cluster configuration cannot be obtained from
+// GetConfig or when a client cannot be built from it.
+func (r *registry) Init() error {
+	logger.Log.Debugf("initializing template context registry")
+
+	config, err := GetConfig()
+	if err != nil {
+		return err
+	}
+
+	// NewForConfig reports an unusable configuration as an error, which this
+	// function can return, rather than panicking like NewForConfigOrDie.
+	client, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		return err
+	}
+	factory := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithNamespace(corev1.NamespaceAll))
+
+	r.informers = []cache.SharedIndexInformer{
+		factory.Core().V1().Endpoints().Informer(),
+		factory.Core().V1().Services().Informer(),
+		factory.Core().V1().Pods().Informer(),
+	}
+
+	// Register the receiver itself, not the package-level Registry, so the
+	// method works on whichever registry it is called on.
+	for _, informer := range r.informers {
+		informer.AddEventHandler(r)
+	}
+
+	return nil
+}
diff --git a/pkg/pipe/pipe.go b/pkg/pipe/pipe.go
index 4b258e0..5f32c00 100644
--- a/pkg/pipe/pipe.go
+++ b/pkg/pipe/pipe.go
@@ -80,6 +80,10 @@ func (p *Pipe) Init() error {
}
}
+ if err := k8s.Registry.Init(); err != nil {
+ return err
+ }
+
logger.Log.Debugf("pipe has been initialized")
return nil
@@ -88,6 +92,8 @@ func (p *Pipe) Init() error {
func (p *Pipe) Start() error {
p.Watcher.Start()
+ k8s.Registry.Start()
+
for _, wkfl := range p.workflows {
go wkfl.exporter.Export(wkfl.events)
}
@@ -120,6 +126,8 @@ func (p *Pipe) Stop() {
w.exporter.Stop()
}
+ k8s.Registry.Stop()
+
p.stopper <- struct{}{}
close(p.stopper)
}