You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/06/04 02:31:59 UTC
[skywalking-satellite] branch main updated: feat: add meter
forwarder plugin (#45)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
The following commit(s) were added to refs/heads/main by this push:
new d0ecb8e feat: add meter forwarder plugin (#45)
d0ecb8e is described below
commit d0ecb8e86e80433d777a470d4a1d84576e33be6d
Author: kv <gx...@163.com>
AuthorDate: Fri Jun 4 10:31:51 2021 +0800
feat: add meter forwarder plugin (#45)
---
CHANGES.md | 1 +
.../plugins/forwarder_meter-grpc-forwarder.md | 5 ++
docs/en/setup/plugins/plugin-list.md | 1 +
docs/menu.yml | 6 ++
plugins/forwarder/forwarder_repository.go | 2 +
plugins/forwarder/grpc/meter/sync_forwarder.go | 100 +++++++++++++++++++++
6 files changed, 115 insertions(+)
diff --git a/CHANGES.md b/CHANGES.md
index e599503..cd0bb3e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,7 @@ Release Notes.
* Update protoc-gen-go version to 1.26.0.
* Add grpc client plugin.
* Add nativelog-grpc-forwarder plugin.
+* Add meter-grpc-forwarder plugin.
#### Bug Fixes
Fix the data race in mmap queue.
diff --git a/docs/en/setup/plugins/forwarder_meter-grpc-forwarder.md b/docs/en/setup/plugins/forwarder_meter-grpc-forwarder.md
new file mode 100644
index 0000000..4e70a54
--- /dev/null
+++ b/docs/en/setup/plugins/forwarder_meter-grpc-forwarder.md
@@ -0,0 +1,5 @@
+# Forwarder/meter-grpc-forwarder
+## Description
+This is a synchronization meter grpc forwarder with the SkyWalking meter protocol.
+## DefaultConfig
+```yaml```
\ No newline at end of file
diff --git a/docs/en/setup/plugins/plugin-list.md b/docs/en/setup/plugins/plugin-list.md
index 6760a9e..04064b7 100755
--- a/docs/en/setup/plugins/plugin-list.md
+++ b/docs/en/setup/plugins/plugin-list.md
@@ -10,6 +10,7 @@
- Forwarder
- [nativelog-grpc-forwarder](./forwarder_nativelog-grpc-forwarder.md)
- [nativelog-kafka-forwarder](./forwarder_nativelog-kafka-forwarder.md)
+ - [meter-grpc-forwarder](./forwarder_meter-grpc-forwarder.md)
- Parser
- Queue
- [memory-queue](./queue_memory-queue.md)
diff --git a/docs/menu.yml b/docs/menu.yml
index dcf0055..9e4454b 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -57,6 +57,8 @@ catalog:
catalog:
- name: "client"
catalog:
+ - name: "grpc-client"
+ path: "/en/setup/plugins/client_grpc-client"
- name: "kafka-client"
path: "/en/setup/plugins/client_kafka-client"
- name: "fallbacker"
@@ -67,8 +69,12 @@ catalog:
path: "/en/setup/plugins/fallbacker_timer-fallbacker"
- name: "forwarder"
catalog:
+ - name: "nativelog-grpc-forwarder"
+ path: "/en/setup/plugins/forwarder_nativelog-grpc-forwarder"
- name: "nativelog-kafka-forwarder"
path: "/en/setup/plugins/forwarder_nativelog-kafka-forwarder"
+ - name: "meter-grpc-forwarder"
+ path: "/en/setup/plugins/forwarder_meter-grpc-forwarder"
- name: "queue"
catalog:
- name: "memory-queue"
diff --git a/plugins/forwarder/forwarder_repository.go b/plugins/forwarder/forwarder_repository.go
index 87f9dc7..7fc8bb5 100644
--- a/plugins/forwarder/forwarder_repository.go
+++ b/plugins/forwarder/forwarder_repository.go
@@ -20,6 +20,7 @@ package forwarder
import (
"reflect"
+ grpc_meter "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/meter"
grpc_nativelog "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativelog"
kafka_nativelog "github.com/apache/skywalking-satellite/plugins/forwarder/kafka/nativelog"
@@ -34,6 +35,7 @@ func RegisterForwarderPlugins() {
// Please register the forwarder plugins at here.
new(kafka_nativelog.Forwarder),
new(grpc_nativelog.Forwarder),
+ new(grpc_meter.Forwarder),
}
for _, forwarder := range forwarders {
plugin.RegisterPlugin(forwarder)
diff --git a/plugins/forwarder/grpc/meter/sync_forwarder.go b/plugins/forwarder/grpc/meter/sync_forwarder.go
new file mode 100644
index 0000000..0a52480
--- /dev/null
+++ b/plugins/forwarder/grpc/meter/sync_forwarder.go
@@ -0,0 +1,100 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package meter
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "reflect"
+
+ "github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol"
+
+ "google.golang.org/grpc"
+
+ v3 "skywalking/network/language/agent/v3"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/config"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+ "github.com/apache/skywalking-satellite/internal/satellite/event"
+)
+
+// Name is the plugin identifier returned by Forwarder.Name.
+const Name = "meter-grpc-forwarder"
+
+// Forwarder is the meter gRPC forwarder plugin. It sends meter events through
+// a v3.MeterReportServiceClient.
+type Forwarder struct {
+ config.CommonFields
+ // meterClient is set by Prepare and used by Forward to open the Collect
+ // stream; it is nil until Prepare has succeeded.
+ meterClient v3.MeterReportServiceClient
+}
+
+// Name returns the plugin name, i.e. the package-level Name constant.
+func (f *Forwarder) Name() string {
+ return Name
+}
+
+// Description returns a one-sentence, human-readable summary of the plugin.
+func (f *Forwarder) Description() string {
+ return "This is a synchronization meter grpc forwarder with the SkyWalking meter protocol."
+}
+
+// DefaultConfig returns the plugin's default configuration text. It is the
+// empty string: this forwarder declares no default settings.
+func (f *Forwarder) DefaultConfig() string {
+ return ``
+}
+
+// Prepare binds the forwarder to the given client connection by building a
+// MeterReportService client on top of it.
+//
+// connection must be a *grpc.ClientConn. Any other value, including a nil
+// interface, yields an error and leaves the forwarder unchanged.
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(*grpc.ClientConn)
+	if !ok {
+		// reflect.TypeOf returns a nil Type for a nil interface value and
+		// calling String on it would panic, so name that case explicitly.
+		received := "<nil>"
+		if t := reflect.TypeOf(connection); t != nil {
+			received = t.String()
+		}
+		return fmt.Errorf("the %s is only accepet the grpc client, but receive a %s",
+			f.Name(), received)
+	}
+	f.meterClient = v3.NewMeterReportServiceClient(client)
+	return nil
+}
+
+// Forward sends every meter event in batch over a single Collect stream and
+// then closes the stream.
+//
+// Events whose payload is not a *protocol.Event_Meter are skipped. The first
+// failed Send aborts the batch: the stream is closed and a non-nil error is
+// always returned, so a partially sent batch is never reported as a success.
+// If closing the stream fails as well, the close error is returned; otherwise
+// the send error is.
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	stream, err := f.meterClient.Collect(context.Background())
+	if err != nil {
+		log.Logger.Errorf("open grpc stream error %v", err)
+		return err
+	}
+	for _, e := range batch {
+		data, ok := e.GetData().(*protocol.Event_Meter)
+		if !ok {
+			continue
+		}
+		sendErr := stream.Send(data.Meter)
+		if sendErr == nil {
+			continue
+		}
+		log.Logger.Errorf("%s send meter data error: %v", f.Name(), sendErr)
+		// Keep the send error in its own variable: a clean close must not
+		// overwrite it, otherwise the failure would be reported as nil.
+		if closeErr := closeStream(stream); closeErr != nil {
+			log.Logger.Errorf("%s close stream error: %v", f.Name(), closeErr)
+			return closeErr
+		}
+		return sendErr
+	}
+	return closeStream(stream)
+}
+
+// closeStream calls CloseAndRecv on the Collect stream and discards the reply.
+// An io.EOF result is treated the same as success and reported as nil; any
+// other error is returned unchanged.
+func closeStream(stream v3.MeterReportService_CollectClient) error {
+	switch _, recvErr := stream.CloseAndRecv(); recvErr {
+	case nil, io.EOF:
+		return nil
+	default:
+		return recvErr
+	}
+}
+
+// ForwardType reports the event type this forwarder handles:
+// protocol.EventType_MeterType.
+func (f *Forwarder) ForwardType() protocol.EventType {
+ return protocol.EventType_MeterType
+}