You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by yu...@apache.org on 2022/06/01 02:26:01 UTC
[incubator-pegasus] branch master updated: feat(admin-cli): support nodes capacity balance using admin-cli (#969)
This is an automated email from the ASF dual-hosted git repository.
yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new dbf096f7 feat(admin-cli): support nodes capacity balance using admin-cli (#969)
dbf096f7 is described below
commit dbf096f755d693ae97344c87c5a9424fca152bb6
Author: Jiashuo <js...@live.com>
AuthorDate: Wed Jun 1 10:25:58 2022 +0800
feat(admin-cli): support nodes capacity balance using admin-cli (#969)
---
admin-cli/cmd/nodes_balancer.go | 42 ++++
admin-cli/executor/disk_info.go | 2 +-
.../diskbalancer/{helper.go => migrator.go} | 4 +-
.../executor/toolkits/nodesbalancer/balancer.go | 132 +++++++++++
.../executor/toolkits/nodesbalancer/migrator.go | 253 +++++++++++++++++++++
5 files changed, 430 insertions(+), 3 deletions(-)
diff --git a/admin-cli/cmd/nodes_balancer.go b/admin-cli/cmd/nodes_balancer.go
new file mode 100644
index 00000000..7188a66f
--- /dev/null
+++ b/admin-cli/cmd/nodes_balancer.go
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package cmd
+
+import (
+ "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits/nodesbalancer"
+ "github.com/apache/incubator-pegasus/admin-cli/shell"
+ "github.com/desertbit/grumble"
+)
+
+// init registers the "nodes-balancer" shell command. The command exposes a single
+// boolean flag, "auto", and hands its value to nodesbalancer.BalanceNodeCapacity.
+func init() {
+	// Declares the "auto" flag (default false).
+	declareFlags := func(a *grumble.Flags) {
+		a.BoolL("auto", false, "whether to migrate replica until all nodes is balanced, false "+
+			"by default, which means it just migrate one replica")
+	}
+
+	// Reads the flag back and starts the balancing run.
+	runBalancer := func(c *grumble.Context) error {
+		return nodesbalancer.BalanceNodeCapacity(pegasusClient, c.Flags.Bool("auto"))
+	}
+
+	command := &grumble.Command{
+		Name: "nodes-balancer",
+		Help: "migrate replica among the replica server to balance the capacity of cluster, please " +
+			"make sure the server config is right, detail see https://github.com/apache/incubator-pegasus/pull/969",
+		Flags: declareFlags,
+		Run:   runBalancer,
+	}
+	shell.AddCommand(command)
+}
diff --git a/admin-cli/executor/disk_info.go b/admin-cli/executor/disk_info.go
index 45fe3876..a7793bb4 100644
--- a/admin-cli/executor/disk_info.go
+++ b/admin-cli/executor/disk_info.go
@@ -225,7 +225,7 @@ func ConvertReplicaCapacityStruct(replicaCapacityInfos []interface{}) ([]Replica
}
}
if replicas == nil {
- return nil, fmt.Errorf("the disk has no replica")
+ return []ReplicaCapacityStruct{}, nil
}
return replicas, nil
}
diff --git a/admin-cli/executor/toolkits/diskbalancer/helper.go b/admin-cli/executor/toolkits/diskbalancer/migrator.go
similarity index 98%
rename from admin-cli/executor/toolkits/diskbalancer/helper.go
rename to admin-cli/executor/toolkits/diskbalancer/migrator.go
index 55ba7494..ead7f4fa 100644
--- a/admin-cli/executor/toolkits/diskbalancer/helper.go
+++ b/admin-cli/executor/toolkits/diskbalancer/migrator.go
@@ -68,7 +68,7 @@ func changeDiskCleanerInterval(client *executor.Client, replicaServer string, cl
}
func getNextMigrateAction(client *executor.Client, replicaServer string, minSize int64) (*MigrateAction, error) {
- disks, totalUsage, totalCapacity, err := queryDiskCapacityInfo(client, replicaServer)
+ disks, totalUsage, totalCapacity, err := QueryDiskCapacityInfo(client, replicaServer)
if err != nil {
return nil, err
}
@@ -84,7 +84,7 @@ func getNextMigrateAction(client *executor.Client, replicaServer string, minSize
return migrateAction, nil
}
-func queryDiskCapacityInfo(client *executor.Client, replicaServer string) ([]executor.DiskCapacityStruct, int64, int64, error) {
+func QueryDiskCapacityInfo(client *executor.Client, replicaServer string) ([]executor.DiskCapacityStruct, int64, int64, error) {
diskCapacityOnNode, err := executor.GetDiskInfo(client, executor.CapacitySize, replicaServer, "", "", false)
if err != nil {
return nil, 0, 0, err
diff --git a/admin-cli/executor/toolkits/nodesbalancer/balancer.go b/admin-cli/executor/toolkits/nodesbalancer/balancer.go
new file mode 100644
index 00000000..207874c4
--- /dev/null
+++ b/admin-cli/executor/toolkits/nodesbalancer/balancer.go
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package nodesbalancer
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/apache/incubator-pegasus/admin-cli/executor"
+ "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
+ "github.com/apache/incubator-pegasus/go-client/session"
+)
+
+// By default, the server refreshes its node capacity statistics only every 10 minutes.
+// Therefore, after a partition migration completes, the tool cannot immediately
+// obtain the latest capacity distribution. Please shorten the node capacity update
+// interval of the server to speed up balancing. The relevant configurations
+// are as follows:
+//
+//- disk_stat_interval_seconds = 600
+//+ disk_stat_interval_seconds = 60 # or less
+//
+//- gc_memory_replica_interval_ms = 600000
+//+ gc_memory_replica_interval_ms = 60000 # or less
+
+// BalanceNodeCapacity migrates replicas between replica servers to even out their
+// capacity usage.
+//
+// It first prepares the cluster with initClusterEnv, then repeats the following round:
+// refresh the per-node load, select the next migration action, ask the meta server to
+// execute it, and wait until it completes. When auto is false it stops after a single
+// successful migration; when auto is true it keeps going until a round returns an error
+// (for example when selectNextAction finds nothing left to move).
+//
+// Once initClusterEnv has succeeded, resetClusterEnv is always run before returning,
+// including on every error path, so the temporary garbage-collection intervals are not
+// left behind on the cluster. The error of the balancing round takes precedence; a
+// failure of the reset is returned only if the round itself succeeded, otherwise it is
+// logged.
+func BalanceNodeCapacity(client *executor.Client, auto bool) (err error) {
+	if err = initClusterEnv(client); err != nil {
+		return err
+	}
+	// In auto mode the loop below can only be left through an error return, so the
+	// reset has to be deferred to be reached at all.
+	defer func() {
+		resetErr := resetClusterEnv(client)
+		if resetErr == nil {
+			return
+		}
+		if err == nil {
+			err = resetErr
+			return
+		}
+		toolkits.LogWarn(fmt.Sprintf("reset cluster env failed: %s", resetErr.Error()))
+	}()
+
+	balancer := &Migrator{}
+	for {
+		// A failed load refresh is treated as transient: log it and retry after 10s.
+		if loadErr := balancer.updateNodesLoad(client); loadErr != nil {
+			toolkits.LogInfo(fmt.Sprintf("retry update load, err = %s", loadErr.Error()))
+			time.Sleep(time.Second * 10)
+			continue
+		}
+
+		action, selectErr := balancer.selectNextAction(client)
+		if selectErr != nil {
+			return selectErr
+		}
+
+		balanceErr := client.Meta.Balance(action.replica.Gpid, action.replica.Status, action.from.Node, action.to.Node)
+		if balanceErr != nil {
+			return fmt.Errorf("migrate action[%s] now is invalid: %s", action.toString(), balanceErr.Error())
+		}
+		if waitErr := waitCompleted(client, action); waitErr != nil {
+			return fmt.Errorf("wait replica migrate err: %s", waitErr.Error())
+		}
+		if !auto {
+			return nil
+		}
+		time.Sleep(time.Second * 10)
+	}
+}
+
+// initClusterEnv prepares the cluster for capacity-based balancing. After a warning and
+// a 3 second pause it runs, in order: set the meta level to "steady", set the two meta
+// remote commands meta.lb.only_move_primary and meta.lb.only_primary_balancer to "true",
+// and set both replica-side garbage replica intervals to "10". The first failing step
+// aborts the sequence and its error is returned.
+func initClusterEnv(client *executor.Client) error {
+	toolkits.LogWarn("This cluster will be balanced based capacity, please don't open count-balance in later")
+	time.Sleep(time.Second * 3)
+
+	steps := []func() error{
+		// set meta level as steady
+		func() error {
+			return executor.SetMetaLevel(client, "steady")
+		},
+		// disable migrate replica base `lively`
+		func() error {
+			toolkits.LogInfo("set meta.lb.only_move_primary true")
+			return executor.RemoteCommand(client, session.NodeTypeMeta, "", "meta.lb.only_move_primary", []string{"true"})
+		},
+		func() error {
+			toolkits.LogInfo("set meta.lb.only_primary_balancer true")
+			return executor.RemoteCommand(client, session.NodeTypeMeta, "", "meta.lb.only_primary_balancer", []string{"true"})
+		},
+		// reset garbage replica clear interval
+		func() error {
+			toolkits.LogInfo("set gc_disk_error_replica_interval_seconds 10")
+			return executor.ConfigCommand(client, session.NodeTypeReplica, "", "gc_disk_error_replica_interval_seconds", "set", "10")
+		},
+		func() error {
+			toolkits.LogInfo("set gc_disk_garbage_replica_interval_seconds 10")
+			return executor.ConfigCommand(client, session.NodeTypeReplica, "", "gc_disk_garbage_replica_interval_seconds", "set", "10")
+		},
+	}
+	for _, step := range steps {
+		if err := step(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// resetClusterEnv sets both replica-side garbage replica intervals
+// (gc_disk_error_replica_interval_seconds and gc_disk_garbage_replica_interval_seconds)
+// to "3600". It stops at, and returns, the first error.
+func resetClusterEnv(client *executor.Client) error {
+	toolkits.LogWarn("This cluster garbage interval will be reset default")
+
+	// First the interval for replicas with disk errors ...
+	toolkits.LogInfo("set gc_disk_error_replica_interval_seconds 3600")
+	if errorIntervalErr := executor.ConfigCommand(client, session.NodeTypeReplica, "",
+		"gc_disk_error_replica_interval_seconds", "set", "3600"); errorIntervalErr != nil {
+		return errorIntervalErr
+	}
+
+	// ... then the interval for garbage replicas.
+	toolkits.LogInfo("set gc_disk_garbage_replica_interval_seconds 3600")
+	if garbageIntervalErr := executor.ConfigCommand(client, session.NodeTypeReplica, "",
+		"gc_disk_garbage_replica_interval_seconds", "set", "3600"); garbageIntervalErr != nil {
+		return garbageIntervalErr
+	}
+	return nil
+}
diff --git a/admin-cli/executor/toolkits/nodesbalancer/migrator.go b/admin-cli/executor/toolkits/nodesbalancer/migrator.go
new file mode 100644
index 00000000..1024de69
--- /dev/null
+++ b/admin-cli/executor/toolkits/nodesbalancer/migrator.go
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package nodesbalancer
+
+import (
+ "fmt"
+ "math"
+ "time"
+
+ migrator "github.com/apache/incubator-pegasus/admin-cli/client"
+ "github.com/apache/incubator-pegasus/admin-cli/executor"
+ "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
+ "github.com/apache/incubator-pegasus/admin-cli/executor/toolkits/diskbalancer"
+ "github.com/apache/incubator-pegasus/admin-cli/util"
+ "github.com/apache/incubator-pegasus/go-client/idl/base"
+)
+
+// NodesCapacity is the capacity snapshot of one replica node, filled by
+// Migrator.updateNodesLoad from diskbalancer.QueryDiskCapacityInfo.
+// NOTE(review): log output divides these sizes by 1024 and labels the result "GB",
+// so the values are presumably in MB - confirm against QueryDiskCapacityInfo.
+type NodesCapacity struct {
+ // Node is the replica server this snapshot belongs to.
+ Node *util.PegasusNode `json:"node"`
+ // Disks holds the per-disk capacity entries reported for the node.
+ Disks []executor.DiskCapacityStruct
+ // Total is the node's total capacity.
+ Total int64 `json:"total"`
+ // Usage is the node's used capacity.
+ Usage int64 `json:"usage"`
+ // Available is Total minus Usage.
+ Available int64 `json:"available"`
+}
+
+// NodesReplica pairs a node with a list of replica capacity entries.
+// NOTE(review): nothing in the visible part of this package references this type.
+type NodesReplica struct {
+ Node *util.PegasusNode
+ Replicas []*executor.ReplicaCapacityStruct
+}
+
+// Migrator holds the cluster-wide capacity view that the balancer uses to pick the
+// next migration. It is populated by updateNodesLoad.
+type Migrator struct {
+ // CapacityLoad is the per-node load, sorted by the "Usage" field; selectNextAction
+ // treats the first entry as the least loaded node and the last as the most loaded.
+ CapacityLoad []NodesCapacity
+ // Total is the sum of Total over all nodes.
+ Total int64
+ // Usage is the sum of Usage over all nodes.
+ Usage int64
+ // Average is Usage divided by the number of nodes (integer division).
+ Average int64
+}
+
+// reset clears every field of the Migrator so that a new load snapshot can be built
+// from scratch. CapacityLoad is cleared as well: leaving it in place lets entries from
+// earlier rounds pile up, so that its first element stays a stale node from the first
+// round.
+func (m *Migrator) reset() {
+	m.CapacityLoad = nil
+	m.Total = 0
+	m.Average = 0
+	m.Usage = 0
+}
+
+// updateNodesLoad rebuilds the Migrator's capacity view from the live cluster.
+//
+// It lists the nodes known to the meta server, queries the disk capacity of each one
+// through diskbalancer.QueryDiskCapacityInfo, sorts the results by their "Usage" field
+// and then replaces CapacityLoad, Total, Usage and Average with the fresh values.
+//
+// An error is returned when listing the nodes or querying any node fails, or when the
+// cluster reports no node at all; in all of these cases the Migrator is left untouched.
+func (m *Migrator) updateNodesLoad(client *executor.Client) error {
+	nodes, err := client.Meta.ListNodes()
+	if err != nil {
+		return err
+	}
+
+	// The loads are collected as []interface{} because that is the form handed to
+	// util.SortStructsByField below.
+	var nodesLoad []interface{}
+	for _, node := range nodes {
+		pegasusNode := client.Nodes.MustGetReplica(node.Address.GetAddress())
+		disksLoad, totalUsage, totalCapacity, err := diskbalancer.QueryDiskCapacityInfo(client, pegasusNode.TCPAddr())
+		if err != nil {
+			return err
+		}
+		nodesLoad = append(nodesLoad, NodesCapacity{
+			Node:      pegasusNode,
+			Disks:     disksLoad,
+			Total:     totalCapacity,
+			Usage:     totalUsage,
+			Available: totalCapacity - totalUsage,
+		})
+	}
+	// Report an empty node list explicitly instead of returning a nil error, which
+	// would let the caller index into an empty CapacityLoad.
+	if len(nodesLoad) == 0 {
+		return fmt.Errorf("no replica node found, can't update the nodes load")
+	}
+
+	m.reset()
+	util.SortStructsByField(nodesLoad, "Usage")
+	// Build a fresh slice so that entries from earlier rounds never leak into this one.
+	capacityLoad := make([]NodesCapacity, 0, len(nodesLoad))
+	for _, item := range nodesLoad {
+		nodeLoad := item.(NodesCapacity)
+		capacityLoad = append(capacityLoad, nodeLoad)
+		m.Total += nodeLoad.Total
+		m.Usage += nodeLoad.Usage
+	}
+	m.CapacityLoad = capacityLoad
+	m.Average = m.Usage / int64(len(capacityLoad))
+	return nil
+}
+
+// partition identifies the replica that a migration action moves.
+type partition struct {
+ // Gpid is the partition id, parsed from the replica's textual gpid.
+ Gpid *base.Gpid
+ // Status selects the balance type: BalanceCopyPri for a primary replica,
+ // BalanceCopySec otherwise.
+ Status migrator.BalanceType
+ // Size is the replica size.
+ // NOTE(review): selectNextAction does not populate this field.
+ Size int64
+}
+
+// ActionProposal describes one migration: which replica to move, from which node and
+// to which node.
+type ActionProposal struct {
+ replica *partition
+ from *NodesCapacity
+ to *NodesCapacity
+}
+
+// toString renders the action as "[status]gpid:from=>to" for log and error messages.
+func (act *ActionProposal) toString() string {
+	status := act.replica.Status.String()
+	gpid := act.replica.Gpid.String()
+	source := act.from.Node.String()
+	target := act.to.Node.String()
+	return fmt.Sprintf("[%s]%s:%s=>%s", status, gpid, source, target)
+}
+
+// selectNextAction proposes the next replica migration, from the last entry of
+// m.CapacityLoad (treated as the most loaded node) to the first entry (treated as the
+// least loaded node).
+//
+// Candidates are the replicas on the last disk of the high node. A candidate is skipped
+// when it is larger than the allowed size, i.e. the smaller of (high usage - average)
+// and (average - low usage), or when the low node already holds a replica of the same
+// partition.
+//
+// It returns an error when no load information is available, when the usage ratios of
+// the two nodes differ by 5 percentage points or less, when the replica lists cannot be
+// fetched, or when no candidate qualifies.
+func (m *Migrator) selectNextAction(client *executor.Client) (*ActionProposal, error) {
+	// Guard the index and division operations below against empty or zero inputs.
+	if len(m.CapacityLoad) == 0 {
+		return nil, fmt.Errorf("nodes capacity load is empty, can't select migrate action")
+	}
+	highNode := m.CapacityLoad[len(m.CapacityLoad)-1]
+	lowNode := m.CapacityLoad[0]
+	if len(highNode.Disks) == 0 {
+		return nil, fmt.Errorf("high node[%s] has no disk capacity info", highNode.Node.String())
+	}
+	if highNode.Total <= 0 || lowNode.Total <= 0 {
+		return nil, fmt.Errorf("invalid total capacity: high node[%s] = %d, low node[%s] = %d",
+			highNode.Node.String(), highNode.Total, lowNode.Node.String(), lowNode.Total)
+	}
+
+	// NOTE(review): taking the last disk assumes Disks is ordered by usage, ascending -
+	// confirm against diskbalancer.QueryDiskCapacityInfo.
+	highDiskOfHighNode := highNode.Disks[len(highNode.Disks)-1]
+	toolkits.LogInfo(fmt.Sprintf("expect_average = %dGB, high node = %s[%s][usage=%dGB], low node = %s[usage=%dGB]\n",
+		m.Average/1024, highNode.Node.String(), highDiskOfHighNode.Disk, highNode.Usage/1024, lowNode.Node.String(), lowNode.Usage/1024))
+
+	lowUsageRatio := lowNode.Usage * 100 / lowNode.Total
+	highUsageRatio := highNode.Usage * 100 / highNode.Total
+
+	if highUsageRatio-lowUsageRatio <= 5 {
+		return nil, fmt.Errorf("high node and low node has little diff: %d vs %d", highUsageRatio, lowUsageRatio)
+	}
+
+	sizeAllowMoved := math.Min(float64(highNode.Usage-m.Average), float64(m.Average-lowNode.Usage))
+	highDiskReplicasOfHighNode, err := getDiskReplicas(client, &highNode, highDiskOfHighNode.Disk)
+	if err != nil {
+		return nil, fmt.Errorf("get high node[%s] high disk[%s] replicas err: %s", highNode.Node.String(), highDiskOfHighNode.Disk, err.Error())
+	}
+
+	totalReplicasOfLowNode, err := getNodeReplicas(client, &lowNode)
+	if err != nil {
+		return nil, fmt.Errorf("get low node[%s] replicas err: %s", lowNode.Node.String(), err.Error())
+	}
+
+	// The loop deliberately keeps going after a match, so the last qualifying replica
+	// in the list is the one selected (unchanged from the original behaviour).
+	var selectReplica executor.ReplicaCapacityStruct
+	for _, replica := range highDiskReplicasOfHighNode {
+		if replica.Size > int64(sizeAllowMoved) {
+			toolkits.LogDebug(fmt.Sprintf("select next replica for the replica is too large(replica_size > allow_size): %d > %f", replica.Size, sizeAllowMoved))
+			continue
+		}
+
+		if totalReplicasOfLowNode.contain(replica.Gpid) {
+			toolkits.LogDebug(fmt.Sprintf("select next replica for the replica(%s) is has existed target node(%s)", replica.Gpid, lowNode.Node.String()))
+			continue
+		}
+
+		selectReplica = replica
+	}
+
+	if selectReplica.Gpid == "" {
+		return nil, fmt.Errorf("can't find valid replica to balance")
+	}
+
+	gpid, err := util.Str2Gpid(selectReplica.Gpid)
+	if err != nil {
+		return nil, err
+	}
+
+	status := migrator.BalanceCopySec
+	if selectReplica.Status == "primary" {
+		status = migrator.BalanceCopyPri
+	}
+	return &ActionProposal{
+		replica: &partition{
+			Gpid:   gpid,
+			Status: status,
+			Size:   selectReplica.Size,
+		},
+		from: &highNode,
+		to:   &lowNode,
+	}, nil
+}
+
+// replicas is a list of replica capacity entries that can be searched by gpid string
+// through its contain method.
+type replicas []executor.ReplicaCapacityStruct
+
+// contain reports whether any entry in r has a Gpid equal to selectReplica.
+func (r replicas) contain(selectReplica string) bool {
+	for i := range r {
+		if r[i].Gpid != selectReplica {
+			continue
+		}
+		return true
+	}
+	return false
+}
+
+// getDiskReplicas fetches the capacity info of the disk tagged diskTag on the given
+// replica server and converts it into a replicas list. Any query or conversion error
+// is returned with a nil list.
+func getDiskReplicas(client *executor.Client, replicaServer *NodesCapacity, diskTag string) (replicas, error) {
+	addr := replicaServer.Node.TCPAddr()
+	rawInfo, queryErr := executor.GetDiskInfo(client, executor.CapacitySize, addr, "", diskTag, false)
+	if queryErr != nil {
+		return nil, queryErr
+	}
+
+	converted, convertErr := executor.ConvertReplicaCapacityStruct(rawInfo)
+	if convertErr != nil {
+		return nil, convertErr
+	}
+	return converted, nil
+}
+
+// getNodeReplicas gathers the capacity info of every disk listed in
+// replicaServer.Disks, one query per disk, and converts the combined result into a
+// single replicas list. The first failing query, or a failing conversion, aborts the
+// call and its error is returned with a nil list.
+func getNodeReplicas(client *executor.Client, replicaServer *NodesCapacity) (replicas, error) {
+	addr := replicaServer.Node.TCPAddr()
+
+	var merged []interface{}
+	for i := range replicaServer.Disks {
+		perDisk, queryErr := executor.GetDiskInfo(client, executor.CapacitySize, addr, "", replicaServer.Disks[i].Disk, false)
+		if queryErr != nil {
+			return nil, queryErr
+		}
+		merged = append(merged, perDisk...)
+	}
+
+	converted, convertErr := executor.ConvertReplicaCapacityStruct(merged)
+	if convertErr != nil {
+		return nil, convertErr
+	}
+	return converted, nil
+}
+
+// waitCompleted blocks until the replica of the given action shows up in the replica
+// list of the target node, polling every 10 seconds. Query errors are logged and
+// polling continues. Afterwards it switches the meta level to "lively", sleeps 100
+// seconds, and switches it back to "steady"; an error from either switch is returned.
+func waitCompleted(client *executor.Client, action *ActionProposal) error {
+polling:
+	for {
+		targetReplicas, queryErr := getNodeReplicas(client, action.to)
+		switch {
+		case queryErr != nil:
+			toolkits.LogInfo(queryErr.Error())
+		case !targetReplicas.contain(fmt.Sprintf("%d.%d", action.replica.Gpid.Appid,
+			action.replica.Gpid.PartitionIndex)):
+			toolkits.LogInfo(fmt.Sprintf("%s is running", action.toString()))
+		default:
+			// The target node now holds the replica.
+			break polling
+		}
+		time.Sleep(time.Second * 10)
+	}
+	toolkits.LogInfo(fmt.Sprintf("%s is completed and wait 100s to wait gc garbage", action.toString()))
+
+	// Switch the meta level to lively so that garbage gets cleaned.
+	if livelyErr := executor.SetMetaLevel(client, "lively"); livelyErr != nil {
+		return livelyErr
+	}
+	time.Sleep(time.Second * 100)
+	// Back to steady before the next action.
+	if steadyErr := executor.SetMetaLevel(client, "steady"); steadyErr != nil {
+		return steadyErr
+	}
+	fmt.Println()
+	return nil
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org