You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by li...@apache.org on 2021/06/29 08:22:12 UTC

[skywalking-satellite] branch add-jvm-pipe created (now ce094ad)

This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a change to branch add-jvm-pipe
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git.


      at ce094ad  add jvm pipe

This branch includes the following new commits:

     new ce094ad  add jvm pipe

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[skywalking-satellite] 01/01: add jvm pipe

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a commit to branch add-jvm-pipe
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git

commit ce094addf8e87c77984152bb9dbe43a1735bc0ad
Author: Evan <ev...@outlook.com>
AuthorDate: Tue Jun 29 16:20:36 2021 +0800

    add jvm pipe
---
 configs/satellite_config.yaml                      | 24 +++++-
 .../plugins/forwarder_nativejvm-grpc-forwarder.md  |  5 ++
 docs/en/setup/plugins/plugin-list.md               |  2 +
 .../plugins/receiver_grpc-nativejvm-receiver.md    |  5 ++
 docs/menu.yml                                      |  4 +
 plugins/forwarder/forwarder_repository.go          |  2 +
 plugins/forwarder/grpc/nativejvm/sync_forwarder.go | 85 ++++++++++++++++++++
 .../receiver/grpc/nativejvm/jvm_report_service.go  | 49 ++++++++++++
 plugins/receiver/grpc/nativejvm/receiver.go        | 61 ++++++++++++++
 plugins/receiver/grpc/nativejvm/receiver_test.go   | 92 ++++++++++++++++++++++
 plugins/receiver/receiver_repository.go            |  2 +
 11 files changed, 330 insertions(+), 1 deletion(-)

diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml
index 7b53e35..d93892c 100644
--- a/configs/satellite_config.yaml
+++ b/configs/satellite_config.yaml
@@ -180,4 +180,26 @@ pipes:
         plugin_name: none-fallbacker
       client_name: grpc-client
       forwarders:
-        - plugin_name: nativecds-grpc-forwarder
\ No newline at end of file
+        - plugin_name: nativecds-grpc-forwarder
+  # The pipe that receives native JVM metrics over gRPC and forwards them to the backend.
+  - common_config:
+      pipe_name: jvmpipe
+    gatherer:
+      server_name: "grpc-server"
+      receiver:
+        plugin_name: "grpc-nativejvm-receiver"
+      queue:
+        plugin_name: "memory-queue"
+    processor:
+      filters:
+    sender:
+      fallbacker:
+        plugin_name: none-fallbacker
+      # The time interval between two flush operations, in milliseconds.
+      # The JVM pipe uses its own environment variables so that tuning the tracing pipe does not affect it.
+      flush_time: ${SATELLITE_JVMPIPE_SENDER_FLUSH_TIME:1000}
+      # The maximum buffer elements.
+      max_buffer_size: ${SATELLITE_JVMPIPE_SENDER_MAX_BUFFER_SIZE:200}
+      # The minimum flush elements.
+      min_flush_events: ${SATELLITE_JVMPIPE_SENDER_MIN_FLUSH_EVENTS:1}
+      client_name: grpc-client
+      forwarders:
+        - plugin_name: nativejvm-grpc-forwarder
\ No newline at end of file
diff --git a/docs/en/setup/plugins/forwarder_nativejvm-grpc-forwarder.md b/docs/en/setup/plugins/forwarder_nativejvm-grpc-forwarder.md
new file mode 100755
index 0000000..5999157
--- /dev/null
+++ b/docs/en/setup/plugins/forwarder_nativejvm-grpc-forwarder.md
@@ -0,0 +1,5 @@
+# Forwarder/nativejvm-grpc-forwarder
+## Description
+This is a synchronization grpc forwarder with the SkyWalking native JVM metric protocol.
+## DefaultConfig
+```yaml```
diff --git a/docs/en/setup/plugins/plugin-list.md b/docs/en/setup/plugins/plugin-list.md
index b2f6bc7..a3d7833 100755
--- a/docs/en/setup/plugins/plugin-list.md
+++ b/docs/en/setup/plugins/plugin-list.md
@@ -11,6 +11,7 @@
 - Forwarder
 	- [meter-grpc-forwarder](./forwarder_meter-grpc-forwarder.md)
 	- [nativecds-grpc-forwarder](./forwarder_nativecds-grpc-forwarder.md)
+	- [nativejvm-grpc-forwarder](./forwarder_nativejvm-grpc-forwarder.md)
 	- [nativelog-grpc-forwarder](./forwarder_nativelog-grpc-forwarder.md)
 	- [nativelog-kafka-forwarder](./forwarder_nativelog-kafka-forwarder.md)
 	- [nativemanagement-grpc-forwarder](./forwarder_nativemanagement-grpc-forwarder.md)
@@ -23,6 +24,7 @@
 	- [none-queue](./queue_none-queue.md)
 - Receiver
 	- [grpc-nativecds-receiver](./receiver_grpc-nativecds-receiver.md)
+	- [grpc-nativejvm-receiver](./receiver_grpc-nativejvm-receiver.md)
 	- [grpc-nativelog-receiver](./receiver_grpc-nativelog-receiver.md)
 	- [grpc-nativemanagement-receiver](./receiver_grpc-nativemanagement-receiver.md)
 	- [grpc-nativeprofile-receiver](./receiver_grpc-nativeprofile-receiver.md)
diff --git a/docs/en/setup/plugins/receiver_grpc-nativejvm-receiver.md b/docs/en/setup/plugins/receiver_grpc-nativejvm-receiver.md
new file mode 100755
index 0000000..a5a8434
--- /dev/null
+++ b/docs/en/setup/plugins/receiver_grpc-nativejvm-receiver.md
@@ -0,0 +1,5 @@
+# Receiver/grpc-nativejvm-receiver
+## Description
+This is a receiver for SkyWalking native jvm format, which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/language-agent/JVMMetric.proto.
+## DefaultConfig
+```yaml```
diff --git a/docs/menu.yml b/docs/menu.yml
index 9f7d2e9..d0f6854 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -73,6 +73,8 @@ catalog:
                   path: /en/setup/plugins/forwarder_meter-grpc-forwarder
                 - name: nativecds-grpc-forwarder
                   path: /en/setup/plugins/forwarder_nativecds-grpc-forwarder
+                - name: nativejvm-grpc-forwarder
+                  path: /en/setup/plugins/forwarder_nativejvm-grpc-forwarder
                 - name: nativelog-grpc-forwarder
                   path: /en/setup/plugins/forwarder_nativelog-grpc-forwarder
                 - name: nativelog-kafka-forwarder
@@ -95,6 +97,8 @@ catalog:
               catalog:
                 - name: grpc-nativecds-receiver
                   path: /en/setup/plugins/receiver_grpc-nativecds-receiver
+                - name: grpc-nativejvm-receiver
+                  path: /en/setup/plugins/receiver_grpc-nativejvm-receiver
                 - name: grpc-nativelog-receiver
                   path: /en/setup/plugins/receiver_grpc-nativelog-receiver
                 - name: grpc-nativemanagement-receiver
diff --git a/plugins/forwarder/forwarder_repository.go b/plugins/forwarder/forwarder_repository.go
index 3be46ba..bc6b181 100644
--- a/plugins/forwarder/forwarder_repository.go
+++ b/plugins/forwarder/forwarder_repository.go
@@ -22,6 +22,7 @@ import (
 
 	grpc_meter "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/meter"
 	grpc_nativecds "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativecds"
+	grpc_nativejvm "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativejvm"
 	grpc_nativelog "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativelog"
 	grpc_nativemanagement "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativemanagement"
 	grpc_nativeprofile "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeprofile"
@@ -44,6 +45,7 @@ func RegisterForwarderPlugins() {
 		new(grpc_nativetracing.Forwarder),
 		new(grpc_nativeprofile.Forwarder),
 		new(grpc_nativecds.Forwarder),
+		new(grpc_nativejvm.Forwarder),
 	}
 	for _, forwarder := range forwarders {
 		plugin.RegisterPlugin(forwarder)
diff --git a/plugins/forwarder/grpc/nativejvm/sync_forwarder.go b/plugins/forwarder/grpc/nativejvm/sync_forwarder.go
new file mode 100644
index 0000000..1869dcb
--- /dev/null
+++ b/plugins/forwarder/grpc/nativejvm/sync_forwarder.go
@@ -0,0 +1,85 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package nativejvm provides the gRPC forwarder for SkyWalking native JVM metrics.
+package nativejvm
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+
+	"google.golang.org/grpc"
+
+	agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+)
+
+// Name is the plugin name of the native JVM gRPC forwarder, as used in
+// plugin_name entries of the pipe configuration.
+const Name = "nativejvm-grpc-forwarder"
+
+// Forwarder sends JVM metric events to a backend through a
+// JVMMetricReportService gRPC client.
+type Forwarder struct {
+	config.CommonFields
+
+	client agent.JVMMetricReportServiceClient // gRPC client; assigned in Prepare.
+}
+
+// Name returns the plugin name of this forwarder.
+func (f *Forwarder) Name() string {
+	return Name
+}
+
+// Description returns a human-readable summary of the plugin.
+// The text names the JVM metric protocol, which is what this forwarder
+// actually speaks (it uses the JVMMetricReportService client).
+func (f *Forwarder) Description() string {
+	return "This is a synchronization grpc forwarder with the SkyWalking native JVM metric protocol."
+}
+
+// DefaultConfig returns the default plugin configuration, which is empty
+// for this forwarder.
+func (f *Forwarder) DefaultConfig() string {
+	return ``
+}
+
+// Prepare binds the forwarder to the given connection by creating a
+// JVMMetricReportService client on top of it.
+//
+// connection must be a *grpc.ClientConn; any other value, including nil,
+// yields an error and leaves the forwarder without a client.
+func (f *Forwarder) Prepare(connection interface{}) error {
+	client, ok := connection.(*grpc.ClientConn)
+	if !ok {
+		// Format the reflect.Type with %v rather than calling String() on it:
+		// reflect.TypeOf(nil) returns a nil Type, and String() would panic.
+		return fmt.Errorf("the %s is only accepet the grpc client, but receive a %v",
+			f.Name(), reflect.TypeOf(connection))
+	}
+	f.client = agent.NewJVMMetricReportServiceClient(client)
+	return nil
+}
+
+// Forward sends the JVM metric collection of every event in batch to the
+// backend, one Collect call per event, and stops at the first error.
+//
+// Events that carry no JVM payload are skipped instead of being sent as
+// a nil message.
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+	for _, e := range batch {
+		jvm := e.GetJvm()
+		if jvm == nil {
+			// Not a JVM metric event; nothing to forward.
+			continue
+		}
+		if _, err := f.client.Collect(context.Background(), jvm); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// ForwardType reports the event type handled by this forwarder: JVM metrics.
+func (f *Forwarder) ForwardType() v1.SniffType {
+	return v1.SniffType_JVMMetricType
+}
+
+// SyncForward is not supported by this forwarder; it ignores its argument
+// and always returns an error.
+func (f *Forwarder) SyncForward(_ *v1.SniffData) (*v1.SniffData, error) {
+	return nil, fmt.Errorf("unsupport sync forward")
+}
+
+// SupportedSyncInvoke reports that synchronous invocation is not supported.
+func (f *Forwarder) SupportedSyncInvoke() bool {
+	return false
+}
diff --git a/plugins/receiver/grpc/nativejvm/jvm_report_service.go b/plugins/receiver/grpc/nativejvm/jvm_report_service.go
new file mode 100644
index 0000000..22e56ad
--- /dev/null
+++ b/plugins/receiver/grpc/nativejvm/jvm_report_service.go
@@ -0,0 +1,49 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativelog
+
+import (
+	"context"
+	"time"
+
+	common "skywalking.apache.org/repo/goapi/collect/common/v3"
+	agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+// eventName is the Name assigned to every SniffData event built by Collect.
+const eventName = "grpc-jvm-event"
+
+// JVMReportService is the gRPC handler for JVM metric reports. It turns each
+// received collection into a SniffData event and writes it to receiveChannel.
+type JVMReportService struct {
+	receiveChannel chan *v1.SniffData // destination of the events built in Collect.
+	agent.UnimplementedJVMMetricReportServiceServer
+}
+
+// Collect wraps the received JVM metric collection into a SniffData event,
+// pushes it onto the receive channel and replies with an empty command set.
+// The channel send blocks until the event is accepted; the request context
+// is not consulted.
+func (j *JVMReportService) Collect(_ context.Context, jvm *agent.JVMMetricCollection) (*common.Commands, error) {
+	// Receive time in milliseconds since the Unix epoch.
+	nowMillis := time.Now().UnixNano() / int64(time.Millisecond)
+	j.receiveChannel <- &v1.SniffData{
+		Name:      eventName,
+		Timestamp: nowMillis,
+		Type:      v1.SniffType_JVMMetricType,
+		Remote:    true,
+		Data:      &v1.SniffData_Jvm{Jvm: jvm},
+	}
+	return &common.Commands{}, nil
+}
diff --git a/plugins/receiver/grpc/nativejvm/receiver.go b/plugins/receiver/grpc/nativejvm/receiver.go
new file mode 100644
index 0000000..6a6fb61
--- /dev/null
+++ b/plugins/receiver/grpc/nativejvm/receiver.go
@@ -0,0 +1,61 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativelog
+
+import (
+	agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/config"
+	module "github.com/apache/skywalking-satellite/internal/satellite/module/api"
+	grpcreceiver "github.com/apache/skywalking-satellite/plugins/receiver/grpc"
+)
+
+// Name is the plugin name of the native JVM gRPC receiver, as used in
+// plugin_name entries of the pipe configuration.
+const Name = "grpc-nativejvm-receiver"
+
+// Receiver is the gRPC receiver plugin for SkyWalking native JVM metrics.
+// It combines the common gRPC receiver fields with the JVM report handler.
+type Receiver struct {
+	config.CommonFields
+	grpcreceiver.CommonGRPCReceiverFields
+	service *JVMReportService // The gRPC request handler for jvm data.
+}
+
+// Name returns the plugin name of this receiver.
+func (r *Receiver) Name() string {
+	return Name
+}
+
+// Description returns a human-readable summary of the plugin, including a
+// link to the JVMMetric protocol definition.
+func (r *Receiver) Description() string {
+	return "This is a receiver for SkyWalking native jvm format, " +
+		"which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/language-agent/JVMMetric.proto."
+}
+
+// DefaultConfig returns the default plugin configuration, which is empty
+// for this receiver.
+func (r *Receiver) DefaultConfig() string {
+	return ""
+}
+
+// RegisterHandler initializes the common gRPC receiver fields from server,
+// creates the JVM report service wired to the receiver's output channel and
+// registers that service on the gRPC server.
+func (r *Receiver) RegisterHandler(server interface{}) {
+	r.CommonGRPCReceiverFields = *grpcreceiver.InitCommonGRPCReceiverFields(server)
+	r.service = &JVMReportService{receiveChannel: r.OutputChannel}
+	agent.RegisterJVMMetricReportServiceServer(r.Server, r.service)
+}
+
+// RegisterSyncInvoker is a no-op: this receiver ignores the sync invoker.
+func (r *Receiver) RegisterSyncInvoker(_ module.SyncInvoker) {
+}
+
+// Channel returns a receive-only view of the output channel, the same
+// channel the JVM report service writes its events to.
+func (r *Receiver) Channel() <-chan *v1.SniffData {
+	return r.OutputChannel
+}
diff --git a/plugins/receiver/grpc/nativejvm/receiver_test.go b/plugins/receiver/grpc/nativejvm/receiver_test.go
new file mode 100644
index 0000000..25bc318
--- /dev/null
+++ b/plugins/receiver/grpc/nativejvm/receiver_test.go
@@ -0,0 +1,92 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativelog
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"google.golang.org/grpc"
+
+	common "skywalking.apache.org/repo/goapi/collect/common/v3"
+	agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+
+	_ "github.com/apache/skywalking-satellite/internal/satellite/test"
+	receiver_grpc "github.com/apache/skywalking-satellite/plugins/receiver/grpc"
+)
+
+// TestReceiver_RegisterHandler runs the shared gRPC receiver test helper
+// against a new Receiver. The first callback sends a JVM metric collection
+// through a JVMMetricReportService client and returns its string form; the
+// second callback returns the string form of the JVM payload of a received
+// event.
+// NOTE(review): presumably the helper compares the two strings — confirm in
+// receiver_grpc.TestReceiver.
+func TestReceiver_RegisterHandler(t *testing.T) {
+	receiver_grpc.TestReceiver(new(Receiver), func(t *testing.T, sequence int, conn *grpc.ClientConn, ctx context.Context) string {
+		client := agent.NewJVMMetricReportServiceClient(conn)
+		data := initData()
+		_, err := client.Collect(ctx, data)
+		if err != nil {
+			t.Fatalf("cannot send data: %v", err)
+		}
+		return data.String()
+	}, func(data *v1.SniffData) string {
+		return data.GetJvm().String()
+	}, t)
+}
+
+// initData builds a JVM metric collection fixture for "demo-service" /
+// "demo-instance" containing a single metric with CPU, memory, memory pool,
+// GC and thread sections filled with fixed sample values.
+func initData() *agent.JVMMetricCollection {
+	return &agent.JVMMetricCollection{
+		Service:         "demo-service",
+		ServiceInstance: "demo-instance",
+		Metrics: []*agent.JVMMetric{
+			{
+				// Current time in milliseconds: nanoseconds / 1e6. Dividing
+				// Unix() (seconds) by 1e6 would produce a meaningless value.
+				Time: time.Now().UnixNano() / 1e6,
+				Cpu: &common.CPU{
+					UsagePercent: 99.9,
+				},
+				Memory: []*agent.Memory{
+					{
+						IsHeap:    true,
+						Init:      1,
+						Max:       2,
+						Used:      3,
+						Committed: 4,
+					},
+				},
+				MemoryPool: []*agent.MemoryPool{
+					{
+						Type:      0,
+						Init:      1,
+						Max:       2,
+						Used:      3,
+						Committed: 4,
+					},
+				},
+				Gc: []*agent.GC{
+					{
+						Phrase: 2,
+						Count:  3,
+						Time:   202106111010,
+					},
+				},
+				Thread: &agent.Thread{
+					LiveCount:   1,
+					PeakCount:   2,
+					DaemonCount: 3,
+				},
+			},
+		},
+	}
+}
diff --git a/plugins/receiver/receiver_repository.go b/plugins/receiver/receiver_repository.go
index d6ed000..aa6e7a4 100644
--- a/plugins/receiver/receiver_repository.go
+++ b/plugins/receiver/receiver_repository.go
@@ -23,6 +23,7 @@ import (
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 	"github.com/apache/skywalking-satellite/plugins/receiver/api"
 	grpcnativecds "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativecds"
+	grpcnativejvm "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativejvm"
 	grpcnavtivelog "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativelog"
 	grpcnativemanagement "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativemanagement"
 	grpcnativeprofile "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativeprofile"
@@ -41,6 +42,7 @@ func RegisterReceiverPlugins() {
 		new(grpcnativeprofile.Receiver),
 		new(grpcnativecds.Receiver),
 		new(httpnavtivelog.Receiver),
+		new(grpcnativejvm.Receiver),
 	}
 	for _, receiver := range receivers {
 		plugin.RegisterPlugin(receiver)