You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/06/21 04:24:46 UTC

[incubator-pegasus] 33/38: feat: Aggregate table/server level metrics (#1517)

This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit 1247b5aa6cfbcf7fd78c966eab19ee4f58a7a514
Author: xinghuayu007 <14...@qq.com>
AuthorDate: Wed Jun 7 11:45:29 2023 +0800

    feat: Aggregate table/server level metrics (#1517)
    
    Related issue: https://github.com/apache/incubator-pegasus/issues/1206
    
    This patch collects all raw metric data from pegasus, aggregate them into
    table/server/cluster level and sinks them into prometheus.
    If the type of the metric is replica partition, it will be aggregated
    into cluster level table metric.
    If the type of the metric is table, it will be aggregated into cluster
    level table metric, it also will be aggregated into server level table
    metric.
    If the type of the metric is server, it will be aggregated into cluster
    level table metric, it also will be sinked into prometheus directly.
    
    Currently, this patch supports 3 type of prometheus metric types:
    counter, gauge, summary.
---
 collector/avail/detector.go                    | 150 +++++---
 collector/config.yml                           |   3 +-
 collector/main.go                              |  14 +-
 collector/metrics/meta_server_metrics.go       |  23 ++
 collector/metrics/metric_collector.go          | 514 +++++++++++++++++++++++++
 collector/metrics/replica_server_metrics.go    |  23 ++
 collector/{metrics => sink}/falcon_sink.go     |   2 +-
 collector/{metrics => sink}/prometheus_sink.go |   2 +-
 collector/{metrics => sink}/sink.go            |   2 +-
 9 files changed, 662 insertions(+), 71 deletions(-)

diff --git a/collector/avail/detector.go b/collector/avail/detector.go
index 1e23a6210..a4f535394 100644
--- a/collector/avail/detector.go
+++ b/collector/avail/detector.go
@@ -19,106 +19,134 @@ package avail
 
 import (
 	"context"
-	"sync/atomic"
+	"math/rand"
 	"time"
 
+	"github.com/apache/incubator-pegasus/go-client/admin"
 	"github.com/apache/incubator-pegasus/go-client/pegasus"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+
 	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"gopkg.in/tomb.v2"
 )
 
 // Detector periodically checks the service availability of the Pegasus cluster.
 type Detector interface {
-
-	// Start detection until the ctx cancelled. This method will block the current thread.
-	Start(ctx context.Context) error
+	Start(tom *tomb.Tomb) error
 }
 
 // NewDetector returns a service-availability detector.
-func NewDetector(client pegasus.Client) Detector {
-	return &pegasusDetector{client: client}
+func NewDetector(detectInterval time.Duration,
+	detectTimeout time.Duration, partitionCount int) Detector {
+	metaServers := viper.GetStringSlice("meta_servers")
+	tableName := viper.GetStringMapString("availablity_detect")["table_name"]
+	// Create detect table.
+	adminClient := admin.NewClient(admin.Config{MetaServers: metaServers})
+	error := adminClient.CreateTable(context.Background(), tableName, partitionCount)
+	if error != nil {
+		log.Errorf("Create detect table %s failed, error: %s", tableName, error)
+	}
+	pegasusClient := pegasus.NewClient(pegasus.Config{MetaServers: metaServers})
+	return &pegasusDetector{
+		client:          pegasusClient,
+		detectTableName: tableName,
+		detectInterval:  detectInterval,
+		detectTimeout:   detectTimeout,
+		partitionCount:  partitionCount,
+	}
 }
 
-type pegasusDetector struct {
-	// client reads and writes periodically to a specified table.
-	client      pegasus.Client
-	detectTable pegasus.TableConnector
+var (
+	DetectTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "detect_times",
+		Help: "The times of availability detecting",
+	})
+
+	ReadFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "read_failure_detect_times",
+		Help: "The failure times of read detecting",
+	})
+
+	WriteFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "write_failure_detect_times",
+		Help: "The failure times of write detecting",
+	})
+
+	ReadLatency = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "read_latency_ms",
+		Help: "The latency of read data in milliseconds",
+	})
+
+	WriteLatency = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "write_latency_ms",
+		Help: "The latency of write data in milliseconds",
+	})
+)
 
-	detectInterval  time.Duration
+type pegasusDetector struct {
+	client          pegasus.Client
+	detectTable     pegasus.TableConnector
 	detectTableName string
-
-	// timeout of a single detect
-	detectTimeout time.Duration
-
-	detectHashKeys [][]byte
-
-	recentMinuteDetectTimes  uint64
-	recentMinuteFailureTimes uint64
-
-	recentHourDetectTimes  uint64
-	recentHourFailureTimes uint64
-
-	recentDayDetectTimes  uint64
-	recentDayFailureTimes uint64
+	detectInterval  time.Duration
+	// timeout of a single detect.
+	detectTimeout	time.Duration
+	// partition count.
+	partitionCount	int
 }
 
-func (d *pegasusDetector) Start(rootCtx context.Context) error {
+func (d *pegasusDetector) Start(tom *tomb.Tomb) error {
 	var err error
-	ctx, cancel := context.WithTimeout(rootCtx, 10*time.Second)
-	defer cancel()
-	d.detectTable, err = d.client.OpenTable(ctx, d.detectTableName)
+	// Open the detect table.
+	d.detectTable, err = d.client.OpenTable(context.Background(), d.detectTableName)
 	if err != nil {
+		log.Errorf("Open detect table %s failed, error: %s", d.detectTable, err)
 		return err
 	}
-
 	ticker := time.NewTicker(d.detectInterval)
 	for {
 		select {
-		case <-rootCtx.Done(): // check if context cancelled
+		case <-tom.Dying():
 			return nil
 		case <-ticker.C:
-			return nil
-		default:
+			d.detectPartition()
 		}
-
-		// periodically set/get a configured Pegasus table.
-		d.detect(ctx)
 	}
 }
 
-func (d *pegasusDetector) detect(rootCtx context.Context) {
-	// TODO(yingchun): doesn't work, just to mute lint errors.
-	d.detectPartition(rootCtx, 1)
-}
-
-func (d *pegasusDetector) detectPartition(rootCtx context.Context, partitionIdx int) {
-	d.incrDetectTimes()
+func (d *pegasusDetector) detectPartition() {
+	DetectTimes.Inc()
 
 	go func() {
-		ctx, cancel := context.WithTimeout(rootCtx, d.detectTimeout)
+		ctx, cancel := context.WithTimeout(context.Background(), d.detectTimeout)
 		defer cancel()
-
-		hashkey := d.detectHashKeys[partitionIdx]
-		value := []byte("")
-
+		value := []byte("test")
+		// Select a partition randomly to be detected. That will ensure every partition
+		// be detected.
+		hashkey := []byte(RandStringBytes(d.partitionCount))
+		now := time.Now().UnixMilli()
 		if err := d.detectTable.Set(ctx, hashkey, []byte(""), value); err != nil {
-			d.incrFailureTimes()
-			log.Errorf("set partition [%d] failed, hashkey=\"%s\": %s", partitionIdx, hashkey, err)
+			WriteFailureTimes.Inc()
+			log.Errorf("Set hashkey \"%s\" failed, error: %s", hashkey, err)
 		}
+		ReadLatency.Set(float64(time.Now().UnixMilli() - now))
+		now = time.Now().UnixMilli()
 		if _, err := d.detectTable.Get(ctx, hashkey, []byte("")); err != nil {
-			d.incrFailureTimes()
-			log.Errorf("get partition [%d] failed, hashkey=\"%s\": %s", partitionIdx, hashkey, err)
+			ReadFailureTimes.Inc()
+			log.Errorf("Get hashkey \"%s\" failed, error: %s", hashkey, err)
 		}
+		WriteLatency.Set(float64(time.Now().UnixMilli() - now))
 	}()
 }
 
-func (d *pegasusDetector) incrDetectTimes() {
-	atomic.AddUint64(&d.recentMinuteDetectTimes, 1)
-	atomic.AddUint64(&d.recentHourDetectTimes, 1)
-	atomic.AddUint64(&d.recentDayDetectTimes, 1)
-}
+const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
 
-func (d *pegasusDetector) incrFailureTimes() {
-	atomic.AddUint64(&d.recentMinuteFailureTimes, 1)
-	atomic.AddUint64(&d.recentHourFailureTimes, 1)
-	atomic.AddUint64(&d.recentDayFailureTimes, 1)
+// Generate a random string.
+func RandStringBytes(n int) string {
+	b := make([]byte, n)
+	for i := range b {
+		b[i] = letterBytes[rand.Intn(len(letterBytes))]
+	}
+	return string(b)
 }
diff --git a/collector/config.yml b/collector/config.yml
index b89993d6c..1ff2d10e9 100644
--- a/collector/config.yml
+++ b/collector/config.yml
@@ -22,6 +22,7 @@ cluster_name : "onebox"
 meta_servers: 
   - 127.0.0.1:34601
   - 127.0.0.1:34602
+  - 127.0.0.1:34603
 
 # local server port
 port : 34101
@@ -41,5 +42,5 @@ falcon_agent:
   port : 1988
   http_path : "/v1/push"
 
-available_detect:
+availablity_detect:
   table_name : test
diff --git a/collector/main.go b/collector/main.go
index d3d45d26c..71dd5abbf 100644
--- a/collector/main.go
+++ b/collector/main.go
@@ -26,8 +26,8 @@ import (
 	"strings"
 	"syscall"
 
-	"github.com/pegasus-kv/collector/aggregate"
-	"github.com/pegasus-kv/collector/usage"
+	"github.com/pegasus-kv/collector/avail"
+	"github.com/pegasus-kv/collector/metrics"
 	"github.com/pegasus-kv/collector/webui"
 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/viper"
@@ -88,12 +88,14 @@ func main() {
 		tom.Kill(errors.New("collector terminates")) // kill other goroutines
 	})
 	tom.Go(func() error {
-		aggregate.Start(tom)
-		return nil
+		// Set detect inteverl and detect timeout 10s.
+		return avail.NewDetector(10000000000, 10000000000, 16).Start(tom)
 	})
 	tom.Go(func() error {
-		usage.NewTableUsageRecorder().Start(tom)
-		return nil
+		return metrics.NewMetaServerMetricCollector().Start(tom)
+	})
+	tom.Go(func() error {
+		return metrics.NewReplicaServerMetricCollector().Start(tom)
 	})
 	<-tom.Dead() // gracefully wait until all goroutines dead
 }
diff --git a/collector/metrics/meta_server_metrics.go b/collector/metrics/meta_server_metrics.go
new file mode 100644
index 000000000..0cff62727
--- /dev/null
+++ b/collector/metrics/meta_server_metrics.go
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+func NewMetaServerMetricCollector() MetricCollector {
+	// Set detect interval and timeout 10s.
+	return NewMetricCollector(MetaServer, 10000000000, 10000000000)
+}
diff --git a/collector/metrics/metric_collector.go b/collector/metrics/metric_collector.go
new file mode 100644
index 000000000..9dd8e75e9
--- /dev/null
+++ b/collector/metrics/metric_collector.go
@@ -0,0 +1,514 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"math"
+	"net/http"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"github.com/tidwall/gjson"
+	"gopkg.in/tomb.v2"
+)
+
+const (
+	MetaServer    int = 0
+	ReplicaServer int = 1
+)
+
+type Metric struct {
+	name string
+	// For metric type for counter/gauge.
+	value float64
+	// For metric type of percentile.
+	values []float64
+	mtype  string
+}
+
+type Metrics []Metric
+
+var GaugeMetricsMap map[string]prometheus.GaugeVec
+var CounterMetricsMap map[string]prometheus.CounterVec
+var SummaryMetricsMap map[string]prometheus.Summary
+
+// DataSource 0 meta server, 1 replica server.
+var DataSource int
+var RoleByDataSource map[int]string
+
+var TableNameByID map[string]string
+
+type MetricCollector interface {
+	Start(tom *tomb.Tomb) error
+}
+
+func NewMetricCollector(
+	dataSource int,
+	detectInterval time.Duration,
+	detectTimeout time.Duration) MetricCollector {
+	DataSource = dataSource
+	GaugeMetricsMap = make(map[string]prometheus.GaugeVec, 128)
+	CounterMetricsMap = make(map[string]prometheus.CounterVec, 128)
+	SummaryMetricsMap = make(map[string]prometheus.Summary, 128)
+	RoleByDataSource = make(map[int]string, 128)
+	TableNameByID = make(map[string]string, 128)
+	RoleByDataSource[0] = "meta_server"
+	RoleByDataSource[1] = "replica_server"
+	initMetrics()
+
+	return &Collector{detectInterval: detectInterval, detectTimeout: detectTimeout}
+}
+
+type Collector struct {
+	detectInterval time.Duration
+	detectTimeout  time.Duration
+}
+
+func (collector *Collector) Start(tom *tomb.Tomb) error {
+	ticker := time.NewTicker(collector.detectInterval)
+	for {
+		select {
+		case <-tom.Dying():
+			return nil
+		case <-ticker.C:
+			updateClusterTableInfo()
+			processAllServerMetrics()
+		}
+	}
+}
+
+// Get replica server address.
+func getReplicaAddrs() ([]string, error) {
+	addrs := viper.GetStringSlice("meta_servers")
+	var rserverAddrs []string
+	for addr := range addrs {
+		url := fmt.Sprintf("http://%s/meta/nodes", addr)
+		resp, err := http.Get(url)
+		if err == nil && resp.StatusCode != http.StatusOK {
+			err = errors.New(resp.Status)
+		}
+		if err != nil {
+			log.Errorf("Fail to get replica server address from %s, err %s", addr, err)
+			continue
+		}
+		body, _ := ioutil.ReadAll(resp.Body)
+		jsonData := gjson.Parse(string(body))
+		for key := range jsonData.Get("details").Map() {
+			rserverAddrs = append(rserverAddrs, key)
+		}
+		defer resp.Body.Close()
+		break
+	}
+	return rserverAddrs, nil
+}
+
+// Register all metrics.
+func initMetrics() {
+	var addrs []string
+	var err error
+	if DataSource == MetaServer {
+		addrs = viper.GetStringSlice("meta_servers")
+	} else {
+		addrs, err = getReplicaAddrs()
+		if err != nil {
+			log.Errorf("Get replica server address failed, err: %s", err)
+			return
+		}
+	}
+	for _, addr := range addrs {
+		data, err := getOneServerMetrics(addr)
+		if err != nil {
+			log.Errorf("Get raw metrics from %s failed, err: %s", addr, err)
+			return
+		}
+		jsonData := gjson.Parse(data)
+		for _, entity := range jsonData.Array() {
+			for _, metric := range entity.Get("metrics").Array() {
+				var name string = metric.Get("name").String()
+				var mtype string = metric.Get("type").String()
+				var desc string = metric.Get("desc").String()
+				switch mtype {
+				case "Counter":
+					if _, ok := CounterMetricsMap[name]; ok {
+						continue
+					}
+					counterMetric := promauto.NewCounterVec(prometheus.CounterOpts{
+						Name: name,
+						Help: desc,
+					}, []string{"endpoint", "role", "level", "title"})
+					CounterMetricsMap[name] = *counterMetric
+				case "Gauge":
+					if _, ok := GaugeMetricsMap[name]; ok {
+						continue
+					}
+					gaugeMetric := promauto.NewGaugeVec(prometheus.GaugeOpts{
+						Name: name,
+						Help: desc,
+					}, []string{"endpoint", "role", "level", "title"})
+					GaugeMetricsMap[name] = *gaugeMetric
+				case "Percentile":
+					if _, ok := SummaryMetricsMap[name]; ok {
+						continue
+					}
+					summaryMetric := promauto.NewSummary(prometheus.SummaryOpts{
+						Name: name,
+						Help: desc,
+						Objectives: map[float64]float64{
+							0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001, 0.999: 0.0001},
+					})
+					SummaryMetricsMap[name] = summaryMetric
+				case "Histogram":
+				default:
+					log.Errorf("Unsupport metric type %s", mtype)
+				}
+			}
+		}
+	}
+}
+
+// Parse metric data and update metrics.
+func processAllServerMetrics() {
+	var addrs []string
+	var err error
+	if DataSource == MetaServer {
+		addrs = viper.GetStringSlice("meta_servers")
+	} else {
+		addrs, err = getReplicaAddrs()
+		if err != nil {
+			log.Errorf("Get replica server address failed, err: %s", err)
+			return
+		}
+	}
+	metricsByTableID := make(map[string]Metrics, 128)
+	metricsByServerTableID := make(map[string]Metrics, 128)
+	var metricsOfCluster []Metric
+	metricsByAddr := make(map[string]Metrics, 128)
+	for _, addr := range addrs {
+		data, err := getOneServerMetrics(addr)
+		if err != nil {
+			log.Errorf("failed to get data from %s, err %s", addr, err)
+			return
+		}
+		jsonData := gjson.Parse(data)
+		for _, entity := range jsonData.Array() {
+			etype := entity.Get("type").String()
+			switch etype {
+			case "replica":
+			case "partition":
+				tableID := entity.Get("attributes").Get("table_id").String()
+				mergeIntoClusterLevelTableMetric(entity.Get("metrics").Array(),
+					tableID, &metricsByTableID)
+			case "table":
+				tableID := entity.Get("attributes").Get("table_id").String()
+				mergeIntoClusterLevelTableMetric(entity.Get("metrics").Array(),
+					tableID, &metricsByTableID)
+				collectServerLevelTableMetric(entity.Get("metrics").Array(), tableID,
+					&metricsByServerTableID)
+				updateServerLevelTableMetrics(addr, metricsByServerTableID)
+			case "server":
+				mergeIntoClusterLevelServerMetric(entity.Get("metrics").Array(),
+					metricsOfCluster)
+				collectServerLevelServerMetrics(entity.Get("metrics").Array(),
+					addr, &metricsByAddr)
+			default:
+				log.Errorf("Unsupport entity type %s", etype)
+			}
+		}
+	}
+
+	updateClusterLevelTableMetrics(metricsByTableID)
+	updateServerLevelServerMetrics(metricsByAddr)
+	updateClusterLevelMetrics(metricsOfCluster)
+}
+
+// Update table metrics. They belong to a specified server.
+func updateServerLevelTableMetrics(addr string, metricsByServerTableID map[string]Metrics) {
+	for tableID, metrics := range metricsByServerTableID {
+		var tableName string
+		if name, ok := TableNameByID[tableID]; !ok {
+			tableName = tableID
+		} else {
+			tableName = name
+		}
+		for _, metric := range metrics {
+			updateMetric(metric, addr, "server", tableName)
+		}
+	}
+}
+
+// Update server metrics. They belong to a specified server.
+func updateServerLevelServerMetrics(metricsByAddr map[string]Metrics) {
+	for addr, metrics := range metricsByAddr {
+		for _, metric := range metrics {
+			updateMetric(metric, addr, "server", "server")
+		}
+	}
+}
+
+// Update cluster level metrics. They belong to a cluster.
+func updateClusterLevelMetrics(metricsOfCluster []Metric) {
+	for _, metric := range metricsOfCluster {
+		updateMetric(metric, "cluster", "server", metric.name)
+	}
+}
+
+// Update table metrics. They belong to a cluster.
+func updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics) {
+	for tableID, metrics := range metricsByTableID {
+		var tableName string
+		if name, ok := TableNameByID[tableID]; !ok {
+			tableName = tableID
+		} else {
+			tableName = name
+		}
+		for _, metric := range metrics {
+			updateMetric(metric, "cluster", "table", tableName)
+		}
+	}
+}
+
+func updateMetric(metric Metric, endpoint string, level string, title string) {
+	role := RoleByDataSource[DataSource]
+	switch metric.mtype {
+		case "Counter":
+			if counter, ok := CounterMetricsMap[metric.name]; ok {
+				counter.With(
+					prometheus.Labels{"endpoint": endpoint,
+						"role": role, "level": level,
+						"title": title}).Add(float64(metric.value))
+			} else {
+				log.Warnf("Unknown metric name %s", metric.name)
+			}
+		case "Gauge":
+			if gauge, ok := GaugeMetricsMap[metric.name]; ok {
+				gauge.With(
+					prometheus.Labels{"endpoint": endpoint,
+						"role": role, "level": level,
+						"title": title}).Set(float64(metric.value))
+			} else {
+				log.Warnf("Unknown metric name %s", metric.name)
+			}
+		case "Percentile":
+			log.Warnf("Todo metric type %s", metric.mtype)
+		case "Histogram":
+		default:
+			log.Warnf("Unsupport metric type %s", metric.mtype)
+	}
+}
+
+func collectServerLevelTableMetric(metrics []gjson.Result, tableID string,
+	metricsByServerTableID *map[string]Metrics) {
+	var mts Metrics
+	for _, metric := range metrics {
+		name := metric.Get("name").String()
+		mtype := metric.Get("type").String()
+		value := metric.Get("value").Float()
+		var values []float64
+		if mtype == "percentile" {
+			values = append(values, metric.Get("p50").Float())
+			values = append(values, metric.Get("p90").Float())
+			values = append(values, metric.Get("p95").Float())
+			values = append(values, metric.Get("p99").Float())
+			values = append(values, metric.Get("p999").Float())
+		}
+		m := Metric{name: name, mtype: mtype, value: value, values: values}
+		mts = append(mts, m)
+	}
+	(*metricsByServerTableID)[tableID] = mts
+}
+
+func collectServerLevelServerMetrics(metrics []gjson.Result, addr string,
+	metricsByAddr *map[string]Metrics) {
+	var mts Metrics
+	for _, metric := range metrics {
+		name := metric.Get("name").String()
+		mtype := metric.Get("type").String()
+		value := metric.Get("value").Float()
+		var values []float64
+		if mtype == "percentile" {
+			values = append(values, metric.Get("p50").Float())
+			values = append(values, metric.Get("p90").Float())
+			values = append(values, metric.Get("p95").Float())
+			values = append(values, metric.Get("p99").Float())
+			values = append(values, metric.Get("p999").Float())
+		}
+		m := Metric{name: name, mtype: mtype, value: value, values: values}
+		mts = append(mts, m)
+	}
+	(*metricsByAddr)[addr] = mts
+}
+
+func mergeIntoClusterLevelServerMetric(metrics []gjson.Result, metricsOfCluster []Metric) {
+	for _, metric := range metrics {
+		name := metric.Get("name").String()
+		mtype := metric.Get("type").String()
+		value := metric.Get("value").Float()
+		var isExisted bool = false
+		for _, m := range metricsOfCluster {
+			if m.name == name {
+				isExisted = true
+				switch mtype {
+				case "Counter":
+				case "Gauge":
+					m.value += value
+				case "Percentile":
+					p50 := metric.Get("p50").Float()
+					m.values[0] = math.Max(m.values[0], p50)
+					p90 := metric.Get("p90").Float()
+					m.values[1] = math.Max(m.values[0], p90)
+					p95 := metric.Get("p95").Float()
+					m.values[2] = math.Max(m.values[0], p95)
+					p99 := metric.Get("p99").Float()
+					m.values[3] = math.Max(m.values[0], p99)
+					p999 := metric.Get("p999").Float()
+					m.values[4] = math.Max(m.values[0], p999)
+				case "Histogram":
+				default:
+					log.Errorf("Unsupport metric type %s", mtype)
+				}
+			}
+		}
+		if !isExisted {
+			value := metric.Get("value").Float()
+			var values []float64
+			if mtype == "percentile" {
+				values = append(values, metric.Get("p50").Float())
+				values = append(values, metric.Get("p90").Float())
+				values = append(values, metric.Get("p95").Float())
+				values = append(values, metric.Get("p99").Float())
+				values = append(values, metric.Get("p999").Float())
+			}
+			m := Metric{name: name, mtype: mtype, value: value, values: values}
+			metricsOfCluster = append(metricsOfCluster, m)
+		}
+	}
+}
+
+func mergeIntoClusterLevelTableMetric(metrics []gjson.Result, tableID string,
+	metricsByTableID *map[string]Metrics) {
+	// Find a same table id, try to merge them.
+	if _, ok := (*metricsByTableID)[tableID]; ok {
+		mts := (*metricsByTableID)[tableID]
+		for _, metric := range metrics {
+			name := metric.Get("name").String()
+			mtype := metric.Get("type").String()
+			value := metric.Get("value").Float()
+			for _, m := range mts {
+				if name == m.name {
+					switch mtype {
+					case "Counter":
+					case "Gauge":
+						m.value += value
+					case "Percentile":
+						p50 := metric.Get("p50").Float()
+						m.values[0] = math.Max(m.values[0], p50)
+						p90 := metric.Get("p90").Float()
+						m.values[1] = math.Max(m.values[0], p90)
+						p95 := metric.Get("p95").Float()
+						m.values[2] = math.Max(m.values[0], p95)
+						p99 := metric.Get("p99").Float()
+						m.values[3] = math.Max(m.values[0], p99)
+						p999 := metric.Get("p999").Float()
+						m.values[4] = math.Max(m.values[0], p999)
+					case "Histogram":
+					default:
+						log.Errorf("Unsupport metric type %s", mtype)
+					}
+				}
+			}
+		}
+	} else {
+		var mts Metrics
+		for _, metric := range metrics {
+			name := metric.Get("name").String()
+			mtype := metric.Get("type").String()
+			value := metric.Get("value").Float()
+			var values []float64
+			if mtype == "percentile" {
+				values = append(values, metric.Get("p50").Float())
+				values = append(values, metric.Get("p90").Float())
+				values = append(values, metric.Get("p95").Float())
+				values = append(values, metric.Get("p99").Float())
+				values = append(values, metric.Get("p999").Float())
+			}
+			m := Metric{name: name, mtype: mtype, value: value, values: values}
+			mts = append(mts, m)
+		}
+		(*metricsByTableID)[tableID] = mts
+	}
+}
+
+func getOneServerMetrics(addr string) (string, error) {
+	url := fmt.Sprintf("http://%s/metrics?detail=true", addr)
+	return httpGet(url)
+}
+
+func httpGet(url string) (string, error) {
+	resp, err := http.Get(url)
+	if err == nil && resp.StatusCode != http.StatusOK {
+		err = errors.New(resp.Status)
+	}
+	if err != nil {
+		log.Errorf("Fail to get data from %s, err %s", url, err)
+		return "", err
+	}
+	body, _ := ioutil.ReadAll(resp.Body)
+	defer resp.Body.Close()
+	return string(body), nil
+}
+
+func getClusterInfo() (string, error) {
+	addrs := viper.GetStringSlice("meta_servers")
+	url := fmt.Sprintf("http://%s/meta/cluster", addrs[0])
+	return httpGet(url)
+}
+
+func getTableInfo(pMetaServer string) (string, error) {
+	url := fmt.Sprintf("http://%s/meta/apps", pMetaServer)
+	return httpGet(url)
+}
+
+func updateClusterTableInfo() {
+	// Get primary meta server address.
+	data, err := getClusterInfo()
+	if err != nil {
+		log.Error("Fail to get cluster info")
+		return
+	}
+	jsonData := gjson.Parse(data)
+	pMetaServer := jsonData.Get("primary_meta_server").String()
+	data, err = getTableInfo(pMetaServer)
+	if err != nil {
+		log.Error("Fail to get table info")
+		return
+	}
+	jsonData = gjson.Parse(data)
+	for _, value := range jsonData.Get("general_info").Map() {
+		tableID := value.Get("app_id").String()
+		tableName := value.Get("app_name").String()
+		if _, ok := TableNameByID[tableID]; !ok {
+			TableNameByID[tableID] = tableName
+		}
+	}
+}
diff --git a/collector/metrics/replica_server_metrics.go b/collector/metrics/replica_server_metrics.go
new file mode 100644
index 000000000..98189de26
--- /dev/null
+++ b/collector/metrics/replica_server_metrics.go
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+func NewReplicaServerMetricCollector() MetricCollector {
+	// Set detect interval and timeout 10s.
+	return NewMetricCollector(ReplicaServer, 10000000000, 10000000000)
+}
diff --git a/collector/metrics/falcon_sink.go b/collector/sink/falcon_sink.go
similarity index 99%
rename from collector/metrics/falcon_sink.go
rename to collector/sink/falcon_sink.go
index 4ccaedfcb..906374b92 100644
--- a/collector/metrics/falcon_sink.go
+++ b/collector/sink/falcon_sink.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package metrics
+package sink
 
 import (
 	"bytes"
diff --git a/collector/metrics/prometheus_sink.go b/collector/sink/prometheus_sink.go
similarity index 99%
rename from collector/metrics/prometheus_sink.go
rename to collector/sink/prometheus_sink.go
index e37fbc33d..34d5e1884 100644
--- a/collector/metrics/prometheus_sink.go
+++ b/collector/sink/prometheus_sink.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package metrics
+package sink
 
 import (
 	"sync"
diff --git a/collector/metrics/sink.go b/collector/sink/sink.go
similarity index 99%
rename from collector/metrics/sink.go
rename to collector/sink/sink.go
index fbe590e86..361c73303 100644
--- a/collector/metrics/sink.go
+++ b/collector/sink/sink.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package metrics
+package sink
 
 import (
 	"github.com/pegasus-kv/collector/aggregate"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org