You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by li...@apache.org on 2021/06/29 11:33:07 UTC

[skywalking-satellite] branch add-meter-pipe created (now fbc6250)

This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a change to branch add-meter-pipe
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git.


      at fbc6250  add meter pipe

This branch includes the following new commits:

     new fbc6250  add meter pipe

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[skywalking-satellite] 01/01: add meter pipe

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a commit to branch add-meter-pipe
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git

commit fbc62502520fbec3f47fae17b8a0462df73c3138
Author: Evan <ev...@outlook.com>
AuthorDate: Tue Jun 29 19:32:37 2021 +0800

    add meter pipe
---
 configs/satellite_config.yaml                      | 24 ++++++-
 ....md => forwarder_nativemeter-grpc-forwarder.md} |  2 +-
 docs/en/setup/plugins/plugin-list.md               |  3 +-
 .../plugins/receiver_grpc-nativemeter-receiver.md  |  5 ++
 docs/menu.yml                                      |  6 +-
 plugins/forwarder/forwarder_repository.go          |  2 +-
 .../nativecds/{sync_forwarder.go => forwarder.go}  |  0
 .../nativelog/{sync_forwarder.go => forwarder.go}  |  0
 .../{sync_forwarder.go => forwarder.go}            |  0
 .../sync_forwarder.go => nativemeter/forwarder.go} |  4 +-
 .../{sync_forwarder.go => forwarder.go}            |  0
 .../{sync_forwarder.go => forwarder.go}            |  0
 .../nativelog/{sync_forwarder.go => forwarder.go}  |  0
 plugins/receiver/grpc/nativemeter/meter_service.go | 57 +++++++++++++++++
 plugins/receiver/grpc/nativemeter/receiver.go      | 61 ++++++++++++++++++
 plugins/receiver/grpc/nativemeter/receiver_test.go | 74 ++++++++++++++++++++++
 plugins/receiver/receiver_repository.go            |  2 +
 17 files changed, 232 insertions(+), 8 deletions(-)

diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml
index 7b53e35..355a16c 100644
--- a/configs/satellite_config.yaml
+++ b/configs/satellite_config.yaml
@@ -180,4 +180,26 @@ pipes:
         plugin_name: none-fallbacker
       client_name: grpc-client
       forwarders:
-        - plugin_name: nativecds-grpc-forwarder
\ No newline at end of file
+        - plugin_name: nativecds-grpc-forwarder
+  - common_config:
+      pipe_name: meterpipe
+    gatherer:
+      server_name: "grpc-server"
+      receiver:
+        plugin_name: "grpc-nativemeter-receiver"
+      queue:
+        plugin_name: "memory-queue"
+    processor:
+      filters:
+    sender:
+      fallbacker:
+        plugin_name: none-fallbacker
+      # The time interval between two flush operations. And the time unit is millisecond.
+      flush_time: ${SATELLITE_TRACINGPIPE_SENDER_FLUSH_TIME:1000}
+      # The maximum buffer elements.
+      max_buffer_size: ${SATELLITE_TRACINGPIPE_SENDER_MAX_BUFFER_SIZE:200}
+      # The minimum flush elements.
+      min_flush_events: ${SATELLITE_TRACINGPIPE_SENDER_MIN_FLUSH_EVENTS:1}
+      client_name: grpc-client
+      forwarders:
+        - plugin_name: nativemeter-grpc-forwarder
\ No newline at end of file
diff --git a/docs/en/setup/plugins/forwarder_meter-grpc-forwarder.md b/docs/en/setup/plugins/forwarder_nativemeter-grpc-forwarder.md
similarity index 76%
rename from docs/en/setup/plugins/forwarder_meter-grpc-forwarder.md
rename to docs/en/setup/plugins/forwarder_nativemeter-grpc-forwarder.md
index 83005b0..3a8d083 100755
--- a/docs/en/setup/plugins/forwarder_meter-grpc-forwarder.md
+++ b/docs/en/setup/plugins/forwarder_nativemeter-grpc-forwarder.md
@@ -1,4 +1,4 @@
-# Forwarder/meter-grpc-forwarder
+# Forwarder/nativemeter-grpc-forwarder
 ## Description
 This is a synchronization meter grpc forwarder with the SkyWalking meter protocol.
 ## DefaultConfig
diff --git a/docs/en/setup/plugins/plugin-list.md b/docs/en/setup/plugins/plugin-list.md
index b2f6bc7..1041d6e 100755
--- a/docs/en/setup/plugins/plugin-list.md
+++ b/docs/en/setup/plugins/plugin-list.md
@@ -9,11 +9,11 @@
 	- [prometheus-metrics-fetcher](./fetcher_prometheus-metrics-fetcher.md)
 - Filter
 - Forwarder
-	- [meter-grpc-forwarder](./forwarder_meter-grpc-forwarder.md)
 	- [nativecds-grpc-forwarder](./forwarder_nativecds-grpc-forwarder.md)
 	- [nativelog-grpc-forwarder](./forwarder_nativelog-grpc-forwarder.md)
 	- [nativelog-kafka-forwarder](./forwarder_nativelog-kafka-forwarder.md)
 	- [nativemanagement-grpc-forwarder](./forwarder_nativemanagement-grpc-forwarder.md)
+	- [nativemeter-grpc-forwarder](./forwarder_nativemeter-grpc-forwarder.md)
 	- [nativeprofile-grpc-forwarder](./forwarder_nativeprofile-grpc-forwarder.md)
 	- [nativetracing-grpc-forwarder](./forwarder_nativetracing-grpc-forwarder.md)
 - Parser
@@ -25,6 +25,7 @@
 	- [grpc-nativecds-receiver](./receiver_grpc-nativecds-receiver.md)
 	- [grpc-nativelog-receiver](./receiver_grpc-nativelog-receiver.md)
 	- [grpc-nativemanagement-receiver](./receiver_grpc-nativemanagement-receiver.md)
+	- [grpc-nativemeter-receiver](./receiver_grpc-nativemeter-receiver.md)
 	- [grpc-nativeprofile-receiver](./receiver_grpc-nativeprofile-receiver.md)
 	- [grpc-nativetracing-receiver](./receiver_grpc-nativetracing-receiver.md)
 	- [http-nativelog-receiver](./receiver_http-nativelog-receiver.md)
diff --git a/docs/en/setup/plugins/receiver_grpc-nativemeter-receiver.md b/docs/en/setup/plugins/receiver_grpc-nativemeter-receiver.md
new file mode 100755
index 0000000..8988fcd
--- /dev/null
+++ b/docs/en/setup/plugins/receiver_grpc-nativemeter-receiver.md
@@ -0,0 +1,5 @@
+# Receiver/grpc-nativemeter-receiver
+## Description
+This is a receiver for SkyWalking native meter format, which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/event/Event.proto.
+## DefaultConfig
+```yaml```
diff --git a/docs/menu.yml b/docs/menu.yml
index 9f7d2e9..cff2164 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -69,8 +69,6 @@ catalog:
                   path: /en/setup/plugins/fetcher_prometheus-metrics-fetcher
             - name: forwarder
               catalog:
-                - name: meter-grpc-forwarder
-                  path: /en/setup/plugins/forwarder_meter-grpc-forwarder
                 - name: nativecds-grpc-forwarder
                   path: /en/setup/plugins/forwarder_nativecds-grpc-forwarder
                 - name: nativelog-grpc-forwarder
@@ -79,6 +77,8 @@ catalog:
                   path: /en/setup/plugins/forwarder_nativelog-kafka-forwarder
                 - name: nativemanagement-grpc-forwarder
                   path: /en/setup/plugins/forwarder_nativemanagement-grpc-forwarder
+                - name: nativemeter-grpc-forwarder
+                  path: /en/setup/plugins/forwarder_nativemeter-grpc-forwarder
                 - name: nativeprofile-grpc-forwarder
                   path: /en/setup/plugins/forwarder_nativeprofile-grpc-forwarder
                 - name: nativetracing-grpc-forwarder
@@ -99,6 +99,8 @@ catalog:
                   path: /en/setup/plugins/receiver_grpc-nativelog-receiver
                 - name: grpc-nativemanagement-receiver
                   path: /en/setup/plugins/receiver_grpc-nativemanagement-receiver
+                - name: grpc-nativemeter-receiver
+                  path: /en/setup/plugins/receiver_grpc-nativemeter-receiver
                 - name: grpc-nativeprofile-receiver
                   path: /en/setup/plugins/receiver_grpc-nativeprofile-receiver
                 - name: grpc-nativetracing-receiver
diff --git a/plugins/forwarder/forwarder_repository.go b/plugins/forwarder/forwarder_repository.go
index 3be46ba..ca02c2b 100644
--- a/plugins/forwarder/forwarder_repository.go
+++ b/plugins/forwarder/forwarder_repository.go
@@ -20,10 +20,10 @@ package forwarder
 import (
 	"reflect"
 
-	grpc_meter "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/meter"
 	grpc_nativecds "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativecds"
 	grpc_nativelog "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativelog"
 	grpc_nativemanagement "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativemanagement"
+	grpc_meter "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativemeter"
 	grpc_nativeprofile "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeprofile"
 	grpc_nativetracing "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativetracing"
 	kafka_nativelog "github.com/apache/skywalking-satellite/plugins/forwarder/kafka/nativelog"
diff --git a/plugins/forwarder/grpc/nativecds/sync_forwarder.go b/plugins/forwarder/grpc/nativecds/forwarder.go
similarity index 100%
rename from plugins/forwarder/grpc/nativecds/sync_forwarder.go
rename to plugins/forwarder/grpc/nativecds/forwarder.go
diff --git a/plugins/forwarder/grpc/nativelog/sync_forwarder.go b/plugins/forwarder/grpc/nativelog/forwarder.go
similarity index 100%
rename from plugins/forwarder/grpc/nativelog/sync_forwarder.go
rename to plugins/forwarder/grpc/nativelog/forwarder.go
diff --git a/plugins/forwarder/grpc/nativemanagement/sync_forwarder.go b/plugins/forwarder/grpc/nativemanagement/forwarder.go
similarity index 100%
rename from plugins/forwarder/grpc/nativemanagement/sync_forwarder.go
rename to plugins/forwarder/grpc/nativemanagement/forwarder.go
diff --git a/plugins/forwarder/grpc/meter/sync_forwarder.go b/plugins/forwarder/grpc/nativemeter/forwarder.go
similarity index 97%
rename from plugins/forwarder/grpc/meter/sync_forwarder.go
rename to plugins/forwarder/grpc/nativemeter/forwarder.go
index 5df933d..13ec551 100644
--- a/plugins/forwarder/grpc/meter/sync_forwarder.go
+++ b/plugins/forwarder/grpc/nativemeter/forwarder.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package meter
+package nativemeter
 
 import (
 	"context"
@@ -33,7 +33,7 @@ import (
 	"github.com/apache/skywalking-satellite/internal/satellite/event"
 )
 
-const Name = "meter-grpc-forwarder"
+const Name = "nativemeter-grpc-forwarder"
 
 type Forwarder struct {
 	config.CommonFields
diff --git a/plugins/forwarder/grpc/nativeprofile/sync_forwarder.go b/plugins/forwarder/grpc/nativeprofile/forwarder.go
similarity index 100%
rename from plugins/forwarder/grpc/nativeprofile/sync_forwarder.go
rename to plugins/forwarder/grpc/nativeprofile/forwarder.go
diff --git a/plugins/forwarder/grpc/nativetracing/sync_forwarder.go b/plugins/forwarder/grpc/nativetracing/forwarder.go
similarity index 100%
rename from plugins/forwarder/grpc/nativetracing/sync_forwarder.go
rename to plugins/forwarder/grpc/nativetracing/forwarder.go
diff --git a/plugins/forwarder/kafka/nativelog/sync_forwarder.go b/plugins/forwarder/kafka/nativelog/forwarder.go
similarity index 100%
rename from plugins/forwarder/kafka/nativelog/sync_forwarder.go
rename to plugins/forwarder/kafka/nativelog/forwarder.go
diff --git a/plugins/receiver/grpc/nativemeter/meter_service.go b/plugins/receiver/grpc/nativemeter/meter_service.go
new file mode 100644
index 0000000..51415d3
--- /dev/null
+++ b/plugins/receiver/grpc/nativemeter/meter_service.go
@@ -0,0 +1,57 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativeevent
+
+import (
+	"io"
+	"time"
+
+	common "skywalking.apache.org/repo/goapi/collect/common/v3"
+	meter "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const eventName = "grpc-nativemeter-event"
+
+type MeterService struct {
+	receiveChannel chan *v1.SniffData
+	meter.UnimplementedMeterReportServiceServer
+}
+
+func (m *MeterService) Collect(stream meter.MeterReportService_CollectServer) error {
+	for {
+		item, err := stream.Recv()
+		if err == io.EOF {
+			return stream.SendAndClose(&common.Commands{})
+		}
+		if err != nil {
+			return err
+		}
+		d := &v1.SniffData{
+			Name:      eventName,
+			Timestamp: time.Now().UnixNano() / 1e6,
+			Meta:      nil,
+			Type:      v1.SniffType_MeterType,
+			Remote:    true,
+			Data: &v1.SniffData_Meter{
+				Meter: item,
+			},
+		}
+		m.receiveChannel <- d
+	}
+}
diff --git a/plugins/receiver/grpc/nativemeter/receiver.go b/plugins/receiver/grpc/nativemeter/receiver.go
new file mode 100644
index 0000000..346a230
--- /dev/null
+++ b/plugins/receiver/grpc/nativemeter/receiver.go
@@ -0,0 +1,61 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativeevent
+
+import (
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	module "github.com/apache/skywalking-satellite/internal/satellite/module/api"
+	"github.com/apache/skywalking-satellite/plugins/receiver/grpc"
+
+	meter "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const Name = "grpc-nativemeter-receiver"
+
+type Receiver struct {
+	config.CommonFields
+	grpc.CommonGRPCReceiverFields
+	service *MeterService
+}
+
+func (r *Receiver) Name() string {
+	return Name
+}
+
+func (r *Receiver) Description() string {
+	return "This is a receiver for SkyWalking native meter format, " +
+		"which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/event/Event.proto."
+}
+
+func (r *Receiver) DefaultConfig() string {
+	return ""
+}
+
+func (r *Receiver) RegisterHandler(server interface{}) {
+	r.CommonGRPCReceiverFields = *grpc.InitCommonGRPCReceiverFields(server)
+	r.service = &MeterService{receiveChannel: r.OutputChannel}
+	meter.RegisterMeterReportServiceServer(r.Server, r.service)
+}
+
+func (r *Receiver) RegisterSyncInvoker(_ module.SyncInvoker) {
+}
+
+func (r *Receiver) Channel() <-chan *v1.SniffData {
+	return r.OutputChannel
+}
diff --git a/plugins/receiver/grpc/nativemeter/receiver_test.go b/plugins/receiver/grpc/nativemeter/receiver_test.go
new file mode 100644
index 0000000..91c6a2d
--- /dev/null
+++ b/plugins/receiver/grpc/nativemeter/receiver_test.go
@@ -0,0 +1,74 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativeevent
+
+import (
+	"context"
+	"strconv"
+	"testing"
+	"time"
+
+	"google.golang.org/grpc"
+
+	meter "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+
+	_ "github.com/apache/skywalking-satellite/internal/satellite/test"
+	receiver_grpc "github.com/apache/skywalking-satellite/plugins/receiver/grpc"
+)
+
+func TestReceiver_RegisterHandler(t *testing.T) {
+	receiver_grpc.TestReceiver(new(Receiver), func(t *testing.T, sequence int, conn *grpc.ClientConn, ctx context.Context) string {
+		client := meter.NewMeterReportServiceClient(conn)
+		data := initData(sequence)
+		collect, err := client.Collect(ctx)
+		if err != nil {
+			t.Fatalf("cannot open the stream send mode: %v", err)
+		}
+		if err := collect.Send(data); err != nil {
+			t.Fatalf("cannot send the data to the server: %v", err)
+		}
+		if err := collect.CloseSend(); err != nil {
+			t.Fatalf("cannot close the stream mode: %v", err)
+		}
+		return data.String()
+	}, func(data *v1.SniffData) string {
+		return data.GetMeter().String()
+	}, t)
+}
+
+func initData(sequence int) *meter.MeterData {
+	seq := strconv.Itoa(sequence)
+	return &meter.MeterData{
+		Service:         "demo-service" + seq,
+		ServiceInstance: "demo-instance" + seq,
+		Timestamp:       time.Now().Unix() / 1e6,
+		Metric: &meter.MeterData_SingleValue{
+			SingleValue: &meter.MeterSingleValue{
+				Name:  "name" + seq,
+				Value: float64(sequence),
+				Labels: []*meter.Label{
+					{
+						Name:  "label-name" + seq,
+						Value: "label-value" + seq,
+					},
+				},
+			},
+		},
+	}
+}
diff --git a/plugins/receiver/receiver_repository.go b/plugins/receiver/receiver_repository.go
index d6ed000..5cbc52c 100644
--- a/plugins/receiver/receiver_repository.go
+++ b/plugins/receiver/receiver_repository.go
@@ -25,6 +25,7 @@ import (
 	grpcnativecds "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativecds"
 	grpcnavtivelog "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativelog"
 	grpcnativemanagement "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativemanagement"
+	grpcnativemeter "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativemeter"
 	grpcnativeprofile "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativeprofile"
 	grpcnativetracing "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativetracing"
 	httpnavtivelog "github.com/apache/skywalking-satellite/plugins/receiver/http/nativcelog"
@@ -41,6 +42,7 @@ func RegisterReceiverPlugins() {
 		new(grpcnativeprofile.Receiver),
 		new(grpcnativecds.Receiver),
 		new(httpnavtivelog.Receiver),
+		new(grpcnativemeter.Receiver),
 	}
 	for _, receiver := range receivers {
 		plugin.RegisterPlugin(receiver)