You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by li...@apache.org on 2021/06/29 08:22:13 UTC
[skywalking-satellite] 01/01: add jvm pipe
This is an automated email from the ASF dual-hosted git repository.
liujiapeng pushed a commit to branch add-jvm-pipe
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
commit ce094addf8e87c77984152bb9dbe43a1735bc0ad
Author: Evan <ev...@outlook.com>
AuthorDate: Tue Jun 29 16:20:36 2021 +0800
add jvm pipe
---
configs/satellite_config.yaml | 24 +++++-
.../plugins/forwarder_nativejvm-grpc-forwarder.md | 5 ++
docs/en/setup/plugins/plugin-list.md | 2 +
.../plugins/receiver_grpc-nativejvm-receiver.md | 5 ++
docs/menu.yml | 4 +
plugins/forwarder/forwarder_repository.go | 2 +
plugins/forwarder/grpc/nativejvm/sync_forwarder.go | 85 ++++++++++++++++++++
.../receiver/grpc/nativejvm/jvm_report_service.go | 49 ++++++++++++
plugins/receiver/grpc/nativejvm/receiver.go | 61 ++++++++++++++
plugins/receiver/grpc/nativejvm/receiver_test.go | 92 ++++++++++++++++++++++
plugins/receiver/receiver_repository.go | 2 +
11 files changed, 330 insertions(+), 1 deletion(-)
diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml
index 7b53e35..d93892c 100644
--- a/configs/satellite_config.yaml
+++ b/configs/satellite_config.yaml
@@ -180,4 +180,26 @@ pipes:
plugin_name: none-fallbacker
client_name: grpc-client
forwarders:
- - plugin_name: nativecds-grpc-forwarder
\ No newline at end of file
+ - plugin_name: nativecds-grpc-forwarder
+ - common_config:
+ pipe_name: jvmpipe
+ gatherer:
+ server_name: "grpc-server"
+ receiver:
+ plugin_name: "grpc-nativejvm-receiver"
+ queue:
+ plugin_name: "memory-queue"
+ processor:
+ filters:
+ sender:
+ fallbacker:
+ plugin_name: none-fallbacker
+ # The time interval between two flush operations. And the time unit is millisecond.
+ flush_time: ${SATELLITE_TRACINGPIPE_SENDER_FLUSH_TIME:1000}
+ # The maximum buffer elements.
+ max_buffer_size: ${SATELLITE_TRACINGPIPE_SENDER_MAX_BUFFER_SIZE:200}
+ # The minimum flush elements.
+ min_flush_events: ${SATELLITE_TRACINGPIPE_SENDER_MIN_FLUSH_EVENTS:1}
+ client_name: grpc-client
+ forwarders:
+ - plugin_name: nativejvm-grpc-forwarder
\ No newline at end of file
diff --git a/docs/en/setup/plugins/forwarder_nativejvm-grpc-forwarder.md b/docs/en/setup/plugins/forwarder_nativejvm-grpc-forwarder.md
new file mode 100755
index 0000000..5999157
--- /dev/null
+++ b/docs/en/setup/plugins/forwarder_nativejvm-grpc-forwarder.md
@@ -0,0 +1,5 @@
+# Forwarder/nativejvm-grpc-forwarder
+## Description
+This is a synchronization grpc forwarder with the SkyWalking native Configuration Discovery Service protocol.
+## DefaultConfig
+```yaml```
diff --git a/docs/en/setup/plugins/plugin-list.md b/docs/en/setup/plugins/plugin-list.md
index b2f6bc7..a3d7833 100755
--- a/docs/en/setup/plugins/plugin-list.md
+++ b/docs/en/setup/plugins/plugin-list.md
@@ -11,6 +11,7 @@
- Forwarder
- [meter-grpc-forwarder](./forwarder_meter-grpc-forwarder.md)
- [nativecds-grpc-forwarder](./forwarder_nativecds-grpc-forwarder.md)
+ - [nativejvm-grpc-forwarder](./forwarder_nativejvm-grpc-forwarder.md)
- [nativelog-grpc-forwarder](./forwarder_nativelog-grpc-forwarder.md)
- [nativelog-kafka-forwarder](./forwarder_nativelog-kafka-forwarder.md)
- [nativemanagement-grpc-forwarder](./forwarder_nativemanagement-grpc-forwarder.md)
@@ -23,6 +24,7 @@
- [none-queue](./queue_none-queue.md)
- Receiver
- [grpc-nativecds-receiver](./receiver_grpc-nativecds-receiver.md)
+ - [grpc-nativejvm-receiver](./receiver_grpc-nativejvm-receiver.md)
- [grpc-nativelog-receiver](./receiver_grpc-nativelog-receiver.md)
- [grpc-nativemanagement-receiver](./receiver_grpc-nativemanagement-receiver.md)
- [grpc-nativeprofile-receiver](./receiver_grpc-nativeprofile-receiver.md)
diff --git a/docs/en/setup/plugins/receiver_grpc-nativejvm-receiver.md b/docs/en/setup/plugins/receiver_grpc-nativejvm-receiver.md
new file mode 100755
index 0000000..a5a8434
--- /dev/null
+++ b/docs/en/setup/plugins/receiver_grpc-nativejvm-receiver.md
@@ -0,0 +1,5 @@
+# Receiver/grpc-nativejvm-receiver
+## Description
+This is a receiver for SkyWalking native jvm format, which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/language-agent/JVMMetric.proto.
+## DefaultConfig
+```yaml```
diff --git a/docs/menu.yml b/docs/menu.yml
index 9f7d2e9..d0f6854 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -73,6 +73,8 @@ catalog:
path: /en/setup/plugins/forwarder_meter-grpc-forwarder
- name: nativecds-grpc-forwarder
path: /en/setup/plugins/forwarder_nativecds-grpc-forwarder
+ - name: nativejvm-grpc-forwarder
+ path: /en/setup/plugins/forwarder_nativejvm-grpc-forwarder
- name: nativelog-grpc-forwarder
path: /en/setup/plugins/forwarder_nativelog-grpc-forwarder
- name: nativelog-kafka-forwarder
@@ -95,6 +97,8 @@ catalog:
catalog:
- name: grpc-nativecds-receiver
path: /en/setup/plugins/receiver_grpc-nativecds-receiver
+ - name: grpc-nativejvm-receiver
+ path: /en/setup/plugins/receiver_grpc-nativejvm-receiver
- name: grpc-nativelog-receiver
path: /en/setup/plugins/receiver_grpc-nativelog-receiver
- name: grpc-nativemanagement-receiver
diff --git a/plugins/forwarder/forwarder_repository.go b/plugins/forwarder/forwarder_repository.go
index 3be46ba..bc6b181 100644
--- a/plugins/forwarder/forwarder_repository.go
+++ b/plugins/forwarder/forwarder_repository.go
@@ -22,6 +22,7 @@ import (
grpc_meter "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/meter"
grpc_nativecds "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativecds"
+ grpc_nativejvm "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativejvm"
grpc_nativelog "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativelog"
grpc_nativemanagement "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativemanagement"
grpc_nativeprofile "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeprofile"
@@ -44,6 +45,7 @@ func RegisterForwarderPlugins() {
new(grpc_nativetracing.Forwarder),
new(grpc_nativeprofile.Forwarder),
new(grpc_nativecds.Forwarder),
+ new(grpc_nativejvm.Forwarder),
}
for _, forwarder := range forwarders {
plugin.RegisterPlugin(forwarder)
diff --git a/plugins/forwarder/grpc/nativejvm/sync_forwarder.go b/plugins/forwarder/grpc/nativejvm/sync_forwarder.go
new file mode 100644
index 0000000..1869dcb
--- /dev/null
+++ b/plugins/forwarder/grpc/nativejvm/sync_forwarder.go
@@ -0,0 +1,85 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativecds
+
+import (
+ "context"
+ "fmt"
+ "reflect"
+
+ "google.golang.org/grpc"
+
+ agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+ v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/config"
+ "github.com/apache/skywalking-satellite/internal/satellite/event"
+)
+
+const Name = "nativejvm-grpc-forwarder"
+
+type Forwarder struct {
+ config.CommonFields
+
+ client agent.JVMMetricReportServiceClient
+}
+
+func (f *Forwarder) Name() string {
+ return Name
+}
+
+func (f *Forwarder) Description() string {
+ return "This is a synchronization grpc forwarder with the SkyWalking native Configuration Discovery Service protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+ return ``
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+ client, ok := connection.(*grpc.ClientConn)
+ if !ok {
+ return fmt.Errorf("the %s is only accepet the grpc client, but receive a %s",
+ f.Name(), reflect.TypeOf(connection).String())
+ }
+ f.client = agent.NewJVMMetricReportServiceClient(client)
+ return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+ for _, e := range batch {
+ jvm := e.GetJvm()
+ _, err := f.client.Collect(context.Background(), jvm)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (f *Forwarder) ForwardType() v1.SniffType {
+ return v1.SniffType_JVMMetricType
+}
+
+func (f *Forwarder) SyncForward(_ *v1.SniffData) (*v1.SniffData, error) {
+ return nil, fmt.Errorf("unsupport sync forward")
+}
+
+func (f *Forwarder) SupportedSyncInvoke() bool {
+ return false
+}
diff --git a/plugins/receiver/grpc/nativejvm/jvm_report_service.go b/plugins/receiver/grpc/nativejvm/jvm_report_service.go
new file mode 100644
index 0000000..22e56ad
--- /dev/null
+++ b/plugins/receiver/grpc/nativejvm/jvm_report_service.go
@@ -0,0 +1,49 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativelog
+
+import (
+ "context"
+ "time"
+
+ common "skywalking.apache.org/repo/goapi/collect/common/v3"
+ agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+ v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const eventName = "grpc-jvm-event"
+
+type JVMReportService struct {
+ receiveChannel chan *v1.SniffData
+ agent.UnimplementedJVMMetricReportServiceServer
+}
+
+func (j *JVMReportService) Collect(_ context.Context, jvm *agent.JVMMetricCollection) (*common.Commands, error) {
+ e := &v1.SniffData{
+ Name: eventName,
+ Timestamp: time.Now().UnixNano() / 1e6,
+ Meta: nil,
+ Type: v1.SniffType_JVMMetricType,
+ Remote: true,
+ Data: &v1.SniffData_Jvm{
+ Jvm: jvm,
+ },
+ }
+ j.receiveChannel <- e
+ return &common.Commands{}, nil
+}
diff --git a/plugins/receiver/grpc/nativejvm/receiver.go b/plugins/receiver/grpc/nativejvm/receiver.go
new file mode 100644
index 0000000..6a6fb61
--- /dev/null
+++ b/plugins/receiver/grpc/nativejvm/receiver.go
@@ -0,0 +1,61 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativelog
+
+import (
+ agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+ v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/config"
+ module "github.com/apache/skywalking-satellite/internal/satellite/module/api"
+ grpcreceiver "github.com/apache/skywalking-satellite/plugins/receiver/grpc"
+)
+
+const Name = "grpc-nativejvm-receiver"
+
+type Receiver struct {
+ config.CommonFields
+ grpcreceiver.CommonGRPCReceiverFields
+ service *JVMReportService // The gRPC request handler for jvm data.
+}
+
+func (r *Receiver) Name() string {
+ return Name
+}
+
+func (r *Receiver) Description() string {
+ return "This is a receiver for SkyWalking native jvm format, " +
+ "which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/language-agent/JVMMetric.proto."
+}
+
+func (r *Receiver) DefaultConfig() string {
+ return ""
+}
+
+func (r *Receiver) RegisterHandler(server interface{}) {
+ r.CommonGRPCReceiverFields = *grpcreceiver.InitCommonGRPCReceiverFields(server)
+ r.service = &JVMReportService{receiveChannel: r.OutputChannel}
+ agent.RegisterJVMMetricReportServiceServer(r.Server, r.service)
+}
+
+func (r *Receiver) RegisterSyncInvoker(_ module.SyncInvoker) {
+}
+
+func (r *Receiver) Channel() <-chan *v1.SniffData {
+ return r.OutputChannel
+}
diff --git a/plugins/receiver/grpc/nativejvm/receiver_test.go b/plugins/receiver/grpc/nativejvm/receiver_test.go
new file mode 100644
index 0000000..25bc318
--- /dev/null
+++ b/plugins/receiver/grpc/nativejvm/receiver_test.go
@@ -0,0 +1,92 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativelog
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "google.golang.org/grpc"
+
+ common "skywalking.apache.org/repo/goapi/collect/common/v3"
+ agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+ v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+
+ _ "github.com/apache/skywalking-satellite/internal/satellite/test"
+ receiver_grpc "github.com/apache/skywalking-satellite/plugins/receiver/grpc"
+)
+
+func TestReceiver_RegisterHandler(t *testing.T) {
+ receiver_grpc.TestReceiver(new(Receiver), func(t *testing.T, sequence int, conn *grpc.ClientConn, ctx context.Context) string {
+ client := agent.NewJVMMetricReportServiceClient(conn)
+ data := initData()
+ _, err := client.Collect(ctx, data)
+ if err != nil {
+ t.Fatalf("cannot send data: %v", err)
+ }
+ return data.String()
+ }, func(data *v1.SniffData) string {
+ return data.GetJvm().String()
+ }, t)
+}
+
+func initData() *agent.JVMMetricCollection {
+ return &agent.JVMMetricCollection{
+ Service: "demo-service",
+ ServiceInstance: "demo-instance",
+ Metrics: []*agent.JVMMetric{
+ {
+ Time: time.Now().Unix() / 1e6,
+ Cpu: &common.CPU{
+ UsagePercent: 99.9,
+ },
+ Memory: []*agent.Memory{
+ {
+ IsHeap: true,
+ Init: 1,
+ Max: 2,
+ Used: 3,
+ Committed: 4,
+ },
+ },
+ MemoryPool: []*agent.MemoryPool{
+ {
+ Type: 0,
+ Init: 1,
+ Max: 2,
+ Used: 3,
+ Committed: 4,
+ },
+ },
+ Gc: []*agent.GC{
+ {
+ Phrase: 2,
+ Count: 3,
+ Time: 202106111010,
+ },
+ },
+ Thread: &agent.Thread{
+ LiveCount: 1,
+ PeakCount: 2,
+ DaemonCount: 3,
+ },
+ },
+ },
+ }
+}
diff --git a/plugins/receiver/receiver_repository.go b/plugins/receiver/receiver_repository.go
index d6ed000..aa6e7a4 100644
--- a/plugins/receiver/receiver_repository.go
+++ b/plugins/receiver/receiver_repository.go
@@ -23,6 +23,7 @@ import (
"github.com/apache/skywalking-satellite/internal/pkg/plugin"
"github.com/apache/skywalking-satellite/plugins/receiver/api"
grpcnativecds "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativecds"
+ grpcnativejvm "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativejvm"
grpcnavtivelog "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativelog"
grpcnativemanagement "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativemanagement"
grpcnativeprofile "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativeprofile"
@@ -41,6 +42,7 @@ func RegisterReceiverPlugins() {
new(grpcnativeprofile.Receiver),
new(grpcnativecds.Receiver),
new(httpnavtivelog.Receiver),
+ new(grpcnativejvm.Receiver),
}
for _, receiver := range receivers {
plugin.RegisterPlugin(receiver)