You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by li...@apache.org on 2021/06/29 11:50:53 UTC

[skywalking-satellite] 01/01: add event pipe

This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a commit to branch add-event-pipe
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git

commit 3dc4fbc9b2c397ab331a4fc4644427624feb626a
Author: Evan <ev...@outlook.com>
AuthorDate: Tue Jun 29 19:50:32 2021 +0800

    add event pipe
---
 configs/satellite_config.yaml                      |  24 ++++-
 .../en/setup/plugins/fetcher_prometheus_fetcher.md |  13 ---
 .../forwarder_nativeevent-grpc-forwarder.md        |   5 +
 docs/en/setup/plugins/plugin-list.md               |   2 +
 .../plugins/receiver_grpc-nativeevent-receiver.md  |   5 +
 docs/menu.yml                                      |   4 +
 internal/satellite/tools/generate_plugin_doc.go    |   4 +
 plugins/forwarder/forwarder_repository.go          |   2 +
 .../forwarder/grpc/nativeevent/sync_forwarder.go   | 107 +++++++++++++++++++++
 plugins/receiver/grpc/nativeevent/event_service.go |  57 +++++++++++
 plugins/receiver/grpc/nativeevent/receiver.go      |  61 ++++++++++++
 plugins/receiver/grpc/nativeevent/receiver_test.go |  70 ++++++++++++++
 plugins/receiver/receiver_repository.go            |   2 +
 13 files changed, 342 insertions(+), 14 deletions(-)

diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml
index 7b53e35..4981fdf 100644
--- a/configs/satellite_config.yaml
+++ b/configs/satellite_config.yaml
@@ -180,4 +180,26 @@ pipes:
         plugin_name: none-fallbacker
       client_name: grpc-client
       forwarders:
-        - plugin_name: nativecds-grpc-forwarder
\ No newline at end of file
+        - plugin_name: nativecds-grpc-forwarder
+  - common_config:
+      pipe_name: eventpipe
+    gatherer:
+      server_name: "grpc-server"
+      receiver:
+        plugin_name: "grpc-nativeevent-receiver"
+      queue:
+        plugin_name: "memory-queue"
+    processor:
+      filters:
+    sender:
+      fallbacker:
+        plugin_name: none-fallbacker
+      # The time interval between two flush operations, in milliseconds.
+      flush_time: ${SATELLITE_EVENTPIPE_SENDER_FLUSH_TIME:1000}
+      # The maximum number of buffered elements.
+      max_buffer_size: ${SATELLITE_EVENTPIPE_SENDER_MAX_BUFFER_SIZE:200}
+      # The minimum number of elements to flush.
+      min_flush_events: ${SATELLITE_EVENTPIPE_SENDER_MIN_FLUSH_EVENTS:1}
+      client_name: grpc-client
+      forwarders:
+        - plugin_name: nativeevent-grpc-forwarder
\ No newline at end of file
diff --git a/docs/en/setup/plugins/fetcher_prometheus_fetcher.md b/docs/en/setup/plugins/fetcher_prometheus_fetcher.md
deleted file mode 100644
index 1dccdbe..0000000
--- a/docs/en/setup/plugins/fetcher_prometheus_fetcher.md
+++ /dev/null
@@ -1,13 +0,0 @@
-# Fetcher/prometheus-fetcher
-## Description
-This is a Prometheus fetcher for SkyWalking meter format, which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/language-agent/Meter.proto.
-## DefaultConfig
-```yaml
-## Prometheus scrape configure
-scrape_configs:
-  - job_name: 'prometheus'
-    metrics_path: '/metrics'
-    scrape_interval: 10s
-    static_configs:
-      - targets: ['127.0.0.1:2020']
-```
diff --git a/docs/en/setup/plugins/forwarder_nativeevent-grpc-forwarder.md b/docs/en/setup/plugins/forwarder_nativeevent-grpc-forwarder.md
new file mode 100755
index 0000000..ace06e8
--- /dev/null
+++ b/docs/en/setup/plugins/forwarder_nativeevent-grpc-forwarder.md
@@ -0,0 +1,5 @@
+# Forwarder/nativeevent-grpc-forwarder
+## Description
+This is a synchronization grpc forwarder with the SkyWalking native event protocol.
+## DefaultConfig
+```yaml```
diff --git a/docs/en/setup/plugins/plugin-list.md b/docs/en/setup/plugins/plugin-list.md
index b2f6bc7..dc1f67f 100755
--- a/docs/en/setup/plugins/plugin-list.md
+++ b/docs/en/setup/plugins/plugin-list.md
@@ -11,6 +11,7 @@
 - Forwarder
 	- [meter-grpc-forwarder](./forwarder_meter-grpc-forwarder.md)
 	- [nativecds-grpc-forwarder](./forwarder_nativecds-grpc-forwarder.md)
+	- [nativeevent-grpc-forwarder](./forwarder_nativeevent-grpc-forwarder.md)
 	- [nativelog-grpc-forwarder](./forwarder_nativelog-grpc-forwarder.md)
 	- [nativelog-kafka-forwarder](./forwarder_nativelog-kafka-forwarder.md)
 	- [nativemanagement-grpc-forwarder](./forwarder_nativemanagement-grpc-forwarder.md)
@@ -23,6 +24,7 @@
 	- [none-queue](./queue_none-queue.md)
 - Receiver
 	- [grpc-nativecds-receiver](./receiver_grpc-nativecds-receiver.md)
+	- [grpc-nativeevent-receiver](./receiver_grpc-nativeevent-receiver.md)
 	- [grpc-nativelog-receiver](./receiver_grpc-nativelog-receiver.md)
 	- [grpc-nativemanagement-receiver](./receiver_grpc-nativemanagement-receiver.md)
 	- [grpc-nativeprofile-receiver](./receiver_grpc-nativeprofile-receiver.md)
diff --git a/docs/en/setup/plugins/receiver_grpc-nativeevent-receiver.md b/docs/en/setup/plugins/receiver_grpc-nativeevent-receiver.md
new file mode 100755
index 0000000..5b0be81
--- /dev/null
+++ b/docs/en/setup/plugins/receiver_grpc-nativeevent-receiver.md
@@ -0,0 +1,5 @@
+# Receiver/grpc-nativeevent-receiver
+## Description
+This is a receiver for SkyWalking native event format, which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/event/Event.proto.
+## DefaultConfig
+```yaml```
diff --git a/docs/menu.yml b/docs/menu.yml
index 9f7d2e9..fa8f23a 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -73,6 +73,8 @@ catalog:
                   path: /en/setup/plugins/forwarder_meter-grpc-forwarder
                 - name: nativecds-grpc-forwarder
                   path: /en/setup/plugins/forwarder_nativecds-grpc-forwarder
+                - name: nativeevent-grpc-forwarder
+                  path: /en/setup/plugins/forwarder_nativeevent-grpc-forwarder
                 - name: nativelog-grpc-forwarder
                   path: /en/setup/plugins/forwarder_nativelog-grpc-forwarder
                 - name: nativelog-kafka-forwarder
@@ -95,6 +97,8 @@ catalog:
               catalog:
                 - name: grpc-nativecds-receiver
                   path: /en/setup/plugins/receiver_grpc-nativecds-receiver
+                - name: grpc-nativeevent-receiver
+                  path: /en/setup/plugins/receiver_grpc-nativeevent-receiver
                 - name: grpc-nativelog-receiver
                   path: /en/setup/plugins/receiver_grpc-nativelog-receiver
                 - name: grpc-nativemanagement-receiver
diff --git a/internal/satellite/tools/generate_plugin_doc.go b/internal/satellite/tools/generate_plugin_doc.go
index 0d7b1e1..8a9d8a1 100644
--- a/internal/satellite/tools/generate_plugin_doc.go
+++ b/internal/satellite/tools/generate_plugin_doc.go
@@ -162,6 +162,10 @@ func writeDoc(doc []byte, docFileName string) error {
 }
 
 func createDir(path string) error {
+	err := os.RemoveAll(path)
+	if err != nil {
+		return err
+	}
 	fileInfo, err := os.Stat(path)
 	if os.IsNotExist(err) || fileInfo.Size() == 0 {
 		return os.Mkdir(path, os.ModePerm)
diff --git a/plugins/forwarder/forwarder_repository.go b/plugins/forwarder/forwarder_repository.go
index 3be46ba..7aa7469 100644
--- a/plugins/forwarder/forwarder_repository.go
+++ b/plugins/forwarder/forwarder_repository.go
@@ -22,6 +22,7 @@ import (
 
 	grpc_meter "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/meter"
 	grpc_nativecds "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativecds"
+	grpc_nativeevent "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeevent"
 	grpc_nativelog "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativelog"
 	grpc_nativemanagement "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativemanagement"
 	grpc_nativeprofile "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeprofile"
@@ -44,6 +45,7 @@ func RegisterForwarderPlugins() {
 		new(grpc_nativetracing.Forwarder),
 		new(grpc_nativeprofile.Forwarder),
 		new(grpc_nativecds.Forwarder),
+		new(grpc_nativeevent.Forwarder),
 	}
 	for _, forwarder := range forwarders {
 		plugin.RegisterPlugin(forwarder)
diff --git a/plugins/forwarder/grpc/nativeevent/sync_forwarder.go b/plugins/forwarder/grpc/nativeevent/sync_forwarder.go
new file mode 100644
index 0000000..da7005a
--- /dev/null
+++ b/plugins/forwarder/grpc/nativeevent/sync_forwarder.go
@@ -0,0 +1,107 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"reflect"
+
+	"google.golang.org/grpc"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+
+	nativeevent "skywalking.apache.org/repo/goapi/collect/event/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const Name = "nativeevent-grpc-forwarder"
+
+type Forwarder struct {
+	config.CommonFields
+	client nativeevent.EventServiceClient
+}
+
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+func (f *Forwarder) Description() string {
+	return "This is a synchronization grpc forwarder with the SkyWalking native event protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+	return ``
+}
+
+// Prepare binds the forwarder to the given connection, which must be a *grpc.ClientConn.
+// It returns an error, rather than panicking, when the connection is nil or of another type.
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(*grpc.ClientConn)
+	if !ok {
+		// reflect.TypeOf returns nil for a nil interface value, so guard before calling String.
+		typeName := "nil"
+		if t := reflect.TypeOf(connection); t != nil {
+			typeName = t.String()
+		}
+		return fmt.Errorf("the %s only accepts the grpc client, but received a %s",
+			f.Name(), typeName)
+	}
+	f.client = nativeevent.NewEventServiceClient(client)
+	return nil
+}
+
+// Forward opens a client stream to the event service and sends every event in the batch.
+// Elements whose payload is not a *v1.SniffData_Event are skipped.
+// On a send failure the stream is closed and the send error is returned.
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	stream, err := f.client.Collect(context.Background())
+	if err != nil {
+		log.Logger.Errorf("open grpc stream error %v", err)
+		return err
+	}
+	for _, e := range batch {
+		data, ok := e.GetData().(*v1.SniffData_Event)
+		if !ok {
+			continue
+		}
+		if err := stream.Send(data.Event); err != nil {
+			log.Logger.Errorf("%s send event data error: %v", f.Name(), err)
+			// Keep the close error separate so it cannot overwrite the send error;
+			// otherwise a clean close would report the failed batch as delivered.
+			if closeErr := closeStream(stream); closeErr != nil {
+				log.Logger.Errorf("%s close stream error: %v", f.Name(), closeErr)
+			}
+			return err
+		}
+	}
+	return closeStream(stream)
+}
+
+func closeStream(stream nativeevent.EventService_CollectClient) error {
+	_, err := stream.CloseAndRecv()
+	if err != nil && err != io.EOF {
+		return err
+	}
+	return nil
+}
+
+func (f *Forwarder) ForwardType() v1.SniffType {
+	return v1.SniffType_EventType
+}
+
+func (f *Forwarder) SyncForward(_ *v1.SniffData) (*v1.SniffData, error) {
+	return nil, fmt.Errorf("unsupport sync forward")
+}
+
+func (f *Forwarder) SupportedSyncInvoke() bool {
+	return false
+}
diff --git a/plugins/receiver/grpc/nativeevent/event_service.go b/plugins/receiver/grpc/nativeevent/event_service.go
new file mode 100644
index 0000000..7dfa7b4
--- /dev/null
+++ b/plugins/receiver/grpc/nativeevent/event_service.go
@@ -0,0 +1,57 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativeevent
+
+import (
+	"io"
+	"time"
+
+	common "skywalking.apache.org/repo/goapi/collect/common/v3"
+	nativeevent "skywalking.apache.org/repo/goapi/collect/event/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const eventName = "grpc-nativeevent-event"
+
+type EventService struct {
+	receiveChannel chan *v1.SniffData
+	nativeevent.UnimplementedEventServiceServer
+}
+
+func (e *EventService) Collect(stream nativeevent.EventService_CollectServer) error {
+	for {
+		item, err := stream.Recv()
+		if err == io.EOF {
+			return stream.SendAndClose(&common.Commands{})
+		}
+		if err != nil {
+			return err
+		}
+		d := &v1.SniffData{
+			Name:      eventName,
+			Timestamp: time.Now().UnixNano() / 1e6,
+			Meta:      nil,
+			Type:      v1.SniffType_EventType,
+			Remote:    true,
+			Data: &v1.SniffData_Event{
+				Event: item,
+			},
+		}
+		e.receiveChannel <- d
+	}
+}
diff --git a/plugins/receiver/grpc/nativeevent/receiver.go b/plugins/receiver/grpc/nativeevent/receiver.go
new file mode 100644
index 0000000..247df61
--- /dev/null
+++ b/plugins/receiver/grpc/nativeevent/receiver.go
@@ -0,0 +1,61 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativeevent
+
+import (
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	module "github.com/apache/skywalking-satellite/internal/satellite/module/api"
+	"github.com/apache/skywalking-satellite/plugins/receiver/grpc"
+
+	nativeevent "skywalking.apache.org/repo/goapi/collect/event/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const Name = "grpc-nativeevent-receiver"
+
+type Receiver struct {
+	config.CommonFields
+	grpc.CommonGRPCReceiverFields
+	service *EventService
+}
+
+func (r *Receiver) Name() string {
+	return Name
+}
+
+// Description returns the human-readable summary of this receiver plugin.
+func (r *Receiver) Description() string {
+	return "This is a receiver for SkyWalking native event format, " +
+		"which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/event/Event.proto."
+}
+
+func (r *Receiver) DefaultConfig() string {
+	return ""
+}
+
+func (r *Receiver) RegisterHandler(server interface{}) {
+	r.CommonGRPCReceiverFields = *grpc.InitCommonGRPCReceiverFields(server)
+	r.service = &EventService{receiveChannel: r.OutputChannel}
+	nativeevent.RegisterEventServiceServer(r.Server, r.service)
+}
+
+func (r *Receiver) RegisterSyncInvoker(_ module.SyncInvoker) {
+}
+
+func (r *Receiver) Channel() <-chan *v1.SniffData {
+	return r.OutputChannel
+}
diff --git a/plugins/receiver/grpc/nativeevent/receiver_test.go b/plugins/receiver/grpc/nativeevent/receiver_test.go
new file mode 100644
index 0000000..a87472c
--- /dev/null
+++ b/plugins/receiver/grpc/nativeevent/receiver_test.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativeevent
+
+import (
+	"context"
+	"strconv"
+	"testing"
+	"time"
+
+	"google.golang.org/grpc"
+
+	nativeevent "skywalking.apache.org/repo/goapi/collect/event/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+
+	_ "github.com/apache/skywalking-satellite/internal/satellite/test"
+	receiver_grpc "github.com/apache/skywalking-satellite/plugins/receiver/grpc"
+)
+
+func TestReceiver_RegisterHandler(t *testing.T) {
+	receiver_grpc.TestReceiver(new(Receiver), func(t *testing.T, sequence int, conn *grpc.ClientConn, ctx context.Context) string {
+		client := nativeevent.NewEventServiceClient(conn)
+		data := initData(sequence)
+		collect, err := client.Collect(ctx)
+		if err != nil {
+			t.Fatalf("cannot open the stream send mode: %v", err)
+		}
+		if err := collect.Send(data); err != nil {
+			t.Fatalf("cannot send the data to the server: %v", err)
+		}
+		if err := collect.CloseSend(); err != nil {
+			t.Fatalf("cannot close the stream mode: %v", err)
+		}
+		return data.String()
+	}, func(data *v1.SniffData) string {
+		return data.GetEvent().String()
+	}, t)
+}
+
+// initData builds a sample event whose identifying fields are suffixed with the sequence number.
+func initData(sequence int) *nativeevent.Event {
+	seq := strconv.Itoa(sequence)
+	// Milliseconds since the epoch; Unix() yields seconds, so dividing it by 1e6 was not a timestamp.
+	now := time.Now().UnixNano() / 1e6
+	return &nativeevent.Event{
+		StartTime: now,
+		EndTime:   now,
+		Uuid:      "12345" + seq,
+		Source: &nativeevent.Source{
+			Service:         "demo-service" + seq,
+			ServiceInstance: "demo-instance" + seq,
+			Endpoint:        "demo-endpoint" + seq,
+		},
+		Name:    "test-name" + seq,
+		Type:    nativeevent.Type_Error,
+		Message: "test message" + seq,
+	}
+}
diff --git a/plugins/receiver/receiver_repository.go b/plugins/receiver/receiver_repository.go
index d6ed000..386193e 100644
--- a/plugins/receiver/receiver_repository.go
+++ b/plugins/receiver/receiver_repository.go
@@ -23,6 +23,7 @@ import (
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 	"github.com/apache/skywalking-satellite/plugins/receiver/api"
 	grpcnativecds "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativecds"
+	grpcnativeevent "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativeevent"
 	grpcnavtivelog "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativelog"
 	grpcnativemanagement "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativemanagement"
 	grpcnativeprofile "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativeprofile"
@@ -41,6 +42,7 @@ func RegisterReceiverPlugins() {
 		new(grpcnativeprofile.Receiver),
 		new(grpcnativecds.Receiver),
 		new(httpnavtivelog.Receiver),
+		new(grpcnativeevent.Receiver),
 	}
 	for _, receiver := range receivers {
 		plugin.RegisterPlugin(receiver)