You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pegasus.apache.org by "xinghuayu007 (via GitHub)" <gi...@apache.org> on 2023/05/05 09:02:39 UTC

[GitHub] [incubator-pegasus] xinghuayu007 opened a new pull request, #1466: [Collector] Aggregate table/server level metrics

xinghuayu007 opened a new pull request, #1466:
URL: https://github.com/apache/incubator-pegasus/pull/1466

   ### What problem does this PR solve? <!--add issue link with summary if exists-->
   
   
   ### What is changed and how does it work?
   
   
   ### Checklist <!--REMOVE the items that are not applicable-->
   
   ##### Tests <!-- At least one of them must be included. -->
   
   - Unit test
   - Integration test
   - Manual test (add detailed scripts or steps below)
   - No code
   
   ##### Code changes
   
   - Has exported function/method change
   - Has exported variable/fields change
   - Has interface methods change
   - Has persistent data change
   
   ##### Side effects
   
   - Possible performance regression
   - Increased code complexity
   - Breaking backward compatibility
   
   ##### Related changes
   
   - Need to cherry-pick to the release branch
   - Need to update the documentation
   - Need to be included in the release note
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org


[GitHub] [incubator-pegasus] acelyc111 commented on a diff in pull request #1466: [Collector] Aggregate table/server level metrics

Posted by "acelyc111 (via GitHub)" <gi...@apache.org>.
acelyc111 commented on code in PR #1466:
URL: https://github.com/apache/incubator-pegasus/pull/1466#discussion_r1185896498


##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+	"errors"
+	"time"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+
+	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"gopkg.in/tomb.v2"
+	"github.com/tidwall/gjson"
+	"github.com/prometheus/client_golang/prometheus"
+    "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+const (
+	META_SERVER int = 0
+	REPLICA_SERVER int = 1
+)
+
+type Metric struct {
+	name string
+	value int64
+	mtype string
+	desc string
+	unit string
+}
+
+type Metrics []Metric
+
+type Attribute struct {
+	name string
+	value string
+}
+
+type Entity struct {
+	etype string
+	eid string
+	attributes string
+	metrics []Metric
+}
+
+var gaugeMetricsMap_ map[string]prometheus.GaugeVec
+var counterMetricsMap_ map[string]prometheus.CounterVec
+var histogramMetricsMap_ map[string]prometheus.Histogram
+var summaryMetricsMap_ map[string]prometheus.Summary
+
+var dataSource_ int
+
+type MetricCollector interface {
+	Start(tom *tomb.Tomb) error
+}
+
+func NewMetricCollector(dataSource int) MetricCollector {
+	dataSource_ = dataSource
+	gaugeMetricsMap_ = make(map[string]prometheus.GaugeVec, 128)
+	counterMetricsMap_ = make(map[string]prometheus.CounterVec, 128)
+	histogramMetricsMap_ = make(map[string]prometheus.Histogram, 128)
+	summaryMetricsMap_ = make(map[string]prometheus.Summary, 128)
+	initMetrics()
+	return &Collector{detectInterval: 10, detectTimeout: 10}
+}
+
+type Collector struct {
+	detectInterval  time.Duration
+	detectTimeout time.Duration
+}
+
+func (collector *Collector) Start(tom *tomb.Tomb) error {
+	ticker := time.NewTicker(collector.detectInterval)
+	for {
+		select {
+			case <-tom.Dying():
+				return nil
+			case <-ticker.C:
+				processAllMetaServerMetrics()
+				break
+			default:
+		}
+	}
+}
+
+func initMetrics() {
+	var addrs []string
+	if dataSource_ == META_SERVER {
+		addrs = viper.GetStringSlice("meta_servers")
+	} else {
+		addrs = viper.GetStringSlice("replica_servers")

Review Comment:
   Generally, the replica server list is not exposed the way the meta server list is; we can fetch the replica server list via the meta servers instead, over HTTP, e.g. `curl localhost:34601/meta/nodes`.
   
   Furthermore, we have to update the server list periodically to tolerate scale-in and scale-out of the cluster.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org


[GitHub] [incubator-pegasus] acelyc111 commented on a diff in pull request #1466: feat: Aggregate table/server level metrics

Posted by "acelyc111 (via GitHub)" <gi...@apache.org>.
acelyc111 commented on code in PR #1466:
URL: https://github.com/apache/incubator-pegasus/pull/1466#discussion_r1193509106


##########
collector/avail/detector.go:
##########
@@ -19,106 +19,122 @@ package avail
 
 import (
 	"context"
-	"sync/atomic"
+	"math/rand"
 	"time"
 
+	"github.com/apache/incubator-pegasus/go-client/admin"
 	"github.com/apache/incubator-pegasus/go-client/pegasus"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+
 	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"gopkg.in/tomb.v2"
 )
 
 // Detector periodically checks the service availability of the Pegasus cluster.
 type Detector interface {
-
-	// Start detection until the ctx cancelled. This method will block the current thread.
-	Start(ctx context.Context) error
+	Start(tom *tomb.Tomb) error
 }
 
 // NewDetector returns a service-availability detector.
-func NewDetector(client pegasus.Client) Detector {
-	return &pegasusDetector{client: client}
+func NewDetector(detectInterval time.Duration,
+	detectTimeout time.Duration, partitionCount int) Detector {
+	metaServers := viper.GetStringSlice("meta_servers")
+	tableName := viper.GetStringMapString("availablity_detect")["table_name"]
+	// Create detect table.
+	adminClient := admin.NewClient(admin.Config{MetaServers: metaServers})
+	error := adminClient.CreateTable(context.Background(), tableName, partitionCount)
+	if error != nil {
+		log.Errorf("Create detect table %s failed, error: %s", tableName, error)
+	}
+	pegasusClient := pegasus.NewClient(pegasus.Config{MetaServers: metaServers})
+	return &pegasusDetector{
+		client:          pegasusClient,
+		detectTableName: tableName,
+		detectInterval:  detectInterval,
+		detectTimeout:   detectTimeout,
+		partitionCount:  partitionCount,
+	}
 }
 
-type pegasusDetector struct {
-	// client reads and writes periodically to a specified table.
-	client      pegasus.Client
-	detectTable pegasus.TableConnector
+var (
+	DetectTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "detect_times",
+		Help: "The times of availability detecting",
+	})
+
+	ReadFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "read_failure_detect_times",
+		Help: "The failure times of read detecting",
+	})
+
+	WriteFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "write_failure_detect_times",
+		Help: "The failure times of write detecting",
+	})
+)
 
-	detectInterval  time.Duration
+type pegasusDetector struct {
+	client          pegasus.Client
+	detectTable     pegasus.TableConnector
 	detectTableName string
-
-	// timeout of a single detect
+	detectInterval  time.Duration
+	// timeout of a single detect.
 	detectTimeout time.Duration
-
-	detectHashKeys [][]byte
-
-	recentMinuteDetectTimes  uint64
-	recentMinuteFailureTimes uint64
-
-	recentHourDetectTimes  uint64
-	recentHourFailureTimes uint64
-
-	recentDayDetectTimes  uint64
-	recentDayFailureTimes uint64
+	// partition count.
+	partitionCount int
 }
 
-func (d *pegasusDetector) Start(rootCtx context.Context) error {
+func (d *pegasusDetector) Start(tom *tomb.Tomb) error {
 	var err error
-	ctx, cancel := context.WithTimeout(rootCtx, 10*time.Second)
-	defer cancel()
-	d.detectTable, err = d.client.OpenTable(ctx, d.detectTableName)
+	// Open the detect table.
+	d.detectTable, err = d.client.OpenTable(context.Background(), d.detectTableName)
 	if err != nil {
+		log.Errorf("Open detect table %s failed, error: %s", d.detectTable, err)
 		return err
 	}
-
 	ticker := time.NewTicker(d.detectInterval)
 	for {
 		select {
-		case <-rootCtx.Done(): // check if context cancelled
+		case <-tom.Dying():
 			return nil
 		case <-ticker.C:
-			return nil
+			d.detectPartition()
+			break
 		default:
 		}
-
-		// periodically set/get a configured Pegasus table.
-		d.detect(ctx)
 	}
 }
 
-func (d *pegasusDetector) detect(rootCtx context.Context) {
-	// TODO(yingchun): doesn't work, just to mute lint errors.
-	d.detectPartition(rootCtx, 1)
-}
-
-func (d *pegasusDetector) detectPartition(rootCtx context.Context, partitionIdx int) {
-	d.incrDetectTimes()
+func (d *pegasusDetector) detectPartition() {
+	DetectTimes.Inc()
 
 	go func() {
-		ctx, cancel := context.WithTimeout(rootCtx, d.detectTimeout)
+		ctx, cancel := context.WithTimeout(context.Background(), d.detectTimeout)

Review Comment:
   How about adding some latency metrics for read and write operations as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org


[GitHub] [incubator-pegasus] acelyc111 commented on a diff in pull request #1466: [Collector] Aggregate table/server level metrics

Posted by "acelyc111 (via GitHub)" <gi...@apache.org>.
acelyc111 commented on code in PR #1466:
URL: https://github.com/apache/incubator-pegasus/pull/1466#discussion_r1185891976


##########
collector/avail/detector.go:
##########
@@ -50,75 +72,45 @@ type pegasusDetector struct {
 	detectTimeout time.Duration
 
 	detectHashKeys [][]byte
-
-	recentMinuteDetectTimes  uint64
-	recentMinuteFailureTimes uint64
-
-	recentHourDetectTimes  uint64
-	recentHourFailureTimes uint64
-
-	recentDayDetectTimes  uint64
-	recentDayFailureTimes uint64
 }
 
-func (d *pegasusDetector) Start(rootCtx context.Context) error {
+func (d *pegasusDetector) Start(tom *tomb.Tomb) error {
 	var err error
-	ctx, cancel := context.WithTimeout(rootCtx, 10*time.Second)
-	defer cancel()
-	d.detectTable, err = d.client.OpenTable(ctx, d.detectTableName)
+	d.detectTable, err = d.client.OpenTable(context.Background(), d.detectTableName)
 	if err != nil {
+		log.Error(err)
 		return err
 	}
-
 	ticker := time.NewTicker(d.detectInterval)
 	for {
 		select {
-		case <-rootCtx.Done(): // check if context cancelled
-			return nil
-		case <-ticker.C:
-			return nil
-		default:
+			case <-tom.Dying():
+				return nil
+			case <-ticker.C:
+				d.detectPartition(1)

Review Comment:
   Detecting only partition 1 is not enough; it's necessary to detect all partitions. Generating hashkeys distributed across all partitions can resolve it; for example, generating thousands of random hashkeys on a 16-partition table is roughly OK IMO.



##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+	"errors"
+	"time"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+
+	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"gopkg.in/tomb.v2"
+	"github.com/tidwall/gjson"
+	"github.com/prometheus/client_golang/prometheus"
+    "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+const (
+	META_SERVER int = 0
+	REPLICA_SERVER int = 1
+)
+
+type Metric struct {
+	name string
+	value int64
+	mtype string
+	desc string
+	unit string
+}
+
+type Metrics []Metric
+
+type Attribute struct {
+	name string
+	value string
+}
+
+type Entity struct {
+	etype string
+	eid string
+	attributes string
+	metrics []Metric
+}
+
+var gaugeMetricsMap_ map[string]prometheus.GaugeVec
+var counterMetricsMap_ map[string]prometheus.CounterVec
+var histogramMetricsMap_ map[string]prometheus.Histogram
+var summaryMetricsMap_ map[string]prometheus.Summary
+
+var dataSource_ int
+
+type MetricCollector interface {
+	Start(tom *tomb.Tomb) error
+}
+
+func NewMetricCollector(dataSource int) MetricCollector {
+	dataSource_ = dataSource
+	gaugeMetricsMap_ = make(map[string]prometheus.GaugeVec, 128)
+	counterMetricsMap_ = make(map[string]prometheus.CounterVec, 128)
+	histogramMetricsMap_ = make(map[string]prometheus.Histogram, 128)
+	summaryMetricsMap_ = make(map[string]prometheus.Summary, 128)
+	initMetrics()
+	return &Collector{detectInterval: 10, detectTimeout: 10}
+}
+
+type Collector struct {
+	detectInterval  time.Duration
+	detectTimeout time.Duration
+}
+
+func (collector *Collector) Start(tom *tomb.Tomb) error {
+	ticker := time.NewTicker(collector.detectInterval)
+	for {
+		select {
+			case <-tom.Dying():
+				return nil
+			case <-ticker.C:
+				processAllMetaServerMetrics()
+				break
+			default:
+		}
+	}
+}
+
+func initMetrics() {
+	var addrs []string
+	if dataSource_ == META_SERVER {
+		addrs = viper.GetStringSlice("meta_servers")
+	} else {
+		addrs = viper.GetStringSlice("replica_servers")

Review Comment:
   Generally, the replica server list is not exposed the way the meta server list is; we can fetch the replica server list via the meta servers instead, through HTTP, e.g. `curl localhost:34601/meta/nodes`.
   
   Furthermore, we have to update the server list periodically to tolerate scale-in and scale-out of the cluster.



##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+	"errors"
+	"time"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+
+	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"gopkg.in/tomb.v2"
+	"github.com/tidwall/gjson"
+	"github.com/prometheus/client_golang/prometheus"
+    "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+const (
+	META_SERVER int = 0
+	REPLICA_SERVER int = 1
+)
+
+type Metric struct {
+	name string
+	value int64
+	mtype string
+	desc string
+	unit string
+}
+
+type Metrics []Metric
+
+type Attribute struct {
+	name string
+	value string
+}
+
+type Entity struct {
+	etype string
+	eid string
+	attributes string
+	metrics []Metric
+}
+
+var gaugeMetricsMap_ map[string]prometheus.GaugeVec
+var counterMetricsMap_ map[string]prometheus.CounterVec
+var histogramMetricsMap_ map[string]prometheus.Histogram
+var summaryMetricsMap_ map[string]prometheus.Summary
+
+var dataSource_ int
+
+type MetricCollector interface {
+	Start(tom *tomb.Tomb) error
+}
+
+func NewMetricCollector(dataSource int) MetricCollector {
+	dataSource_ = dataSource
+	gaugeMetricsMap_ = make(map[string]prometheus.GaugeVec, 128)
+	counterMetricsMap_ = make(map[string]prometheus.CounterVec, 128)
+	histogramMetricsMap_ = make(map[string]prometheus.Histogram, 128)
+	summaryMetricsMap_ = make(map[string]prometheus.Summary, 128)
+	initMetrics()
+	return &Collector{detectInterval: 10, detectTimeout: 10}
+}
+
+type Collector struct {
+	detectInterval  time.Duration
+	detectTimeout time.Duration
+}
+
+func (collector *Collector) Start(tom *tomb.Tomb) error {
+	ticker := time.NewTicker(collector.detectInterval)
+	for {
+		select {
+			case <-tom.Dying():
+				return nil
+			case <-ticker.C:
+				processAllMetaServerMetrics()
+				break
+			default:
+		}
+	}
+}
+
+func initMetrics() {
+	var addrs []string
+	if dataSource_ == META_SERVER {
+		addrs = viper.GetStringSlice("meta_servers")
+	} else {
+		addrs = viper.GetStringSlice("replica_servers")
+	}
+
+	for _, addr := range addrs {
+		data, err := getOneMetaServerMetrics(addr)
+		if err != nil {
+			log.Errorf(fmt.Sprintf("Get raw metrics from %s failed, err: %s", addr, err))
+			return
+		}
+		json_data := gjson.Parse(data)
+		for _, entity := range json_data.Array() {
+			for _, metric := range entity.Get("metrics").Array() {
+				var name string = metric.Get("name").String()
+				var mtype string = metric.Get("type").String()
+				var desc string = metric.Get("description").String()
+				log.Errorf("name:%s type:%s desc:%s", name, mtype, desc)
+				switch mtype {
+				case "counter":
+					if _, ok := counterMetricsMap_[name]; ok {
+						continue
+					}
+					counterMetric := promauto.NewCounterVec(prometheus.CounterOpts{
+						Name: name,
+						Help: desc,
+					}, []string{"level", "endpoint"})
+					counterMetricsMap_[name] = *counterMetric
+					break
+				case "gauge":
+					if _, ok := gaugeMetricsMap_[name]; ok {
+						continue
+					}
+					gaugeMetric := promauto.NewGaugeVec(prometheus.GaugeOpts{
+						Name: name,
+						Help: desc,
+					}, []string{"level", "endpoint"})
+					gaugeMetricsMap_[name] = *gaugeMetric
+					break
+				case "histogram":

Review Comment:
   The new metrics framework doesn't have histogram-type metrics, so this case can be removed.



##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+	"errors"
+	"time"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+
+	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"gopkg.in/tomb.v2"
+	"github.com/tidwall/gjson"
+	"github.com/prometheus/client_golang/prometheus"
+    "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+const (
+	META_SERVER int = 0
+	REPLICA_SERVER int = 1
+)
+
+type Metric struct {
+	name string
+	value int64
+	mtype string
+	desc string
+	unit string
+}
+
+type Metrics []Metric
+
+type Attribute struct {
+	name string
+	value string
+}
+
+type Entity struct {
+	etype string
+	eid string
+	attributes string
+	metrics []Metric
+}
+
+var gaugeMetricsMap_ map[string]prometheus.GaugeVec
+var counterMetricsMap_ map[string]prometheus.CounterVec
+var histogramMetricsMap_ map[string]prometheus.Histogram
+var summaryMetricsMap_ map[string]prometheus.Summary
+
+var dataSource_ int
+
+type MetricCollector interface {
+	Start(tom *tomb.Tomb) error
+}
+
+func NewMetricCollector(dataSource int) MetricCollector {
+	dataSource_ = dataSource
+	gaugeMetricsMap_ = make(map[string]prometheus.GaugeVec, 128)
+	counterMetricsMap_ = make(map[string]prometheus.CounterVec, 128)
+	histogramMetricsMap_ = make(map[string]prometheus.Histogram, 128)
+	summaryMetricsMap_ = make(map[string]prometheus.Summary, 128)
+	initMetrics()
+	return &Collector{detectInterval: 10, detectTimeout: 10}
+}
+
+type Collector struct {
+	detectInterval  time.Duration
+	detectTimeout time.Duration
+}
+
+func (collector *Collector) Start(tom *tomb.Tomb) error {
+	ticker := time.NewTicker(collector.detectInterval)
+	for {
+		select {
+			case <-tom.Dying():
+				return nil
+			case <-ticker.C:
+				processAllMetaServerMetrics()
+				break
+			default:
+		}
+	}
+}
+
+func initMetrics() {
+	var addrs []string
+	if dataSource_ == META_SERVER {
+		addrs = viper.GetStringSlice("meta_servers")
+	} else {
+		addrs = viper.GetStringSlice("replica_servers")
+	}
+
+	for _, addr := range addrs {
+		data, err := getOneMetaServerMetrics(addr)
+		if err != nil {
+			log.Errorf(fmt.Sprintf("Get raw metrics from %s failed, err: %s", addr, err))
+			return
+		}
+		json_data := gjson.Parse(data)
+		for _, entity := range json_data.Array() {
+			for _, metric := range entity.Get("metrics").Array() {
+				var name string = metric.Get("name").String()
+				var mtype string = metric.Get("type").String()
+				var desc string = metric.Get("description").String()
+				log.Errorf("name:%s type:%s desc:%s", name, mtype, desc)
+				switch mtype {
+				case "counter":
+					if _, ok := counterMetricsMap_[name]; ok {
+						continue
+					}
+					counterMetric := promauto.NewCounterVec(prometheus.CounterOpts{
+						Name: name,
+						Help: desc,
+					}, []string{"level", "endpoint"})
+					counterMetricsMap_[name] = *counterMetric
+					break
+				case "gauge":
+					if _, ok := gaugeMetricsMap_[name]; ok {
+						continue
+					}
+					gaugeMetric := promauto.NewGaugeVec(prometheus.GaugeOpts{
+						Name: name,
+						Help: desc,
+					}, []string{"level", "endpoint"})
+					gaugeMetricsMap_[name] = *gaugeMetric
+					break
+				case "histogram":
+					if _, ok := histogramMetricsMap_[name]; ok {
+						continue
+					}
+					histogramMetric := promauto.NewHistogram(prometheus.HistogramOpts{
+						Name:    name,
+						Help:    desc,
+						Buckets: prometheus.LinearBuckets(-3, .1, 61),
+					})
+					histogramMetricsMap_[name] = histogramMetric
+					break
+				case "percentile":
+					if _, ok := summaryMetricsMap_[name]; ok {
+						continue
+					}
+					log.Errorf("name:%s", name)
+					summaryMetric := promauto.NewSummary(prometheus.SummaryOpts{
+						Name:    name,
+						Help:    desc,
+						Objectives: map[float64]float64{
+							0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001, 0.999: 0.0001},
+					})
+					summaryMetricsMap_[name] = summaryMetric
+					break
+				default:
+					log.Errorf("Unsupport metric type %s", mtype)
+					break
+				}
+			}
+		}
+	}
+}
+
+func processAllMetaServerMetrics() {
+	var addrs []string
+	if dataSource_ == META_SERVER {
+		addrs = viper.GetStringSlice("meta_servers")
+	} else {
+		addrs = viper.GetStringSlice("replica_servers")
+	}
+	metric_by_table_id := make(map[string]Metrics, 128)
+	metric_of_cluster := make([]Metric, 128)
+	for _, addr := range addrs {
+		data, err := getOneMetaServerMetrics(addr)
+		if err != nil {
+			log.Errorf("failed to get data from %s, err %s", addr, err)
+			return
+		}
+		json_data := gjson.Parse(data)
+		for _, entity := range json_data.Array() {
+			etype := entity.Get("type").String()
+			switch etype {
+			case "replica":
+			case "partition":
+				table_id := entity.Get("attributes").Get("table_id").String()
+				mergeIntoTableLevel(entity.Get("metrics").Array(), table_id, &metric_by_table_id)
+				break
+			case "server":
+				mergeIntoClusterLevel(entity.Get("metrics").Array(), metric_of_cluster)

Review Comment:
   The raw server-level metrics also need to be sunk to Prometheus; that is to say, some metrics carry the attributes
   ```
   level: server
   endpoint: <host>
   ```



##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+	"errors"
+	"time"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+
+	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"gopkg.in/tomb.v2"
+	"github.com/tidwall/gjson"
+	"github.com/prometheus/client_golang/prometheus"
+    "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+const (
+	META_SERVER int = 0
+	REPLICA_SERVER int = 1
+)
+
+type Metric struct {
+	name string
+	value int64
+	mtype string
+	desc string
+	unit string
+}
+
+type Metrics []Metric
+
+type Attribute struct {
+	name string
+	value string
+}
+
+type Entity struct {
+	etype string
+	eid string
+	attributes string
+	metrics []Metric
+}
+
+var gaugeMetricsMap_ map[string]prometheus.GaugeVec
+var counterMetricsMap_ map[string]prometheus.CounterVec
+var histogramMetricsMap_ map[string]prometheus.Histogram
+var summaryMetricsMap_ map[string]prometheus.Summary
+
+var dataSource_ int
+
+type MetricCollector interface {
+	Start(tom *tomb.Tomb) error
+}
+
+func NewMetricCollector(dataSource int) MetricCollector {
+	dataSource_ = dataSource
+	gaugeMetricsMap_ = make(map[string]prometheus.GaugeVec, 128)
+	counterMetricsMap_ = make(map[string]prometheus.CounterVec, 128)
+	histogramMetricsMap_ = make(map[string]prometheus.Histogram, 128)
+	summaryMetricsMap_ = make(map[string]prometheus.Summary, 128)
+	initMetrics()
+	return &Collector{detectInterval: 10, detectTimeout: 10}
+}
+
+type Collector struct {
+	detectInterval  time.Duration
+	detectTimeout time.Duration
+}
+
+func (collector *Collector) Start(tom *tomb.Tomb) error {
+	ticker := time.NewTicker(collector.detectInterval)
+	for {
+		select {
+			case <-tom.Dying():
+				return nil
+			case <-ticker.C:
+				processAllMetaServerMetrics()
+				break
+			default:
+		}
+	}
+}
+
+func initMetrics() {
+	var addrs []string
+	if dataSource_ == META_SERVER {
+		addrs = viper.GetStringSlice("meta_servers")
+	} else {
+		addrs = viper.GetStringSlice("replica_servers")
+	}
+
+	for _, addr := range addrs {
+		data, err := getOneMetaServerMetrics(addr)
+		if err != nil {
+			log.Errorf(fmt.Sprintf("Get raw metrics from %s failed, err: %s", addr, err))
+			return
+		}
+		json_data := gjson.Parse(data)
+		for _, entity := range json_data.Array() {
+			for _, metric := range entity.Get("metrics").Array() {
+				var name string = metric.Get("name").String()
+				var mtype string = metric.Get("type").String()
+				var desc string = metric.Get("description").String()
+				log.Errorf("name:%s type:%s desc:%s", name, mtype, desc)
+				switch mtype {
+				case "counter":
+					if _, ok := counterMetricsMap_[name]; ok {
+						continue
+					}
+					counterMetric := promauto.NewCounterVec(prometheus.CounterOpts{
+						Name: name,
+						Help: desc,
+					}, []string{"level", "endpoint"})
+					counterMetricsMap_[name] = *counterMetric
+					break
+				case "gauge":
+					if _, ok := gaugeMetricsMap_[name]; ok {
+						continue
+					}
+					gaugeMetric := promauto.NewGaugeVec(prometheus.GaugeOpts{
+						Name: name,
+						Help: desc,
+					}, []string{"level", "endpoint"})
+					gaugeMetricsMap_[name] = *gaugeMetric
+					break
+				case "histogram":
+					if _, ok := histogramMetricsMap_[name]; ok {
+						continue
+					}
+					histogramMetric := promauto.NewHistogram(prometheus.HistogramOpts{
+						Name:    name,
+						Help:    desc,
+						Buckets: prometheus.LinearBuckets(-3, .1, 61),
+					})
+					histogramMetricsMap_[name] = histogramMetric
+					break
+				case "percentile":
+					if _, ok := summaryMetricsMap_[name]; ok {
+						continue
+					}
+					log.Errorf("name:%s", name)
+					summaryMetric := promauto.NewSummary(prometheus.SummaryOpts{
+						Name:    name,
+						Help:    desc,
+						Objectives: map[float64]float64{
+							0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001, 0.999: 0.0001},
+					})
+					summaryMetricsMap_[name] = summaryMetric
+					break
+				default:
+					log.Errorf("Unsupport metric type %s", mtype)
+					break
+				}
+			}
+		}
+	}
+}
+
+func processAllMetaServerMetrics() {
+	var addrs []string
+	if dataSource_ == META_SERVER {
+		addrs = viper.GetStringSlice("meta_servers")
+	} else {
+		addrs = viper.GetStringSlice("replica_servers")
+	}
+	metric_by_table_id := make(map[string]Metrics, 128)
+	metric_of_cluster := make([]Metric, 128)
+	for _, addr := range addrs {
+		data, err := getOneMetaServerMetrics(addr)
+		if err != nil {
+			log.Errorf("failed to get data from %s, err %s", addr, err)
+			return
+		}
+		json_data := gjson.Parse(data)
+		for _, entity := range json_data.Array() {
+			etype := entity.Get("type").String()
+			switch etype {
+			case "replica":
+			case "partition":
+				table_id := entity.Get("attributes").Get("table_id").String()
+				mergeIntoTableLevel(entity.Get("metrics").Array(), table_id, &metric_by_table_id)
+				break
+			case "server":
+				mergeIntoClusterLevel(entity.Get("metrics").Array(), metric_of_cluster)
+				break
+			default:
+				log.Errorf("Unsupport entity type %s", etype)
+			}
+		}
+	}
+	// Update table level metrics.
+	for table_id, metrics := range metric_by_table_id {
+		for _, metric := range metrics {
+			switch metric.mtype {
+				case "counter":
+					if counter, ok := counterMetricsMap_[metric.name]; ok {
+						counter.With(
+							prometheus.Labels{"level":"table", "endpoint":table_id}).Add(
+								float64(metric.value))
+					} else {
+						log.Warnf("Unknown metric name %s", metric.name)
+					}
+					break
+				case "gauge":
+					if gauge, ok := gaugeMetricsMap_[metric.name]; ok {
+						gauge.With(
+							prometheus.Labels{"level":"table", "endpoint":table_id}).Set(
+								float64(metric.value))
+					} else {
+						log.Warnf("Unknown metric name %s", metric.name)
+					}
+					break
+				case "histogram":
+				case "percentile":
+					log.Warnf("Unsupport metric type %s", metric.mtype)
+					break
+				default:
+					log.Warnf("Unknown metric type %s", metric.mtype)
+			}
+		}
+	}
+	// Update server level metrics
+	for _, metric := range metric_of_cluster {
+		switch metric.mtype {
+			case "counter":
+				if counter, ok := counterMetricsMap_[metric.name]; ok {
+					counter.With(
+						prometheus.Labels{"level":"server", "endpoint":"cluster"}).Add(
+							float64(metric.value))
+				} else {
+					log.Warnf("Unknown metric name %s", metric.name)
+				}
+				break
+			case "gauge":
+				if gauge, ok := gaugeMetricsMap_[metric.name]; ok {
+					gauge.With(
+						prometheus.Labels{"level":"server", "endpoint":"cluster"}).Set(
+							float64(metric.value))
+				} else {
+					log.Warnf("Unknown metric name %s", metric.name)
+				}
+				break
+			case "histogram":
+			case "percentile":
+				log.Warnf("Unsupport metric type %s", metric.mtype)
+				break
+			default:
+				log.Warnf("Unknown metric type %s", metric.mtype)
+		}
+	}
+}
+
+func mergeIntoClusterLevel(metrics []gjson.Result, metric_of_cluster []Metric) {
+	for _, metric := range metrics {
+		name := metric.Get("name").String()
+		mtype := metric.Get("type").String()
+		value := metric.Get("value").Int()
+		var isExisted bool = false
+		for _, m := range metric_of_cluster {
+			if m.name == name {
+				isExisted = true
+				switch mtype {
+				case "counter":
+				case "gauge":
+					m.value += value
+					break
+				case "histogram":
+				case "percentile":
+					break
+				default:
+					log.Errorf("Unsupport metric type %s", mtype)
+				}
+			}
+		}
+		if !isExisted {
+			unit := metric.Get("name").String()
+			desc := metric.Get("desc").String()
+			value := metric.Get("value").Int()
+			m := Metric{name:name, mtype:mtype, unit:unit, desc:desc, value:value}
+			metric_of_cluster = append(metric_of_cluster, m)
+		}
+	}
+}
+
+func mergeIntoTableLevel(metrics []gjson.Result, table_id string,
+						 metric_by_table_id *map[string]Metrics) {
+	// Find a same table id, try to merge them.
+	if _, ok := (*metric_by_table_id)[table_id]; ok {
+		mts := (*metric_by_table_id)[table_id]
+		for _, metric := range metrics {
+			name := metric.Get("name").String()
+			mtype := metric.Get("type").String()
+			value := metric.Get("value").Int()
+			for _, m := range mts {
+				if name == m.name {
+					switch mtype {
+					case "counter":
+					case "gauge":
+						m.value += value
+						break
+					case "histogram":
+					case "percentile":
+						break
+					}
+				}
+			}
+		}
+	} else {
+		var mts Metrics
+		for _, metric := range metrics {
+			name := metric.Get("name").String()
+			mtype := metric.Get("type").String()
+			unit := metric.Get("name").String()
+			desc := metric.Get("desc").String()
+			value := metric.Get("value").Int()
+			m := Metric{name:name, mtype:mtype, unit:unit, desc:desc, value:value}
+			mts = append(mts, m)
+		}
+		(*metric_by_table_id)[table_id] = mts
+	}
+}
+
+func getOneMetaServerMetrics(addr string) (string, error) {
+	url := fmt.Sprintf("http://%s/metrics?detail=true", addr)

Review Comment:
   > detail=true
   
   Fetching details every time is a waste of bandwidth; we can fetch the details the first time to initialize the metrics, then cache the details and fetch metrics without `detail=true`.



##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+	"errors"
+	"time"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+
+	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"gopkg.in/tomb.v2"
+	"github.com/tidwall/gjson"
+	"github.com/prometheus/client_golang/prometheus"
+    "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+const (
+	META_SERVER int = 0
+	REPLICA_SERVER int = 1
+)
+
+type Metric struct {
+	name string
+	value int64
+	mtype string
+	desc string
+	unit string
+}
+
+type Metrics []Metric
+
+type Attribute struct {
+	name string
+	value string
+}
+
+type Entity struct {
+	etype string
+	eid string
+	attributes string
+	metrics []Metric
+}
+
+var gaugeMetricsMap_ map[string]prometheus.GaugeVec
+var counterMetricsMap_ map[string]prometheus.CounterVec
+var histogramMetricsMap_ map[string]prometheus.Histogram
+var summaryMetricsMap_ map[string]prometheus.Summary
+
+var dataSource_ int
+
+type MetricCollector interface {
+	Start(tom *tomb.Tomb) error
+}
+
+func NewMetricCollector(dataSource int) MetricCollector {
+	dataSource_ = dataSource
+	gaugeMetricsMap_ = make(map[string]prometheus.GaugeVec, 128)
+	counterMetricsMap_ = make(map[string]prometheus.CounterVec, 128)
+	histogramMetricsMap_ = make(map[string]prometheus.Histogram, 128)
+	summaryMetricsMap_ = make(map[string]prometheus.Summary, 128)
+	initMetrics()
+	return &Collector{detectInterval: 10, detectTimeout: 10}
+}
+
+type Collector struct {
+	detectInterval  time.Duration
+	detectTimeout time.Duration
+}
+
+func (collector *Collector) Start(tom *tomb.Tomb) error {
+	ticker := time.NewTicker(collector.detectInterval)
+	for {
+		select {
+			case <-tom.Dying():
+				return nil
+			case <-ticker.C:
+				processAllMetaServerMetrics()
+				break
+			default:
+		}
+	}
+}
+
+func initMetrics() {
+	var addrs []string
+	if dataSource_ == META_SERVER {
+		addrs = viper.GetStringSlice("meta_servers")
+	} else {
+		addrs = viper.GetStringSlice("replica_servers")
+	}
+
+	for _, addr := range addrs {
+		data, err := getOneMetaServerMetrics(addr)
+		if err != nil {
+			log.Errorf(fmt.Sprintf("Get raw metrics from %s failed, err: %s", addr, err))
+			return
+		}
+		json_data := gjson.Parse(data)
+		for _, entity := range json_data.Array() {
+			for _, metric := range entity.Get("metrics").Array() {
+				var name string = metric.Get("name").String()
+				var mtype string = metric.Get("type").String()
+				var desc string = metric.Get("description").String()
+				log.Errorf("name:%s type:%s desc:%s", name, mtype, desc)
+				switch mtype {
+				case "counter":
+					if _, ok := counterMetricsMap_[name]; ok {
+						continue
+					}
+					counterMetric := promauto.NewCounterVec(prometheus.CounterOpts{
+						Name: name,
+						Help: desc,
+					}, []string{"level", "endpoint"})
+					counterMetricsMap_[name] = *counterMetric
+					break
+				case "gauge":
+					if _, ok := gaugeMetricsMap_[name]; ok {
+						continue
+					}
+					gaugeMetric := promauto.NewGaugeVec(prometheus.GaugeOpts{
+						Name: name,
+						Help: desc,
+					}, []string{"level", "endpoint"})
+					gaugeMetricsMap_[name] = *gaugeMetric
+					break
+				case "histogram":
+					if _, ok := histogramMetricsMap_[name]; ok {
+						continue
+					}
+					histogramMetric := promauto.NewHistogram(prometheus.HistogramOpts{
+						Name:    name,
+						Help:    desc,
+						Buckets: prometheus.LinearBuckets(-3, .1, 61),
+					})
+					histogramMetricsMap_[name] = histogramMetric
+					break
+				case "percentile":
+					if _, ok := summaryMetricsMap_[name]; ok {
+						continue
+					}
+					log.Errorf("name:%s", name)
+					summaryMetric := promauto.NewSummary(prometheus.SummaryOpts{
+						Name:    name,
+						Help:    desc,
+						Objectives: map[float64]float64{
+							0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001, 0.999: 0.0001},
+					})
+					summaryMetricsMap_[name] = summaryMetric
+					break
+				default:
+					log.Errorf("Unsupport metric type %s", mtype)
+					break
+				}
+			}
+		}
+	}
+}
+
+func processAllMetaServerMetrics() {
+	var addrs []string
+	if dataSource_ == META_SERVER {
+		addrs = viper.GetStringSlice("meta_servers")
+	} else {
+		addrs = viper.GetStringSlice("replica_servers")
+	}
+	metric_by_table_id := make(map[string]Metrics, 128)
+	metric_of_cluster := make([]Metric, 128)
+	for _, addr := range addrs {
+		data, err := getOneMetaServerMetrics(addr)
+		if err != nil {
+			log.Errorf("failed to get data from %s, err %s", addr, err)
+			return
+		}
+		json_data := gjson.Parse(data)
+		for _, entity := range json_data.Array() {
+			etype := entity.Get("type").String()
+			switch etype {
+			case "replica":
+			case "partition":
+				table_id := entity.Get("attributes").Get("table_id").String()
+				mergeIntoTableLevel(entity.Get("metrics").Array(), table_id, &metric_by_table_id)
+				break
+			case "server":
+				mergeIntoClusterLevel(entity.Get("metrics").Array(), metric_of_cluster)
+				break
+			default:
+				log.Errorf("Unsupport entity type %s", etype)
+			}
+		}
+	}
+	// Update table level metrics.
+	for table_id, metrics := range metric_by_table_id {
+		for _, metric := range metrics {
+			switch metric.mtype {
+				case "counter":
+					if counter, ok := counterMetricsMap_[metric.name]; ok {
+						counter.With(
+							prometheus.Labels{"level":"table", "endpoint":table_id}).Add(
+								float64(metric.value))
+					} else {
+						log.Warnf("Unknown metric name %s", metric.name)
+					}
+					break
+				case "gauge":
+					if gauge, ok := gaugeMetricsMap_[metric.name]; ok {
+						gauge.With(
+							prometheus.Labels{"level":"table", "endpoint":table_id}).Set(
+								float64(metric.value))
+					} else {
+						log.Warnf("Unknown metric name %s", metric.name)
+					}
+					break
+				case "histogram":
+				case "percentile":
+					log.Warnf("Unsupport metric type %s", metric.mtype)
+					break
+				default:
+					log.Warnf("Unknown metric type %s", metric.mtype)
+			}
+		}
+	}
+	// Update server level metrics

Review Comment:
   > server level
   
   cluster level?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org


[GitHub] [incubator-pegasus] xinghuayu007 closed pull request #1466: feat: Aggregate table/server level metrics

Posted by "xinghuayu007 (via GitHub)" <gi...@apache.org>.
xinghuayu007 closed pull request #1466: feat: Aggregate table/server level metrics
URL: https://github.com/apache/incubator-pegasus/pull/1466


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org


[GitHub] [incubator-pegasus] acelyc111 commented on a diff in pull request #1466: feat: Aggregate table/server level metrics

Posted by "acelyc111 (via GitHub)" <gi...@apache.org>.
acelyc111 commented on code in PR #1466:
URL: https://github.com/apache/incubator-pegasus/pull/1466#discussion_r1195922372


##########
collector/avail/detector.go:
##########
@@ -19,106 +19,134 @@ package avail
 
 import (
 	"context"
-	"sync/atomic"
+	"math/rand"
 	"time"
 
+	"github.com/apache/incubator-pegasus/go-client/admin"
 	"github.com/apache/incubator-pegasus/go-client/pegasus"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+
 	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"gopkg.in/tomb.v2"
 )
 
 // Detector periodically checks the service availability of the Pegasus cluster.
 type Detector interface {
-
-	// Start detection until the ctx cancelled. This method will block the current thread.
-	Start(ctx context.Context) error
+	Start(tom *tomb.Tomb) error
 }
 
 // NewDetector returns a service-availability detector.
-func NewDetector(client pegasus.Client) Detector {
-	return &pegasusDetector{client: client}
+func NewDetector(detectInterval time.Duration,
+	detectTimeout time.Duration, partitionCount int) Detector {
+	metaServers := viper.GetStringSlice("meta_servers")
+	tableName := viper.GetStringMapString("availablity_detect")["table_name"]
+	// Create detect table.
+	adminClient := admin.NewClient(admin.Config{MetaServers: metaServers})
+	error := adminClient.CreateTable(context.Background(), tableName, partitionCount)
+	if error != nil {
+		log.Errorf("Create detect table %s failed, error: %s", tableName, error)
+	}
+	pegasusClient := pegasus.NewClient(pegasus.Config{MetaServers: metaServers})
+	return &pegasusDetector{
+		client:          pegasusClient,
+		detectTableName: tableName,
+		detectInterval:  detectInterval,
+		detectTimeout:   detectTimeout,
+		partitionCount:  partitionCount,
+	}
 }
 
-type pegasusDetector struct {
-	// client reads and writes periodically to a specified table.
-	client      pegasus.Client
-	detectTable pegasus.TableConnector
+var (
+	DetectTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "detect_times",
+		Help: "The times of availability detecting",
+	})
+
+	ReadFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "read_failure_detect_times",
+		Help: "The failure times of read detecting",
+	})
+
+	WriteFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "write_failure_detect_times",
+		Help: "The failure times of write detecting",
+	})
+
+	ReadLatency = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "read_latency",
+		Help: "The latency of read data",
+	})
+
+	WriteLatency = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "write_latency",
+		Help: "The latency of write data",
+	})
+)
 
-	detectInterval  time.Duration
+type pegasusDetector struct {
+	client          pegasus.Client
+	detectTable     pegasus.TableConnector
 	detectTableName string
-
-	// timeout of a single detect
+	detectInterval  time.Duration
+	// timeout of a single detect.
 	detectTimeout time.Duration
-
-	detectHashKeys [][]byte
-
-	recentMinuteDetectTimes  uint64
-	recentMinuteFailureTimes uint64
-
-	recentHourDetectTimes  uint64
-	recentHourFailureTimes uint64
-
-	recentDayDetectTimes  uint64
-	recentDayFailureTimes uint64
+	// partition count.
+	partitionCount int
 }
 
-func (d *pegasusDetector) Start(rootCtx context.Context) error {
+func (d *pegasusDetector) Start(tom *tomb.Tomb) error {
 	var err error
-	ctx, cancel := context.WithTimeout(rootCtx, 10*time.Second)
-	defer cancel()
-	d.detectTable, err = d.client.OpenTable(ctx, d.detectTableName)
+	// Open the detect table.
+	d.detectTable, err = d.client.OpenTable(context.Background(), d.detectTableName)
 	if err != nil {
+		log.Errorf("Open detect table %s failed, error: %s", d.detectTable, err)
 		return err
 	}
-
 	ticker := time.NewTicker(d.detectInterval)
 	for {
 		select {
-		case <-rootCtx.Done(): // check if context cancelled
+		case <-tom.Dying():
 			return nil
 		case <-ticker.C:
-			return nil
-		default:
+			d.detectPartition()
 		}

Review Comment:
   Why delete the `default` case?



##########
collector/avail/detector.go:
##########
@@ -19,106 +19,134 @@ package avail
 
 import (
 	"context"
-	"sync/atomic"
+	"math/rand"
 	"time"
 
+	"github.com/apache/incubator-pegasus/go-client/admin"
 	"github.com/apache/incubator-pegasus/go-client/pegasus"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+
 	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"gopkg.in/tomb.v2"
 )
 
 // Detector periodically checks the service availability of the Pegasus cluster.
 type Detector interface {
-
-	// Start detection until the ctx cancelled. This method will block the current thread.
-	Start(ctx context.Context) error
+	Start(tom *tomb.Tomb) error
 }
 
 // NewDetector returns a service-availability detector.
-func NewDetector(client pegasus.Client) Detector {
-	return &pegasusDetector{client: client}
+func NewDetector(detectInterval time.Duration,
+	detectTimeout time.Duration, partitionCount int) Detector {
+	metaServers := viper.GetStringSlice("meta_servers")
+	tableName := viper.GetStringMapString("availablity_detect")["table_name"]
+	// Create detect table.
+	adminClient := admin.NewClient(admin.Config{MetaServers: metaServers})
+	error := adminClient.CreateTable(context.Background(), tableName, partitionCount)
+	if error != nil {
+		log.Errorf("Create detect table %s failed, error: %s", tableName, error)
+	}
+	pegasusClient := pegasus.NewClient(pegasus.Config{MetaServers: metaServers})
+	return &pegasusDetector{
+		client:          pegasusClient,
+		detectTableName: tableName,
+		detectInterval:  detectInterval,
+		detectTimeout:   detectTimeout,
+		partitionCount:  partitionCount,
+	}
 }
 
-type pegasusDetector struct {
-	// client reads and writes periodically to a specified table.
-	client      pegasus.Client
-	detectTable pegasus.TableConnector
+var (
+	DetectTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "detect_times",
+		Help: "The times of availability detecting",
+	})
+
+	ReadFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "read_failure_detect_times",
+		Help: "The failure times of read detecting",
+	})
+
+	WriteFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "write_failure_detect_times",
+		Help: "The failure times of write detecting",
+	})
+
+	ReadLatency = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "read_latency",
+		Help: "The latency of read data",
+	})
+
+	WriteLatency = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "write_latency",
+		Help: "The latency of write data",
+	})
+)
 
-	detectInterval  time.Duration
+type pegasusDetector struct {
+	client          pegasus.Client
+	detectTable     pegasus.TableConnector
 	detectTableName string
-
-	// timeout of a single detect
+	detectInterval  time.Duration
+	// timeout of a single detect.
 	detectTimeout time.Duration

Review Comment:
   ```suggestion
   	detectTimeout   time.Duration
   ```



##########
collector/avail/detector.go:
##########
@@ -19,106 +19,134 @@ package avail
 
 import (
 	"context"
-	"sync/atomic"
+	"math/rand"
 	"time"
 
+	"github.com/apache/incubator-pegasus/go-client/admin"
 	"github.com/apache/incubator-pegasus/go-client/pegasus"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+
 	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"gopkg.in/tomb.v2"
 )
 
 // Detector periodically checks the service availability of the Pegasus cluster.
 type Detector interface {
-
-	// Start detection until the ctx cancelled. This method will block the current thread.
-	Start(ctx context.Context) error
+	Start(tom *tomb.Tomb) error
 }
 
 // NewDetector returns a service-availability detector.
-func NewDetector(client pegasus.Client) Detector {
-	return &pegasusDetector{client: client}
+func NewDetector(detectInterval time.Duration,
+	detectTimeout time.Duration, partitionCount int) Detector {
+	metaServers := viper.GetStringSlice("meta_servers")
+	tableName := viper.GetStringMapString("availablity_detect")["table_name"]
+	// Create detect table.
+	adminClient := admin.NewClient(admin.Config{MetaServers: metaServers})
+	error := adminClient.CreateTable(context.Background(), tableName, partitionCount)
+	if error != nil {
+		log.Errorf("Create detect table %s failed, error: %s", tableName, error)
+	}
+	pegasusClient := pegasus.NewClient(pegasus.Config{MetaServers: metaServers})
+	return &pegasusDetector{
+		client:          pegasusClient,
+		detectTableName: tableName,
+		detectInterval:  detectInterval,
+		detectTimeout:   detectTimeout,
+		partitionCount:  partitionCount,
+	}
 }
 
-type pegasusDetector struct {
-	// client reads and writes periodically to a specified table.
-	client      pegasus.Client
-	detectTable pegasus.TableConnector
+var (
+	DetectTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "detect_times",
+		Help: "The times of availability detecting",
+	})
+
+	ReadFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "read_failure_detect_times",
+		Help: "The failure times of read detecting",
+	})
+
+	WriteFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "write_failure_detect_times",
+		Help: "The failure times of write detecting",
+	})
+
+	ReadLatency = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "read_latency",
+		Help: "The latency of read data",
+	})
+
+	WriteLatency = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "write_latency",
+		Help: "The latency of write data",

Review Comment:
   ```suggestion
   		Help: "The latency of write data in milliseconds",
   ```



##########
collector/avail/detector.go:
##########
@@ -19,106 +19,134 @@ package avail
 
 import (
 	"context"
-	"sync/atomic"
+	"math/rand"
 	"time"
 
+	"github.com/apache/incubator-pegasus/go-client/admin"
 	"github.com/apache/incubator-pegasus/go-client/pegasus"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+
 	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"gopkg.in/tomb.v2"
 )
 
 // Detector periodically checks the service availability of the Pegasus cluster.
 type Detector interface {
-
-	// Start detection until the ctx cancelled. This method will block the current thread.
-	Start(ctx context.Context) error
+	Start(tom *tomb.Tomb) error
 }
 
 // NewDetector returns a service-availability detector.
-func NewDetector(client pegasus.Client) Detector {
-	return &pegasusDetector{client: client}
+func NewDetector(detectInterval time.Duration,
+	detectTimeout time.Duration, partitionCount int) Detector {
+	metaServers := viper.GetStringSlice("meta_servers")
+	tableName := viper.GetStringMapString("availablity_detect")["table_name"]
+	// Create detect table.
+	adminClient := admin.NewClient(admin.Config{MetaServers: metaServers})
+	error := adminClient.CreateTable(context.Background(), tableName, partitionCount)
+	if error != nil {
+		log.Errorf("Create detect table %s failed, error: %s", tableName, error)
+	}
+	pegasusClient := pegasus.NewClient(pegasus.Config{MetaServers: metaServers})
+	return &pegasusDetector{
+		client:          pegasusClient,
+		detectTableName: tableName,
+		detectInterval:  detectInterval,
+		detectTimeout:   detectTimeout,
+		partitionCount:  partitionCount,
+	}
 }
 
-type pegasusDetector struct {
-	// client reads and writes periodically to a specified table.
-	client      pegasus.Client
-	detectTable pegasus.TableConnector
+var (
+	DetectTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "detect_times",
+		Help: "The times of availability detecting",
+	})
+
+	ReadFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "read_failure_detect_times",
+		Help: "The failure times of read detecting",
+	})
+
+	WriteFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "write_failure_detect_times",
+		Help: "The failure times of write detecting",
+	})
+
+	ReadLatency = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "read_latency",
+		Help: "The latency of read data",
+	})
+
+	WriteLatency = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "write_latency",
+		Help: "The latency of write data",
+	})
+)
 
-	detectInterval  time.Duration
+type pegasusDetector struct {
+	client          pegasus.Client
+	detectTable     pegasus.TableConnector
 	detectTableName string
-
-	// timeout of a single detect
+	detectInterval  time.Duration
+	// timeout of a single detect.
 	detectTimeout time.Duration
-
-	detectHashKeys [][]byte
-
-	recentMinuteDetectTimes  uint64
-	recentMinuteFailureTimes uint64
-
-	recentHourDetectTimes  uint64
-	recentHourFailureTimes uint64
-
-	recentDayDetectTimes  uint64
-	recentDayFailureTimes uint64
+	// partition count.
+	partitionCount int

Review Comment:
   ```suggestion
   	partitionCount  int
   ```



##########
collector/avail/detector.go:
##########
@@ -19,106 +19,134 @@ package avail
 
 import (
 	"context"
-	"sync/atomic"
+	"math/rand"
 	"time"
 
+	"github.com/apache/incubator-pegasus/go-client/admin"
 	"github.com/apache/incubator-pegasus/go-client/pegasus"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+
 	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"gopkg.in/tomb.v2"
 )
 
 // Detector periodically checks the service availability of the Pegasus cluster.
 type Detector interface {
-
-	// Start detection until the ctx cancelled. This method will block the current thread.
-	Start(ctx context.Context) error
+	Start(tom *tomb.Tomb) error
 }
 
 // NewDetector returns a service-availability detector.
-func NewDetector(client pegasus.Client) Detector {
-	return &pegasusDetector{client: client}
+func NewDetector(detectInterval time.Duration,
+	detectTimeout time.Duration, partitionCount int) Detector {
+	metaServers := viper.GetStringSlice("meta_servers")
+	tableName := viper.GetStringMapString("availablity_detect")["table_name"]
+	// Create detect table.
+	adminClient := admin.NewClient(admin.Config{MetaServers: metaServers})
+	error := adminClient.CreateTable(context.Background(), tableName, partitionCount)
+	if error != nil {
+		log.Errorf("Create detect table %s failed, error: %s", tableName, error)
+	}
+	pegasusClient := pegasus.NewClient(pegasus.Config{MetaServers: metaServers})
+	return &pegasusDetector{
+		client:          pegasusClient,
+		detectTableName: tableName,
+		detectInterval:  detectInterval,
+		detectTimeout:   detectTimeout,
+		partitionCount:  partitionCount,
+	}
 }
 
-type pegasusDetector struct {
-	// client reads and writes periodically to a specified table.
-	client      pegasus.Client
-	detectTable pegasus.TableConnector
+var (
+	DetectTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "detect_times",
+		Help: "The times of availability detecting",
+	})
+
+	ReadFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "read_failure_detect_times",
+		Help: "The failure times of read detecting",
+	})
+
+	WriteFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "write_failure_detect_times",
+		Help: "The failure times of write detecting",
+	})
+
+	ReadLatency = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "read_latency",
+		Help: "The latency of read data",

Review Comment:
   ```suggestion
   		Help: "The latency of read data in milliseconds",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org


[GitHub] [incubator-pegasus] acelyc111 commented on pull request #1466: feat: Aggregate table/server level metrics

Posted by "acelyc111 (via GitHub)" <gi...@apache.org>.
acelyc111 commented on PR #1466:
URL: https://github.com/apache/incubator-pegasus/pull/1466#issuecomment-1550738213

   Could you please paste a sample output of the collector's Prometheus exporter?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org


[GitHub] [incubator-pegasus] acelyc111 commented on a diff in pull request #1466: [Collector] Aggregate table/server level metrics

Posted by "acelyc111 (via GitHub)" <gi...@apache.org>.
acelyc111 commented on code in PR #1466:
URL: https://github.com/apache/incubator-pegasus/pull/1466#discussion_r1185896498


##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+	"errors"
+	"time"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+
+	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"gopkg.in/tomb.v2"
+	"github.com/tidwall/gjson"
+	"github.com/prometheus/client_golang/prometheus"
+    "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+const (
+	META_SERVER int = 0
+	REPLICA_SERVER int = 1
+)
+
+type Metric struct {
+	name string
+	value int64
+	mtype string
+	desc string
+	unit string
+}
+
+type Metrics []Metric
+
+type Attribute struct {
+	name string
+	value string
+}
+
+type Entity struct {
+	etype string
+	eid string
+	attributes string
+	metrics []Metric
+}
+
+var gaugeMetricsMap_ map[string]prometheus.GaugeVec
+var counterMetricsMap_ map[string]prometheus.CounterVec
+var histogramMetricsMap_ map[string]prometheus.Histogram
+var summaryMetricsMap_ map[string]prometheus.Summary
+
+var dataSource_ int
+
+type MetricCollector interface {
+	Start(tom *tomb.Tomb) error
+}
+
+func NewMetricCollector(dataSource int) MetricCollector {
+	dataSource_ = dataSource
+	gaugeMetricsMap_ = make(map[string]prometheus.GaugeVec, 128)
+	counterMetricsMap_ = make(map[string]prometheus.CounterVec, 128)
+	histogramMetricsMap_ = make(map[string]prometheus.Histogram, 128)
+	summaryMetricsMap_ = make(map[string]prometheus.Summary, 128)
+	initMetrics()
+	return &Collector{detectInterval: 10, detectTimeout: 10}
+}
+
+type Collector struct {
+	detectInterval  time.Duration
+	detectTimeout time.Duration
+}
+
+func (collector *Collector) Start(tom *tomb.Tomb) error {
+	ticker := time.NewTicker(collector.detectInterval)
+	for {
+		select {
+			case <-tom.Dying():
+				return nil
+			case <-ticker.C:
+				processAllMetaServerMetrics()
+				break
+			default:
+		}
+	}
+}
+
+func initMetrics() {
+	var addrs []string
+	if dataSource_ == META_SERVER {
+		addrs = viper.GetStringSlice("meta_servers")
+	} else {
+		addrs = viper.GetStringSlice("replica_servers")

Review Comment:
   Generally, the replica server list is not exposed the way the meta server list is; we can fetch the replica server list via the meta servers instead, through HTTP: `curl localhost:34601/meta/nodes`
   
   Furthermore, we have to update the server list periodically to tolerate scale-in and scale-out.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org


[GitHub] [incubator-pegasus] acelyc111 commented on a diff in pull request #1466: feat: Aggregate table/server level metrics

Posted by "acelyc111 (via GitHub)" <gi...@apache.org>.
acelyc111 commented on code in PR #1466:
URL: https://github.com/apache/incubator-pegasus/pull/1466#discussion_r1195932406


##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,581 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"math"
+	"net/http"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"github.com/tidwall/gjson"
+	"gopkg.in/tomb.v2"
+)
+
+const (
+	MetaServer    int = 0
+	ReplicaServer int = 1
+)
+
+type Metric struct {
+	name string
+	// For metric type for counter/gauge.
+	value float64
+	// For metric type of percentile.
+	values []float64
+	mtype  string
+}
+
+type Metrics []Metric
+
+var GaugeMetricsMap map[string]prometheus.GaugeVec
+var CounterMetricsMap map[string]prometheus.CounterVec
+var SummaryMetricsMap map[string]prometheus.Summary
+
+// DataSource 0 meta server, 1 replica server.
+var DataSource int
+var RoleByDataSource map[int]string
+
+var TableNameByID map[string]string
+
+type MetricCollector interface {
+	Start(tom *tomb.Tomb) error
+}
+
+func NewMetricCollector(
+	dataSource int,
+	detectInterval time.Duration,
+	detectTimeout time.Duration) MetricCollector {
+	DataSource = dataSource
+	GaugeMetricsMap = make(map[string]prometheus.GaugeVec, 128)
+	CounterMetricsMap = make(map[string]prometheus.CounterVec, 128)
+	SummaryMetricsMap = make(map[string]prometheus.Summary, 128)
+	RoleByDataSource = make(map[int]string, 128)
+	TableNameByID = make(map[string]string, 128)
+	RoleByDataSource[0] = "meta_server"
+	RoleByDataSource[1] = "replica_server"
+	initMetrics()
+
+	return &Collector{detectInterval: detectInterval, detectTimeout: detectTimeout}
+}
+
+type Collector struct {
+	detectInterval time.Duration
+	detectTimeout  time.Duration
+}
+
+func (collector *Collector) Start(tom *tomb.Tomb) error {
+	ticker := time.NewTicker(collector.detectInterval)
+	for {
+		select {
+		case <-tom.Dying():
+			return nil
+		case <-ticker.C:
+			updateClusterTableInfo()
+			processAllMetaServerMetrics()
+		}
+	}
+}
+
+// Get replica server address.
+func getReplicaAddrs() ([]string, error) {
+	addrs := viper.GetStringSlice("meta_servers")
+	var rserverAddrs []string
+	url := fmt.Sprintf("http://%s/meta/nodes", addrs[0])

Review Comment:
   If there are multiple meta servers, we need to try fetching the info from the other meta servers when the first one fails.



##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,581 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"math"
+	"net/http"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"github.com/tidwall/gjson"
+	"gopkg.in/tomb.v2"
+)
+
+const (
+	MetaServer    int = 0
+	ReplicaServer int = 1
+)
+
+type Metric struct {
+	name string
+	// For metric type for counter/gauge.
+	value float64
+	// For metric type of percentile.
+	values []float64
+	mtype  string
+}
+
+type Metrics []Metric
+
+var GaugeMetricsMap map[string]prometheus.GaugeVec
+var CounterMetricsMap map[string]prometheus.CounterVec
+var SummaryMetricsMap map[string]prometheus.Summary
+
+// DataSource 0 meta server, 1 replica server.
+var DataSource int
+var RoleByDataSource map[int]string
+
+var TableNameByID map[string]string
+
+type MetricCollector interface {
+	Start(tom *tomb.Tomb) error
+}
+
+func NewMetricCollector(
+	dataSource int,
+	detectInterval time.Duration,
+	detectTimeout time.Duration) MetricCollector {
+	DataSource = dataSource
+	GaugeMetricsMap = make(map[string]prometheus.GaugeVec, 128)
+	CounterMetricsMap = make(map[string]prometheus.CounterVec, 128)
+	SummaryMetricsMap = make(map[string]prometheus.Summary, 128)
+	RoleByDataSource = make(map[int]string, 128)
+	TableNameByID = make(map[string]string, 128)
+	RoleByDataSource[0] = "meta_server"
+	RoleByDataSource[1] = "replica_server"
+	initMetrics()
+
+	return &Collector{detectInterval: detectInterval, detectTimeout: detectTimeout}
+}
+
+type Collector struct {
+	detectInterval time.Duration
+	detectTimeout  time.Duration
+}
+
+func (collector *Collector) Start(tom *tomb.Tomb) error {
+	ticker := time.NewTicker(collector.detectInterval)
+	for {
+		select {
+		case <-tom.Dying():
+			return nil
+		case <-ticker.C:
+			updateClusterTableInfo()
+			processAllMetaServerMetrics()
+		}
+	}
+}
+
+// Get replica server address.
+func getReplicaAddrs() ([]string, error) {
+	addrs := viper.GetStringSlice("meta_servers")
+	var rserverAddrs []string
+	url := fmt.Sprintf("http://%s/meta/nodes", addrs[0])
+	resp, err := http.Get(url)
+	if err == nil && resp.StatusCode != http.StatusOK {
+		err = errors.New(resp.Status)
+	}
+	if err != nil {
+		log.Errorf("Fail to get replica server address from %s, err %s", addrs[0], err)
+		return rserverAddrs, err
+	}
+	body, _ := ioutil.ReadAll(resp.Body)
+	jsonData := gjson.Parse(string(body))
+	for key := range jsonData.Get("details").Map() {
+		rserverAddrs = append(rserverAddrs, key)
+	}
+	defer resp.Body.Close()
+	return rserverAddrs, nil
+}
+
+// Register all metrics.
+func initMetrics() {
+	var addrs []string
+	var err error
+	if DataSource == MetaServer {
+		addrs = viper.GetStringSlice("meta_servers")
+	} else {
+		addrs, err = getReplicaAddrs()
+		if err != nil {
+			log.Errorf("Get replica server address failed, err: %s", err)
+			return
+		}
+	}
+	for _, addr := range addrs {
+		data, err := getOneServerMetrics(addr)
+		if err != nil {
+			log.Errorf("Get raw metrics from %s failed, err: %s", addr, err)
+			return
+		}
+		jsonData := gjson.Parse(data)
+		for _, entity := range jsonData.Array() {
+			for _, metric := range entity.Get("metrics").Array() {
+				var name string = metric.Get("name").String()
+				var mtype string = metric.Get("type").String()
+				var desc string = metric.Get("desc").String()
+				switch mtype {
+				case "counter":
+					if _, ok := CounterMetricsMap[name]; ok {
+						continue
+					}
+					counterMetric := promauto.NewCounterVec(prometheus.CounterOpts{
+						Name: name,
+						Help: desc,
+					}, []string{"endpoint", "role", "level", "title"})
+					CounterMetricsMap[name] = *counterMetric
+				case "gauge":
+					if _, ok := GaugeMetricsMap[name]; ok {
+						continue
+					}
+					gaugeMetric := promauto.NewGaugeVec(prometheus.GaugeOpts{
+						Name: name,
+						Help: desc,
+					}, []string{"endpoint", "role", "level", "title"})
+					GaugeMetricsMap[name] = *gaugeMetric
+				case "percentile":
+					if _, ok := SummaryMetricsMap[name]; ok {
+						continue
+					}
+					summaryMetric := promauto.NewSummary(prometheus.SummaryOpts{
+						Name: name,
+						Help: desc,
+						Objectives: map[float64]float64{
+							0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001, 0.999: 0.0001},
+					})
+					SummaryMetricsMap[name] = summaryMetric
+				case "histogram":
+				default:
+					log.Errorf("Unsupport metric type %s", mtype)
+				}
+			}
+		}
+	}
+}
+
+// Parse metric data and update metrics.
+func processAllMetaServerMetrics() {
+	var addrs []string
+	var err error
+	if DataSource == MetaServer {
+		addrs = viper.GetStringSlice("meta_servers")
+	} else {
+		addrs, err = getReplicaAddrs()
+		if err != nil {
+			log.Errorf("Get replica server address failed, err: %s", err)
+			return
+		}
+	}
+	metricsByTableID := make(map[string]Metrics, 128)
+	metricsByServerTableID := make(map[string]Metrics, 128)
+	var metricsOfCluster []Metric
+	metricsByAddr := make(map[string]Metrics, 128)
+	for _, addr := range addrs {
+		data, err := getOneServerMetrics(addr)
+		if err != nil {
+			log.Errorf("failed to get data from %s, err %s", addr, err)
+			return
+		}
+		jsonData := gjson.Parse(data)
+		for _, entity := range jsonData.Array() {
+			etype := entity.Get("type").String()
+			switch etype {
+			case "replica":
+			case "partition":
+				tableID := entity.Get("attributes").Get("table_id").String()
+				mergeIntoClusterLevelTableMetric(entity.Get("metrics").Array(),
+					tableID, &metricsByTableID)
+			case "table":
+				tableID := entity.Get("attributes").Get("table_id").String()
+				mergeIntoClusterLevelTableMetric(entity.Get("metrics").Array(),
+					tableID, &metricsByTableID)
+				collectServerLevelTableMetric(entity.Get("metrics").Array(), tableID,
+					&metricsByServerTableID)
+				updateServerLevelTableMetrics(addr, metricsByServerTableID)
+			case "server":
+				mergeIntoClusterLevelServerMetric(entity.Get("metrics").Array(),
+					metricsOfCluster)
+				collectServerLevelServerMetrics(entity.Get("metrics").Array(),
+					addr, &metricsByAddr)
+			default:
+				log.Errorf("Unsupport entity type %s", etype)
+			}
+		}
+	}
+
+	updateClusterLevelTableMetrics(metricsByTableID)
+	updateServerLevelServerMetrics(metricsByAddr)
+	updateClusterLevelMetrics(metricsOfCluster)
+}
+
+// Update table metrics. They belong to a specified server.
+func updateServerLevelTableMetrics(addr string, metricsByServerTableID map[string]Metrics) {
+	role := RoleByDataSource[DataSource]
+	for tableID, metrics := range metricsByServerTableID {
+		var tableName string
+		if name, ok := TableNameByID[tableID]; !ok {
+			tableName = tableID

Review Comment:
   Is there any case where it returns not ok?



##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,581 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"math"
+	"net/http"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"github.com/tidwall/gjson"
+	"gopkg.in/tomb.v2"
+)
+
+const (
+	MetaServer    int = 0
+	ReplicaServer int = 1
+)
+
+type Metric struct {
+	name string
+	// For metric type for counter/gauge.
+	value float64
+	// For metric type of percentile.
+	values []float64
+	mtype  string
+}
+
+type Metrics []Metric
+
+var GaugeMetricsMap map[string]prometheus.GaugeVec
+var CounterMetricsMap map[string]prometheus.CounterVec
+var SummaryMetricsMap map[string]prometheus.Summary
+
+// DataSource 0 meta server, 1 replica server.
+var DataSource int
+var RoleByDataSource map[int]string
+
+var TableNameByID map[string]string
+
+type MetricCollector interface {
+	Start(tom *tomb.Tomb) error
+}
+
+func NewMetricCollector(
+	dataSource int,
+	detectInterval time.Duration,
+	detectTimeout time.Duration) MetricCollector {
+	DataSource = dataSource
+	GaugeMetricsMap = make(map[string]prometheus.GaugeVec, 128)
+	CounterMetricsMap = make(map[string]prometheus.CounterVec, 128)
+	SummaryMetricsMap = make(map[string]prometheus.Summary, 128)
+	RoleByDataSource = make(map[int]string, 128)
+	TableNameByID = make(map[string]string, 128)
+	RoleByDataSource[0] = "meta_server"
+	RoleByDataSource[1] = "replica_server"
+	initMetrics()
+
+	return &Collector{detectInterval: detectInterval, detectTimeout: detectTimeout}
+}
+
+type Collector struct {
+	detectInterval time.Duration
+	detectTimeout  time.Duration
+}
+
+func (collector *Collector) Start(tom *tomb.Tomb) error {
+	ticker := time.NewTicker(collector.detectInterval)
+	for {
+		select {
+		case <-tom.Dying():
+			return nil
+		case <-ticker.C:
+			updateClusterTableInfo()
+			processAllMetaServerMetrics()
+		}
+	}
+}
+
+// Get replica server address.
+func getReplicaAddrs() ([]string, error) {
+	addrs := viper.GetStringSlice("meta_servers")
+	var rserverAddrs []string
+	url := fmt.Sprintf("http://%s/meta/nodes", addrs[0])
+	resp, err := http.Get(url)
+	if err == nil && resp.StatusCode != http.StatusOK {
+		err = errors.New(resp.Status)
+	}
+	if err != nil {
+		log.Errorf("Fail to get replica server address from %s, err %s", addrs[0], err)
+		return rserverAddrs, err
+	}
+	body, _ := ioutil.ReadAll(resp.Body)
+	jsonData := gjson.Parse(string(body))
+	for key := range jsonData.Get("details").Map() {
+		rserverAddrs = append(rserverAddrs, key)
+	}
+	defer resp.Body.Close()
+	return rserverAddrs, nil
+}
+
+// Register all metrics.
+func initMetrics() {
+	var addrs []string
+	var err error
+	if DataSource == MetaServer {
+		addrs = viper.GetStringSlice("meta_servers")
+	} else {
+		addrs, err = getReplicaAddrs()
+		if err != nil {
+			log.Errorf("Get replica server address failed, err: %s", err)
+			return
+		}
+	}
+	for _, addr := range addrs {
+		data, err := getOneServerMetrics(addr)
+		if err != nil {
+			log.Errorf("Get raw metrics from %s failed, err: %s", addr, err)
+			return
+		}
+		jsonData := gjson.Parse(data)
+		for _, entity := range jsonData.Array() {
+			for _, metric := range entity.Get("metrics").Array() {
+				var name string = metric.Get("name").String()
+				var mtype string = metric.Get("type").String()
+				var desc string = metric.Get("desc").String()
+				switch mtype {
+				case "counter":
+					if _, ok := CounterMetricsMap[name]; ok {
+						continue
+					}
+					counterMetric := promauto.NewCounterVec(prometheus.CounterOpts{
+						Name: name,
+						Help: desc,
+					}, []string{"endpoint", "role", "level", "title"})
+					CounterMetricsMap[name] = *counterMetric
+				case "gauge":
+					if _, ok := GaugeMetricsMap[name]; ok {
+						continue
+					}
+					gaugeMetric := promauto.NewGaugeVec(prometheus.GaugeOpts{
+						Name: name,
+						Help: desc,
+					}, []string{"endpoint", "role", "level", "title"})
+					GaugeMetricsMap[name] = *gaugeMetric
+				case "percentile":
+					if _, ok := SummaryMetricsMap[name]; ok {
+						continue
+					}
+					summaryMetric := promauto.NewSummary(prometheus.SummaryOpts{
+						Name: name,
+						Help: desc,
+						Objectives: map[float64]float64{
+							0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001, 0.999: 0.0001},
+					})
+					SummaryMetricsMap[name] = summaryMetric
+				case "histogram":
+				default:
+					log.Errorf("Unsupport metric type %s", mtype)
+				}
+			}
+		}
+	}
+}
+
+// Parse metric data and update metrics.
+func processAllMetaServerMetrics() {
+	var addrs []string
+	var err error
+	if DataSource == MetaServer {
+		addrs = viper.GetStringSlice("meta_servers")
+	} else {
+		addrs, err = getReplicaAddrs()

Review Comment:
   This function processes both meta servers and replica servers, but it is named `processAllMetaServerMetrics`?



##########
collector/metrics/metric_collector.go:
##########
@@ -0,0 +1,581 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"math"
+	"net/http"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"github.com/tidwall/gjson"
+	"gopkg.in/tomb.v2"
+)
+
+const (
+	MetaServer    int = 0
+	ReplicaServer int = 1
+)
+
+type Metric struct {
+	name string
+	// For metric type for counter/gauge.
+	value float64
+	// For metric type of percentile.
+	values []float64
+	mtype  string
+}
+
+type Metrics []Metric
+
+var GaugeMetricsMap map[string]prometheus.GaugeVec
+var CounterMetricsMap map[string]prometheus.CounterVec
+var SummaryMetricsMap map[string]prometheus.Summary
+
+// DataSource 0 meta server, 1 replica server.
+var DataSource int
+var RoleByDataSource map[int]string
+
+var TableNameByID map[string]string
+
+type MetricCollector interface {
+	Start(tom *tomb.Tomb) error
+}
+
+func NewMetricCollector(
+	dataSource int,
+	detectInterval time.Duration,
+	detectTimeout time.Duration) MetricCollector {
+	DataSource = dataSource
+	GaugeMetricsMap = make(map[string]prometheus.GaugeVec, 128)
+	CounterMetricsMap = make(map[string]prometheus.CounterVec, 128)
+	SummaryMetricsMap = make(map[string]prometheus.Summary, 128)
+	RoleByDataSource = make(map[int]string, 128)
+	TableNameByID = make(map[string]string, 128)
+	RoleByDataSource[0] = "meta_server"
+	RoleByDataSource[1] = "replica_server"
+	initMetrics()
+
+	return &Collector{detectInterval: detectInterval, detectTimeout: detectTimeout}
+}
+
+type Collector struct {
+	detectInterval time.Duration
+	detectTimeout  time.Duration
+}
+
+func (collector *Collector) Start(tom *tomb.Tomb) error {
+	ticker := time.NewTicker(collector.detectInterval)
+	for {
+		select {
+		case <-tom.Dying():
+			return nil
+		case <-ticker.C:
+			updateClusterTableInfo()
+			processAllMetaServerMetrics()
+		}
+	}
+}
+
+// Get replica server address.
+func getReplicaAddrs() ([]string, error) {
+	addrs := viper.GetStringSlice("meta_servers")
+	var rserverAddrs []string
+	url := fmt.Sprintf("http://%s/meta/nodes", addrs[0])
+	resp, err := http.Get(url)
+	if err == nil && resp.StatusCode != http.StatusOK {
+		err = errors.New(resp.Status)
+	}
+	if err != nil {
+		log.Errorf("Fail to get replica server address from %s, err %s", addrs[0], err)
+		return rserverAddrs, err
+	}
+	body, _ := ioutil.ReadAll(resp.Body)
+	jsonData := gjson.Parse(string(body))
+	for key := range jsonData.Get("details").Map() {
+		rserverAddrs = append(rserverAddrs, key)
+	}
+	defer resp.Body.Close()
+	return rserverAddrs, nil
+}
+
+// Register all metrics.
+func initMetrics() {
+	var addrs []string
+	var err error
+	if DataSource == MetaServer {
+		addrs = viper.GetStringSlice("meta_servers")
+	} else {
+		addrs, err = getReplicaAddrs()
+		if err != nil {
+			log.Errorf("Get replica server address failed, err: %s", err)
+			return
+		}
+	}
+	for _, addr := range addrs {
+		data, err := getOneServerMetrics(addr)
+		if err != nil {
+			log.Errorf("Get raw metrics from %s failed, err: %s", addr, err)
+			return
+		}
+		jsonData := gjson.Parse(data)
+		for _, entity := range jsonData.Array() {
+			for _, metric := range entity.Get("metrics").Array() {
+				var name string = metric.Get("name").String()
+				var mtype string = metric.Get("type").String()
+				var desc string = metric.Get("desc").String()
+				switch mtype {
+				case "counter":
+					if _, ok := CounterMetricsMap[name]; ok {
+						continue
+					}
+					counterMetric := promauto.NewCounterVec(prometheus.CounterOpts{
+						Name: name,
+						Help: desc,
+					}, []string{"endpoint", "role", "level", "title"})
+					CounterMetricsMap[name] = *counterMetric
+				case "gauge":
+					if _, ok := GaugeMetricsMap[name]; ok {
+						continue
+					}
+					gaugeMetric := promauto.NewGaugeVec(prometheus.GaugeOpts{
+						Name: name,
+						Help: desc,
+					}, []string{"endpoint", "role", "level", "title"})
+					GaugeMetricsMap[name] = *gaugeMetric
+				case "percentile":
+					if _, ok := SummaryMetricsMap[name]; ok {
+						continue
+					}
+					summaryMetric := promauto.NewSummary(prometheus.SummaryOpts{
+						Name: name,
+						Help: desc,
+						Objectives: map[float64]float64{
+							0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001, 0.999: 0.0001},
+					})
+					SummaryMetricsMap[name] = summaryMetric
+				case "histogram":
+				default:
+					log.Errorf("Unsupport metric type %s", mtype)
+				}
+			}
+		}
+	}
+}
+
+// Parse metric data and update metrics.
+func processAllMetaServerMetrics() {
+	var addrs []string
+	var err error
+	if DataSource == MetaServer {
+		addrs = viper.GetStringSlice("meta_servers")
+	} else {
+		addrs, err = getReplicaAddrs()
+		if err != nil {
+			log.Errorf("Get replica server address failed, err: %s", err)
+			return
+		}
+	}
+	metricsByTableID := make(map[string]Metrics, 128)
+	metricsByServerTableID := make(map[string]Metrics, 128)
+	var metricsOfCluster []Metric
+	metricsByAddr := make(map[string]Metrics, 128)
+	for _, addr := range addrs {
+		data, err := getOneServerMetrics(addr)
+		if err != nil {
+			log.Errorf("failed to get data from %s, err %s", addr, err)
+			return
+		}
+		jsonData := gjson.Parse(data)
+		for _, entity := range jsonData.Array() {
+			etype := entity.Get("type").String()
+			switch etype {
+			case "replica":
+			case "partition":
+				tableID := entity.Get("attributes").Get("table_id").String()
+				mergeIntoClusterLevelTableMetric(entity.Get("metrics").Array(),
+					tableID, &metricsByTableID)
+			case "table":
+				tableID := entity.Get("attributes").Get("table_id").String()
+				mergeIntoClusterLevelTableMetric(entity.Get("metrics").Array(),
+					tableID, &metricsByTableID)
+				collectServerLevelTableMetric(entity.Get("metrics").Array(), tableID,
+					&metricsByServerTableID)
+				updateServerLevelTableMetrics(addr, metricsByServerTableID)
+			case "server":
+				mergeIntoClusterLevelServerMetric(entity.Get("metrics").Array(),
+					metricsOfCluster)
+				collectServerLevelServerMetrics(entity.Get("metrics").Array(),
+					addr, &metricsByAddr)
+			default:
+				log.Errorf("Unsupport entity type %s", etype)
+			}
+		}
+	}
+
+	updateClusterLevelTableMetrics(metricsByTableID)
+	updateServerLevelServerMetrics(metricsByAddr)
+	updateClusterLevelMetrics(metricsOfCluster)
+}
+
+// Update table metrics. They belong to a specified server.
+func updateServerLevelTableMetrics(addr string, metricsByServerTableID map[string]Metrics) {
+	role := RoleByDataSource[DataSource]
+	for tableID, metrics := range metricsByServerTableID {
+		var tableName string
+		if name, ok := TableNameByID[tableID]; !ok {
+			tableName = tableID
+		} else {
+			tableName = name
+		}
+		for _, metric := range metrics {
+			switch metric.mtype {
+			case "counter":
+				if counter, ok := CounterMetricsMap[metric.name]; ok {
+					counter.With(
+						prometheus.Labels{"endpoint": addr,
+							"role": role, "level": "server",
+							"title": tableName}).Add(float64(metric.value))
+				} else {
+					log.Warnf("Unknown metric name %s", metric.name)
+				}
+			case "gauge":
+				if gauge, ok := GaugeMetricsMap[metric.name]; ok {
+					gauge.With(
+						prometheus.Labels{"endpoint": addr,
+							"role": role, "level": "server",
+							"title": tableName}).Set(float64(metric.value))
+				} else {
+					log.Warnf("Unknown metric name %s", metric.name)
+				}
+			case "percentile":
+				log.Warnf("Todo metric type %s", metric.mtype)
+			case "histogram":
+			default:
+				log.Warnf("Unknown metric type %s", metric.mtype)
+			}
+		}
+	}
+}
+
+// Update server metrics. They belong to a specified server.
+func updateServerLevelServerMetrics(metricsByAddr map[string]Metrics) {
+	role := RoleByDataSource[DataSource]
+	for addr, metrics := range metricsByAddr {
+		for _, metric := range metrics {
+			switch metric.mtype {
+			case "counter":
+				if counter, ok := CounterMetricsMap[metric.name]; ok {
+					counter.With(
+						prometheus.Labels{"endpoint": addr,
+							"role": role, "level": "server",
+							"title": "server"}).Add(float64(metric.value))
+				} else {
+					log.Warnf("Unknown metric name %s", metric.name)
+				}
+			case "gauge":
+				if gauge, ok := GaugeMetricsMap[metric.name]; ok {
+					gauge.With(
+						prometheus.Labels{"endpoint": addr,
+							"role": role, "level": "server",
+							"title": "server"}).Set(float64(metric.value))
+				} else {
+					log.Warnf("Unknown metric name %s", metric.name)
+				}
+			case "percentile":
+				log.Warnf("Todo metric type %s", metric.mtype)
+			case "histogram":
+			default:
+				log.Warnf("Unknown metric type %s", metric.mtype)
+			}
+		}

Review Comment:
   Is this code duplicated? Could it be encapsulated in a shared function?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org