You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pegasus.apache.org by "acelyc111 (via GitHub)" <gi...@apache.org> on 2023/05/15 08:32:55 UTC

[GitHub] [incubator-pegasus] acelyc111 commented on a diff in pull request #1466: feat: Aggregate table/server level metrics

acelyc111 commented on code in PR #1466:
URL: https://github.com/apache/incubator-pegasus/pull/1466#discussion_r1193509106


##########
collector/avail/detector.go:
##########
@@ -19,106 +19,122 @@ package avail
 
 import (
 	"context"
-	"sync/atomic"
+	"math/rand"
 	"time"
 
+	"github.com/apache/incubator-pegasus/go-client/admin"
 	"github.com/apache/incubator-pegasus/go-client/pegasus"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+
 	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"gopkg.in/tomb.v2"
 )
 
 // Detector periodically checks the service availability of the Pegasus cluster.
 type Detector interface {
-
-	// Start detection until the ctx cancelled. This method will block the current thread.
-	Start(ctx context.Context) error
+	Start(tom *tomb.Tomb) error
 }
 
 // NewDetector returns a service-availability detector.
-func NewDetector(client pegasus.Client) Detector {
-	return &pegasusDetector{client: client}
+func NewDetector(detectInterval time.Duration,
+	detectTimeout time.Duration, partitionCount int) Detector {
+	metaServers := viper.GetStringSlice("meta_servers")
+	tableName := viper.GetStringMapString("availablity_detect")["table_name"]
+	// Create detect table.
+	adminClient := admin.NewClient(admin.Config{MetaServers: metaServers})
+	error := adminClient.CreateTable(context.Background(), tableName, partitionCount)
+	if error != nil {
+		log.Errorf("Create detect table %s failed, error: %s", tableName, error)
+	}
+	pegasusClient := pegasus.NewClient(pegasus.Config{MetaServers: metaServers})
+	return &pegasusDetector{
+		client:          pegasusClient,
+		detectTableName: tableName,
+		detectInterval:  detectInterval,
+		detectTimeout:   detectTimeout,
+		partitionCount:  partitionCount,
+	}
 }
 
-type pegasusDetector struct {
-	// client reads and writes periodically to a specified table.
-	client      pegasus.Client
-	detectTable pegasus.TableConnector
+var (
+	DetectTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "detect_times",
+		Help: "The times of availability detecting",
+	})
+
+	ReadFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "read_failure_detect_times",
+		Help: "The failure times of read detecting",
+	})
+
+	WriteFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "write_failure_detect_times",
+		Help: "The failure times of write detecting",
+	})
+)
 
-	detectInterval  time.Duration
+type pegasusDetector struct {
+	client          pegasus.Client
+	detectTable     pegasus.TableConnector
 	detectTableName string
-
-	// timeout of a single detect
+	detectInterval  time.Duration
+	// timeout of a single detect.
 	detectTimeout time.Duration
-
-	detectHashKeys [][]byte
-
-	recentMinuteDetectTimes  uint64
-	recentMinuteFailureTimes uint64
-
-	recentHourDetectTimes  uint64
-	recentHourFailureTimes uint64
-
-	recentDayDetectTimes  uint64
-	recentDayFailureTimes uint64
+	// partition count.
+	partitionCount int
 }
 
-func (d *pegasusDetector) Start(rootCtx context.Context) error {
+func (d *pegasusDetector) Start(tom *tomb.Tomb) error {
 	var err error
-	ctx, cancel := context.WithTimeout(rootCtx, 10*time.Second)
-	defer cancel()
-	d.detectTable, err = d.client.OpenTable(ctx, d.detectTableName)
+	// Open the detect table.
+	d.detectTable, err = d.client.OpenTable(context.Background(), d.detectTableName)
 	if err != nil {
+		log.Errorf("Open detect table %s failed, error: %s", d.detectTable, err)
 		return err
 	}
-
 	ticker := time.NewTicker(d.detectInterval)
 	for {
 		select {
-		case <-rootCtx.Done(): // check if context cancelled
+		case <-tom.Dying():
 			return nil
 		case <-ticker.C:
-			return nil
+			d.detectPartition()
+			break
 		default:
 		}
-
-		// periodically set/get a configured Pegasus table.
-		d.detect(ctx)
 	}
 }
 
-func (d *pegasusDetector) detect(rootCtx context.Context) {
-	// TODO(yingchun): doesn't work, just to mute lint errors.
-	d.detectPartition(rootCtx, 1)
-}
-
-func (d *pegasusDetector) detectPartition(rootCtx context.Context, partitionIdx int) {
-	d.incrDetectTimes()
+func (d *pegasusDetector) detectPartition() {
+	DetectTimes.Inc()
 
 	go func() {
-		ctx, cancel := context.WithTimeout(rootCtx, d.detectTimeout)
+		ctx, cancel := context.WithTimeout(context.Background(), d.detectTimeout)

Review Comment:
   How about adding some latency metrics for read and write operations as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org