You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pegasus.apache.org by "acelyc111 (via GitHub)" <gi...@apache.org> on 2023/05/15 08:32:55 UTC
[GitHub] [incubator-pegasus] acelyc111 commented on a diff in pull request #1466: feat: Aggregate table/server level metrics
acelyc111 commented on code in PR #1466:
URL: https://github.com/apache/incubator-pegasus/pull/1466#discussion_r1193509106
##########
collector/avail/detector.go:
##########
@@ -19,106 +19,122 @@ package avail
import (
"context"
- "sync/atomic"
+ "math/rand"
"time"
+ "github.com/apache/incubator-pegasus/go-client/admin"
"github.com/apache/incubator-pegasus/go-client/pegasus"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+
log "github.com/sirupsen/logrus"
+ "github.com/spf13/viper"
+ "gopkg.in/tomb.v2"
)
// Detector periodically checks the service availability of the Pegasus cluster.
type Detector interface {
-
- // Start detection until the ctx cancelled. This method will block the current thread.
- Start(ctx context.Context) error
+ Start(tom *tomb.Tomb) error
}
// NewDetector returns a service-availability detector.
-func NewDetector(client pegasus.Client) Detector {
- return &pegasusDetector{client: client}
+func NewDetector(detectInterval time.Duration,
+ detectTimeout time.Duration, partitionCount int) Detector {
+ metaServers := viper.GetStringSlice("meta_servers")
+ tableName := viper.GetStringMapString("availablity_detect")["table_name"]
+ // Create detect table.
+ adminClient := admin.NewClient(admin.Config{MetaServers: metaServers})
+ error := adminClient.CreateTable(context.Background(), tableName, partitionCount)
+ if error != nil {
+ log.Errorf("Create detect table %s failed, error: %s", tableName, error)
+ }
+ pegasusClient := pegasus.NewClient(pegasus.Config{MetaServers: metaServers})
+ return &pegasusDetector{
+ client: pegasusClient,
+ detectTableName: tableName,
+ detectInterval: detectInterval,
+ detectTimeout: detectTimeout,
+ partitionCount: partitionCount,
+ }
}
-type pegasusDetector struct {
- // client reads and writes periodically to a specified table.
- client pegasus.Client
- detectTable pegasus.TableConnector
+var (
+ DetectTimes = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "detect_times",
+ Help: "The times of availability detecting",
+ })
+
+ ReadFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "read_failure_detect_times",
+ Help: "The failure times of read detecting",
+ })
+
+ WriteFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "write_failure_detect_times",
+ Help: "The failure times of write detecting",
+ })
+)
- detectInterval time.Duration
+type pegasusDetector struct {
+ client pegasus.Client
+ detectTable pegasus.TableConnector
detectTableName string
-
- // timeout of a single detect
+ detectInterval time.Duration
+ // timeout of a single detect.
detectTimeout time.Duration
-
- detectHashKeys [][]byte
-
- recentMinuteDetectTimes uint64
- recentMinuteFailureTimes uint64
-
- recentHourDetectTimes uint64
- recentHourFailureTimes uint64
-
- recentDayDetectTimes uint64
- recentDayFailureTimes uint64
+ // partition count.
+ partitionCount int
}
-func (d *pegasusDetector) Start(rootCtx context.Context) error {
+func (d *pegasusDetector) Start(tom *tomb.Tomb) error {
var err error
- ctx, cancel := context.WithTimeout(rootCtx, 10*time.Second)
- defer cancel()
- d.detectTable, err = d.client.OpenTable(ctx, d.detectTableName)
+ // Open the detect table.
+ d.detectTable, err = d.client.OpenTable(context.Background(), d.detectTableName)
if err != nil {
+ log.Errorf("Open detect table %s failed, error: %s", d.detectTable, err)
return err
}
-
ticker := time.NewTicker(d.detectInterval)
for {
select {
- case <-rootCtx.Done(): // check if context cancelled
+ case <-tom.Dying():
return nil
case <-ticker.C:
- return nil
+ d.detectPartition()
+ break
default:
}
-
- // periodically set/get a configured Pegasus table.
- d.detect(ctx)
}
}
-func (d *pegasusDetector) detect(rootCtx context.Context) {
- // TODO(yingchun): doesn't work, just to mute lint errors.
- d.detectPartition(rootCtx, 1)
-}
-
-func (d *pegasusDetector) detectPartition(rootCtx context.Context, partitionIdx int) {
- d.incrDetectTimes()
+func (d *pegasusDetector) detectPartition() {
+ DetectTimes.Inc()
go func() {
- ctx, cancel := context.WithTimeout(rootCtx, d.detectTimeout)
+ ctx, cancel := context.WithTimeout(context.Background(), d.detectTimeout)
Review Comment:
How about adding some latency metrics for read and write operations as well?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org