You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/06/04 02:31:59 UTC

[skywalking-satellite] branch main updated: feat: add meter forwarder plugin (#45)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new d0ecb8e  feat: add meter forwarder plugin (#45)
d0ecb8e is described below

commit d0ecb8e86e80433d777a470d4a1d84576e33be6d
Author: kv <gx...@163.com>
AuthorDate: Fri Jun 4 10:31:51 2021 +0800

    feat: add meter forwarder plugin (#45)
---
 CHANGES.md                                         |   1 +
 .../plugins/forwarder_meter-grpc-forwarder.md      |   5 ++
 docs/en/setup/plugins/plugin-list.md               |   1 +
 docs/menu.yml                                      |   6 ++
 plugins/forwarder/forwarder_repository.go          |   2 +
 plugins/forwarder/grpc/meter/sync_forwarder.go     | 100 +++++++++++++++++++++
 6 files changed, 115 insertions(+)

diff --git a/CHANGES.md b/CHANGES.md
index e599503..cd0bb3e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,7 @@ Release Notes.
 * Update protoc-gen-go version to 1.26.0.
 * Add grpc client plugin.
 * Add nativelog-grpc-forwarder plugin.
+* Add meter-grpc-forwarder plugin.
 
 #### Bug Fixes
 Fix the data race in mmap queue.
diff --git a/docs/en/setup/plugins/forwarder_meter-grpc-forwarder.md b/docs/en/setup/plugins/forwarder_meter-grpc-forwarder.md
new file mode 100644
index 0000000..4e70a54
--- /dev/null
+++ b/docs/en/setup/plugins/forwarder_meter-grpc-forwarder.md
@@ -0,0 +1,5 @@
+# Forwarder/meter-grpc-forwarder
+## Description
+This is a synchronization meter grpc forwarder with the SkyWalking meter protocol.
+## DefaultConfig
+```yaml```
\ No newline at end of file
diff --git a/docs/en/setup/plugins/plugin-list.md b/docs/en/setup/plugins/plugin-list.md
index 6760a9e..04064b7 100755
--- a/docs/en/setup/plugins/plugin-list.md
+++ b/docs/en/setup/plugins/plugin-list.md
@@ -10,6 +10,7 @@
 - Forwarder
 	- [nativelog-grpc-forwarder](./forwarder_nativelog-grpc-forwarder.md)
 	- [nativelog-kafka-forwarder](./forwarder_nativelog-kafka-forwarder.md)
+	- [meter-grpc-forwarder](./forwarder_meter-grpc-forwarder.md)
 - Parser
 - Queue
 	- [memory-queue](./queue_memory-queue.md)
diff --git a/docs/menu.yml b/docs/menu.yml
index dcf0055..9e4454b 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -57,6 +57,8 @@ catalog:
         catalog:
           - name: "client"
             catalog:
+              - name: "grpc-client"
+                path: "/en/setup/plugins/client_grpc-client"
               - name: "kafka-client"
                 path: "/en/setup/plugins/client_kafka-client"
           - name: "fallbacker"
@@ -67,8 +69,12 @@ catalog:
                 path: "/en/setup/plugins/fallbacker_timer-fallbacker"
           - name: "forwarder"
             catalog:
+              - name: "nativelog-grpc-forwarder"
+                path: "/en/setup/plugins/forwarder_nativelog-grpc-forwarder"
               - name: "nativelog-kafka-forwarder"
                 path: "/en/setup/plugins/forwarder_nativelog-kafka-forwarder"
+              - name: "meter-grpc-forwarder"
+                path: "/en/setup/plugins/forwarder_meter-grpc-forwarder"
           - name: "queue"
             catalog:
               - name: "memory-queue"
diff --git a/plugins/forwarder/forwarder_repository.go b/plugins/forwarder/forwarder_repository.go
index 87f9dc7..7fc8bb5 100644
--- a/plugins/forwarder/forwarder_repository.go
+++ b/plugins/forwarder/forwarder_repository.go
@@ -20,6 +20,7 @@ package forwarder
 import (
 	"reflect"
 
+	grpc_meter "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/meter"
 	grpc_nativelog "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativelog"
 	kafka_nativelog "github.com/apache/skywalking-satellite/plugins/forwarder/kafka/nativelog"
 
@@ -34,6 +35,7 @@ func RegisterForwarderPlugins() {
 		// Please register the forwarder plugins at here.
 		new(kafka_nativelog.Forwarder),
 		new(grpc_nativelog.Forwarder),
+		new(grpc_meter.Forwarder),
 	}
 	for _, forwarder := range forwarders {
 		plugin.RegisterPlugin(forwarder)
diff --git a/plugins/forwarder/grpc/meter/sync_forwarder.go b/plugins/forwarder/grpc/meter/sync_forwarder.go
new file mode 100644
index 0000000..0a52480
--- /dev/null
+++ b/plugins/forwarder/grpc/meter/sync_forwarder.go
@@ -0,0 +1,100 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package meter
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"reflect"
+
+	"github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol"
+
+	"google.golang.org/grpc"
+
+	v3 "skywalking/network/language/agent/v3"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+)
+
+// Name is the plugin name reported by Forwarder.Name.
+const Name = "meter-grpc-forwarder"
+
+// Forwarder sends meter events to a MeterReportService over gRPC.
+type Forwarder struct {
+	config.CommonFields
+	// meterClient is assigned in Prepare and used by Forward to open the Collect stream.
+	meterClient v3.MeterReportServiceClient
+}
+
+// Name returns the plugin name, i.e. the package-level Name constant.
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+// Description returns a one-sentence, human-readable description of the plugin.
+func (f *Forwarder) Description() string {
+	return "This is a synchronization meter grpc forwarder with the SkyWalking meter protocol."
+}
+
+// DefaultConfig returns the plugin's default configuration text, which is
+// the empty string for this forwarder.
+func (f *Forwarder) DefaultConfig() string {
+	return ``
+}
+
+// Prepare binds the forwarder to the given connection, which must be a
+// *grpc.ClientConn. It returns an error describing the received type when
+// the connection is anything else, including an untyped nil.
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(*grpc.ClientConn)
+	if !ok {
+		// reflect.TypeOf returns a nil Type for an untyped nil value, so the
+		// Type is formatted with %v rather than by calling String() on it,
+		// which would panic.
+		return fmt.Errorf("the %s only accepts a grpc client, but received a %v",
+			f.Name(), reflect.TypeOf(connection))
+	}
+	f.meterClient = v3.NewMeterReportServiceClient(client)
+	return nil
+}
+
+// Forward opens a Collect stream and sends every meter event of the batch
+// through it. Events whose payload is not *protocol.Event_Meter are skipped.
+//
+// It returns the error from opening the stream, the first send error, or the
+// error from closing the stream after all events have been sent. A send
+// failure is always reported to the caller, even when the subsequent stream
+// close succeeds, so that a partially sent batch is never treated as
+// delivered.
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	stream, err := f.meterClient.Collect(context.Background())
+	if err != nil {
+		log.Logger.Errorf("open grpc stream error %v", err)
+		return err
+	}
+	for _, e := range batch {
+		data, ok := e.GetData().(*protocol.Event_Meter)
+		if !ok {
+			continue
+		}
+		if sendErr := stream.Send(data.Meter); sendErr != nil {
+			log.Logger.Errorf("%s send meter data error: %v", f.Name(), sendErr)
+			// Closing is best-effort here: its outcome is only logged and must
+			// not replace the send error that caused the abort.
+			if closeErr := closeStream(stream); closeErr != nil {
+				log.Logger.Errorf("%s close stream error: %v", f.Name(), closeErr)
+			}
+			return sendErr
+		}
+	}
+	return closeStream(stream)
+}
+
+// closeStream finishes the client side of the stream via CloseAndRecv and
+// discards the response. An io.EOF result is reported as nil; any other
+// error is returned unchanged.
+func closeStream(stream v3.MeterReportService_CollectClient) error {
+	if _, closeErr := stream.CloseAndRecv(); closeErr != nil && closeErr != io.EOF {
+		return closeErr
+	}
+	return nil
+}
+
+// ForwardType returns the event type this forwarder is registered for,
+// protocol.EventType_MeterType.
+func (f *Forwarder) ForwardType() protocol.EventType {
+	return protocol.EventType_MeterType
+}