You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2023/04/02 09:40:54 UTC

[skywalking-satellite] branch main updated: Enhance debuggability to help identify issues in the environment (#131)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new 8ac6289  Enhance debuggability to help identify issues in the environment (#131)
8ac6289 is described below

commit 8ac6289542eeb10a85af2b450f267156a22af50a
Author: mrproliu <74...@qq.com>
AuthorDate: Sun Apr 2 17:40:49 2023 +0800

    Enhance debuggability to help identify issues in the environment (#131)
---
 CHANGES.md                                         |   3 +
 configs/satellite_config.yaml                      |   8 +-
 docker/Dockerfile                                  |   2 +-
 .../examples/feature/telemetry-exporter/README.md  |  24 ++-
 internal/satellite/boot/boot.go                    |   1 +
 internal/satellite/module/sender/sender.go         |   8 +
 internal/satellite/telemetry/pprof/server.go       |  81 ++++++++++
 internal/satellite/telemetry/telemetry.go          | 163 ++++++++++++++++++++-
 plugins/client/grpc/lb/load_balancer.go            |  10 +-
 9 files changed, 285 insertions(+), 15 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ce1713f..b622d42 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -5,6 +5,9 @@ Release Notes.
 1.2.0
 ------------------
 #### Features
+* Introduce `pprof` module.
+* Support exporting to multiple `telemetry` services at the same time.
+* Update the base docker image.
 
 #### Bug Fixes
 * Fix [CVE-2022-41721](https://avd.aquasec.com/nvd/cve-2022-41721).
diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml
index 0eec3e6..938dcf8 100644
--- a/configs/satellite_config.yaml
+++ b/configs/satellite_config.yaml
@@ -32,8 +32,8 @@ telemetry:
   service: ${SATELLITE_TELEMETRY_SERVICE:satellite-service}
   # The minimum running unit, such as the pod concept in the Kubernetes.
   instance: ${SATELLITE_TELEMETRY_INSTANCE:satellite-instance}
-  # Telemetry export type, support "prometheus", "metrics_service" or "none"
-  export_type: ${SATELLITE_TELEMETRY_EXPORT_TYPE:prometheus}
+  # Telemetry export type: "prometheus", "metrics_service", "pprof" or "none". Separate multiple types with ",".
+  export_type: ${SATELLITE_TELEMETRY_EXPORT_TYPE:prometheus,pprof}
   # Export telemetry data through Prometheus server, only works on "export_type=prometheus".
   prometheus:
     # The prometheus server address.
@@ -48,6 +48,10 @@ telemetry:
     interval: ${SATELLITE_TELEMETRY_METRICS_SERVICE_INTERVAL:10}
     # The prefix of telemetry metric name
     metric_prefix: ${SATELLITE_TELEMETRY_METRICS_SERVICE_METRIC_PREFIX:sw_stl_}
+  # Export a pprof HTTP service to help detect performance issues, only works when export_type contains "pprof".
+  pprof:
+    # The pprof server address.
+    address: ${SATELLITE_TELEMETRY_PPROF_ADDRESS::6060}
 
 # The sharing plugins referenced by the specific plugins in the different pipes.
 sharing:
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 7d638c5..683d400 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -27,7 +27,7 @@ COPY . .
 RUN VERSION=$VERSION make linux
 RUN mv /src/bin/skywalking-satellite-${VERSION}-linux-amd64 /src/bin/skywalking-satellite
 
-FROM scratch
+FROM debian
 
 VOLUME /skywalking/configs
 
diff --git a/docs/en/setup/examples/feature/telemetry-exporter/README.md b/docs/en/setup/examples/feature/telemetry-exporter/README.md
index da82ab7..929656b 100644
--- a/docs/en/setup/examples/feature/telemetry-exporter/README.md
+++ b/docs/en/setup/examples/feature/telemetry-exporter/README.md
@@ -1,6 +1,8 @@
 # Telemetry Exporter
 
-Satellite supports two ways to export its own telemetry data, `prometheus` or `metrics-service`.
+Satellite supports three ways to export its own telemetry data: `prometheus`, `metrics_service`, or `pprof`.
+
+Several exporters can be enabled at the same time by listing them in `export_type`, separated by commas (for example `prometheus,pprof`).
 
 ## Prometheus
 
@@ -18,7 +20,7 @@ telemetry:
   service: ${SATELLITE_TELEMETRY_SERVICE:satellite-service}
   # The minimum running unit, such as the pod concept in the Kubernetes.
   instance: ${SATELLITE_TELEMETRY_SERVICE:satellite-instance}
-  # Telemetry export type, support "prometheus", "metrics_service" or "none"
+  # Telemetry export type, support "prometheus", "metrics_service", "pprof" or "none"
   export_type: ${SATELLITE_TELEMETRY_EXPORT_TYPE:prometheus}
   # Export telemetry data through Prometheus server, only works on "export_type=prometheus".
   prometheus:
@@ -45,7 +47,7 @@ telemetry:
   service: ${SATELLITE_TELEMETRY_SERVICE:satellite-service}
   # The minimum running unit, such as the pod concept in the Kubernetes.
   instance: ${SATELLITE_TELEMETRY_SERVICE:satellite-instance}
-  # Telemetry export type, support "prometheus", "metrics_service" or "none"
+  # Telemetry export type, support "prometheus", "metrics_service", "pprof" or "none"
   export_type: ${SATELLITE_TELEMETRY_EXPORT_TYPE:metrics_service}
   # Export telemetry data through native meter format to OAP backend, only works on "export_type=metrics_service".
   metrics_service:
@@ -55,4 +57,20 @@ telemetry:
     interval: ${SATELLITE_TELEMETRY_METRICS_SERVICE_INTERVAL:10}
     # The prefix of telemetry metric name
     metric_prefix: ${SATELLITE_TELEMETRY_METRICS_SERVICE_METRIC_PREFIX:sw_stl_}
+```
+
+## pprof
+
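+The pprof exporter starts an HTTP server that exposes Go runtime profiling data (CPU profile, heap, goroutines, execution trace) under `/debug/pprof/`, so you can inspect a running Satellite remotely and discover performance issues.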
+
+```yaml
+
+# The Satellite self telemetry configuration.
+telemetry:
+  # Telemetry export type, support "prometheus", "metrics_service", "pprof" or "none"
+  export_type: ${SATELLITE_TELEMETRY_EXPORT_TYPE:pprof}
+  # Export a pprof HTTP service to help detect performance issues, only works when export_type contains "pprof".
+  pprof:
+    # The pprof server address.
+    address: ${SATELLITE_TELEMETRY_PPROF_ADDRESS::6060}
 ```
\ No newline at end of file
diff --git a/internal/satellite/boot/boot.go b/internal/satellite/boot/boot.go
index d441f0f..8e18b8d 100644
--- a/internal/satellite/boot/boot.go
+++ b/internal/satellite/boot/boot.go
@@ -43,6 +43,7 @@ import (
 	// import the telemetry implements
 	_ "github.com/apache/skywalking-satellite/internal/satellite/telemetry/metricservice"
 	_ "github.com/apache/skywalking-satellite/internal/satellite/telemetry/none"
+	_ "github.com/apache/skywalking-satellite/internal/satellite/telemetry/pprof"
 	_ "github.com/apache/skywalking-satellite/internal/satellite/telemetry/prometheus"
 )
 
diff --git a/internal/satellite/module/sender/sender.go b/internal/satellite/module/sender/sender.go
index 6bd1941..8409c85 100644
--- a/internal/satellite/module/sender/sender.go
+++ b/internal/satellite/module/sender/sender.go
@@ -119,6 +119,8 @@ func (s *Sender) store(ctx context.Context, partition int, wg *sync.WaitGroup) {
 		// blocking output when disconnecting.
 		if atomic.LoadInt32(&s.blocking) == 1 {
 			time.Sleep(100 * time.Millisecond)
+			log.Logger.WithField("pipe", s.config.PipeName).
+				Debugf("the client connection is disconnect, blocking the buffer")
 			continue
 		}
 		select {
@@ -245,6 +247,12 @@ func (s *Sender) consume(batch *buffer.BatchBuffer) {
 			if err := f.Forward(batchEvents); err == nil {
 				s.sendCounter.Add(float64(len(batchEvents)), s.config.PipeName, "success", f.ForwardType().String())
 				continue
+			} else {
+				log.Logger.WithFields(logrus.Fields{
+					"pipe":   s.config.PipeName,
+					"offset": batch.Last(),
+					"size":   len(batchEvents),
+				}).Warnf("forward event failure: %v", err)
 			}
 			if !s.runningFallbacker.FallBack(batchEvents, f.Forward) {
 				s.sendCounter.Add(float64(len(batchEvents)), s.config.PipeName, "failure", f.ForwardType().String())
diff --git a/internal/satellite/telemetry/pprof/server.go b/internal/satellite/telemetry/pprof/server.go
new file mode 100644
index 0000000..c80b67b
--- /dev/null
+++ b/internal/satellite/telemetry/pprof/server.go
@@ -0,0 +1,105 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package pprof provides the "pprof" telemetry exporter, which serves the Go
+// runtime profiling endpoints of net/http/pprof over HTTP.
+package pprof
+
+import (
+	"errors"
+	"net/http"
+	"net/http/pprof"
+	"time"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/satellite/telemetry"
+	"github.com/apache/skywalking-satellite/internal/satellite/telemetry/none"
+)
+
+func init() {
+	telemetry.Register("pprof", &Server{}, false)
+}
+
+// Server is the "pprof" telemetry exporter. It only exposes the profiling
+// endpoints; every metric constructor returns the implementation from the
+// "none" exporter, so this exporter does not export metrics.
+type Server struct {
+	svr *http.Server
+}
+
+// Start serves the pprof handlers on config.PProfService.Address from a
+// background goroutine. Listen/serve failures are only logged, so Start always
+// returns nil and never blocks the rest of the telemetry setup.
+func (s *Server) Start(config *telemetry.Config) error {
+	mux := http.NewServeMux()
+	mux.HandleFunc("/debug/pprof/", pprof.Index)
+	mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
+	mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
+	mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
+	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
+	addr := config.PProfService.Address
+	// No WriteTimeout on purpose: /debug/pprof/profile and /debug/pprof/trace
+	// keep writing for the requested number of seconds.
+	svr := &http.Server{
+		Addr:              addr,
+		ReadHeaderTimeout: 3 * time.Second,
+		Handler:           mux,
+	}
+	s.svr = svr
+	go func() {
+		log.Logger.WithField("addr", addr).Debugf("start pprof server")
+		// http.ErrServerClosed is the normal result of Close, not a failure.
+		if err := svr.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
+			log.Logger.WithField("addr", addr).Warnf("starting pprof server failure: %v", err)
+		}
+	}()
+	return nil
+}
+
+// AfterSharingStart has nothing to do for the pprof exporter.
+func (s *Server) AfterSharingStart() error {
+	return nil
+}
+
+// Close shuts the pprof HTTP server down immediately. It is a no-op when
+// Start has never been called.
+func (s *Server) Close() error {
+	if s.svr == nil {
+		return nil
+	}
+	return s.svr.Close()
+}
+
+// NewCounter returns the "none" exporter's counter: pprof exports no metrics.
+func (s *Server) NewCounter(name, help string, labels ...string) telemetry.Counter {
+	return &none.Counter{}
+}
+
+// NewGauge returns the "none" exporter's gauge; getter is not used.
+func (s *Server) NewGauge(name, help string, getter func() float64, labels ...string) telemetry.Gauge {
+	return &none.Gauge{}
+}
+
+// NewDynamicGauge returns the "none" exporter's dynamic gauge.
+func (s *Server) NewDynamicGauge(name, help string, labels ...string) telemetry.DynamicGauge {
+	return &none.DynamicGauge{}
+}
+
+// NewTimer returns the "none" exporter's timer.
+func (s *Server) NewTimer(name, help string, labels ...string) telemetry.Timer {
+	return &none.Timer{}
+}
diff --git a/internal/satellite/telemetry/telemetry.go b/internal/satellite/telemetry/telemetry.go
index 9b60183..27a7cf2 100644
--- a/internal/satellite/telemetry/telemetry.go
+++ b/internal/satellite/telemetry/telemetry.go
@@ -19,6 +19,10 @@ package telemetry
 
 import (
 	"fmt"
+	"strings"
+	"time"
+
+	"github.com/hashicorp/go-multierror"
 )
 
 var (
@@ -33,12 +37,14 @@ type Config struct {
 	Service  string `mapstructure:"service"`  // The service name.
 	Instance string `mapstructure:"instance"` // The instance name.

-	// Telemetry export type, support "prometheus", "metrics_service" or "none"
+	// Telemetry export type: "prometheus", "metrics_service", "pprof" or "none"; separate multiple types with ",".
 	ExportType string `mapstructure:"export_type"`
 	// Export telemetry data through Prometheus server, only works on "export_type=prometheus".
 	Prometheus PrometheusConfig `mapstructure:"prometheus"`
 	// Export telemetry data through native meter format to OAP backend, only works on "export_type=metrics_service".
 	MetricsService MetricsServiceConfig `mapstructure:"metrics_service"`
+	// Export Go runtime profiling data through a pprof HTTP server, only works when export_type contains "pprof".
+	PProfService PProfServiceConfig `mapstructure:"pprof"`
 }

 type PrometheusConfig struct {
@@ -52,6 +58,11 @@ type MetricsServiceConfig struct {
 	MetricPrefix string `mapstructure:"metric_prefix"` // The prefix of telemetry metric name
 }

+// PProfServiceConfig configures the pprof HTTP server exporter.
+type PProfServiceConfig struct {
+	Address string `mapstructure:"address"` // The pprof server address.
+}
+
 type Server interface {
 	Start(config *Config) error
 	AfterSharingStart() error
@@ -72,12 +83,53 @@ func Register(name string, server Server, isDefault bool) {

-// Init create the global telemetry center according to the config.
+// Init creates the global telemetry center according to the config.
+//
+// ExportType may list several exporters separated by ","; entries are trimmed,
+// empty entries are ignored and a repeated entry is started only once. A single
+// exporter becomes the current server directly, several are combined into a
+// MultipleServer. If an exporter is unknown or fails to start, the exporters
+// already started are closed again before the error is returned, so they do not
+// keep running after a failed Init.
 func Init(c *Config) error {
-	currentServer = servers[c.ExportType]
-	if currentServer == nil {
-		return fmt.Errorf("could not found telemetry exporter: %s", c.ExportType)
+	exportServers := make([]Server, 0)
+	seen := make(map[string]bool)
+	for _, t := range strings.Split(c.ExportType, ",") {
+		t = strings.TrimSpace(t)
+		if t == "" || seen[t] {
+			continue
+		}
+		seen[t] = true
+		server := servers[t]
+		if server == nil {
+			return closeStarted(exportServers, fmt.Errorf("could not found telemetry exporter: %s", t))
+		}
+		if e := server.Start(c); e != nil {
+			return closeStarted(exportServers, e)
+		}
+		exportServers = append(exportServers, server)
 	}

-	return currentServer.Start(c)
+	switch len(exportServers) {
+	case 0:
+		// ExportType is empty or contains only separators/whitespace.
+		return fmt.Errorf("could not found telemetry exporter: %s", c.ExportType)
+	case 1:
+		currentServer = exportServers[0]
+	default:
+		currentServer = &MultipleServer{Servers: exportServers}
+	}
+	return nil
 }

+// closeStarted closes the exporters that Init has already started and returns
+// cause combined with any error raised while closing them.
+func closeStarted(started []Server, cause error) error {
+	err := cause
+	for _, s := range started {
+		if e := s.Close(); e != nil {
+			err = multierror.Append(err, e)
+		}
+	}
+	return err
+}
+
 func AfterShardingStart() error {
@@ -110,3 +162,161 @@ func getServer() Server {
 	}
 	return currentServer
 }
+
+// MultipleServer fans every telemetry call out to several exporters so that
+// more than one export_type can be active at the same time.
+type MultipleServer struct {
+	Servers []Server
+}
+
+// Start starts every exporter, carrying on past failures, and returns the
+// start errors combined (nil when all exporters started).
+func (s *MultipleServer) Start(config *Config) error {
+	var err error
+	for _, ss := range s.Servers {
+		if e := ss.Start(config); e != nil {
+			err = multierror.Append(err, e)
+		}
+	}
+	return err
+}
+
+// AfterSharingStart notifies every exporter and returns their errors combined.
+func (s *MultipleServer) AfterSharingStart() error {
+	var err error
+	for _, ss := range s.Servers {
+		if e := ss.AfterSharingStart(); e != nil {
+			err = multierror.Append(err, e)
+		}
+	}
+	return err
+}
+
+// Close closes every exporter, even when an earlier one fails, and returns
+// their errors combined.
+func (s *MultipleServer) Close() error {
+	var err error
+	for _, ss := range s.Servers {
+		if e := ss.Close(); e != nil {
+			err = multierror.Append(err, e)
+		}
+	}
+	return err
+}
+
+// NewCounter creates the counter on every exporter and returns a counter that
+// forwards each call to all of them.
+func (s *MultipleServer) NewCounter(name, help string, labels ...string) Counter {
+	result := make([]Counter, 0, len(s.Servers))
+	for _, ss := range s.Servers {
+		result = append(result, ss.NewCounter(name, help, labels...))
+	}
+	return &MultipleCounter{Counters: result}
+}
+
+// NewGauge creates the gauge on every exporter, handing each the same getter.
+func (s *MultipleServer) NewGauge(name, help string, getter func() float64, labels ...string) Gauge {
+	result := make([]Gauge, 0, len(s.Servers))
+	for _, ss := range s.Servers {
+		result = append(result, ss.NewGauge(name, help, getter, labels...))
+	}
+	return &MultipleGauge{Gauges: result}
+}
+
+// NewDynamicGauge creates the dynamic gauge on every exporter and returns a
+// gauge that forwards each call to all of them.
+func (s *MultipleServer) NewDynamicGauge(name, help string, labels ...string) DynamicGauge {
+	result := make([]DynamicGauge, 0, len(s.Servers))
+	for _, ss := range s.Servers {
+		result = append(result, ss.NewDynamicGauge(name, help, labels...))
+	}
+	return &MultipleDynamicGauge{Gauges: result}
+}
+
+// NewTimer creates the timer on every exporter and returns a timer that
+// forwards each call to all of them.
+func (s *MultipleServer) NewTimer(name, help string, labels ...string) Timer {
+	result := make([]Timer, 0, len(s.Servers))
+	for _, ss := range s.Servers {
+		result = append(result, ss.NewTimer(name, help, labels...))
+	}
+	return &MultipleTimer{Timers: result}
+}
+
+// MultipleCounter forwards every call to each wrapped counter.
+type MultipleCounter struct {
+	Counters []Counter
+}
+
+// Inc calls Inc on every wrapped counter.
+func (m *MultipleCounter) Inc(labelValues ...string) {
+	for _, c := range m.Counters {
+		c.Inc(labelValues...)
+	}
+}
+
+// Add calls Add with val on every wrapped counter.
+func (m *MultipleCounter) Add(val float64, labelValues ...string) {
+	for _, c := range m.Counters {
+		c.Add(val, labelValues...)
+	}
+}
+
+// MultipleGauge groups the gauges created on every exporter. It has no
+// methods of its own: NewGauge hands the same getter to each exporter.
+type MultipleGauge struct {
+	Gauges []Gauge
+}
+
+// MultipleDynamicGauge forwards every call to each wrapped dynamic gauge.
+type MultipleDynamicGauge struct {
+	Gauges []DynamicGauge
+}
+
+// Inc calls Inc on every wrapped gauge.
+func (g *MultipleDynamicGauge) Inc(labelValues ...string) {
+	for _, ga := range g.Gauges {
+		ga.Inc(labelValues...)
+	}
+}
+
+// Dec calls Dec on every wrapped gauge.
+func (g *MultipleDynamicGauge) Dec(labelValues ...string) {
+	for _, ga := range g.Gauges {
+		ga.Dec(labelValues...)
+	}
+}
+
+// MultipleTimer forwards every call to each wrapped timer.
+type MultipleTimer struct {
+	Timers []Timer
+}
+
+// Start starts a recorder on every wrapped timer and returns a recorder that
+// stops all of them together.
+func (m *MultipleTimer) Start(labelValues ...string) TimeRecorder {
+	recorders := make([]TimeRecorder, 0, len(m.Timers))
+	for _, t := range m.Timers {
+		recorders = append(recorders, t.Start(labelValues...))
+	}
+	return &MultipleTimeRecorder{Recorders: recorders}
+}
+
+// AddTime calls AddTime with t on every wrapped timer.
+func (m *MultipleTimer) AddTime(t time.Duration, labelValues ...string) {
+	for _, ti := range m.Timers {
+		ti.AddTime(t, labelValues...)
+	}
+}
+
+// MultipleTimeRecorder groups the recorders started by MultipleTimer.Start.
+type MultipleTimeRecorder struct {
+	Recorders []TimeRecorder
+}
+
+// Stop calls Stop on every wrapped recorder.
+func (m *MultipleTimeRecorder) Stop() {
+	for _, r := range m.Recorders {
+		r.Stop()
+	}
+}
diff --git a/plugins/client/grpc/lb/load_balancer.go b/plugins/client/grpc/lb/load_balancer.go
index b1a62d0..ea6fe33 100644
--- a/plugins/client/grpc/lb/load_balancer.go
+++ b/plugins/client/grpc/lb/load_balancer.go
@@ -21,15 +21,14 @@ import (
 	"hash/crc32"
 	"sync"
 
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+
 	"google.golang.org/grpc/balancer"
 	"google.golang.org/grpc/balancer/base"
-	"google.golang.org/grpc/grpclog"
 )
 
 const Name = "satellite_lb"
 
-var logger = grpclog.Component(Name)
-
 func newBuilder() balancer.Builder {
 	return base.NewBalancerBuilder(Name, &satelliteDynamicPickerBuilder{}, base.Config{HealthCheck: true})
 }
@@ -42,7 +41,6 @@ type satelliteDynamicPickerBuilder struct {
 }
 
 func (s *satelliteDynamicPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
-	logger.Infof("ready to build a new picker: %v", info)
 	if len(info.ReadySCs) == 0 {
 		return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
 	}
@@ -81,6 +79,7 @@ type connectionWrap struct {
 func (s *satelliteDynamicPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
 	// only one connection
 	if s.connCount == 1 {
+		log.Logger.Debugf("pick the connection: %s", s.cons[0].addr)
 		return balancer.PickResult{SubConn: s.cons[0].conn}, nil
 	}
 
@@ -93,6 +92,7 @@ func (s *satelliteDynamicPicker) Pick(info balancer.PickInfo) (balancer.PickResu
 	// check exists appoint address
 	if config.appointAddr != "" {
 		if con := s.addrToConn[config.appointAddr]; con != nil {
+			log.Logger.Debugf("use the appoint connection: %s", config.appointAddr)
 			return balancer.PickResult{SubConn: con}, nil
 		}
 	}
@@ -102,6 +102,7 @@ func (s *satelliteDynamicPicker) Pick(info balancer.PickInfo) (balancer.PickResu
 	connWrap := s.cons[routeIndex]
 	// update the address to the config
 	config.appointAddr = connWrap.addr
+	log.Logger.Debugf("pick the connection: %s", connWrap.addr)
 	return balancer.PickResult{SubConn: connWrap.conn}, nil
 }
 
@@ -110,6 +111,7 @@ func (s *satelliteDynamicPicker) roundRobinConnection() balancer.PickResult {
 	sc := s.cons[s.next]
 	s.next = (s.next + 1) % s.connCount
 	s.mu.Unlock()
+	log.Logger.Debugf("pick the connection: %s", sc.addr)
 	return balancer.PickResult{SubConn: sc.conn}
 }