You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by yu...@apache.org on 2022/06/01 02:26:01 UTC

[incubator-pegasus] branch master updated: feat(admin-cli): support nodes capacity balance using admin-cli (#969)

This is an automated email from the ASF dual-hosted git repository.

yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new dbf096f7 feat(admin-cli): support nodes capacity balance using admin-cli (#969)
dbf096f7 is described below

commit dbf096f755d693ae97344c87c5a9424fca152bb6
Author: Jiashuo <js...@live.com>
AuthorDate: Wed Jun 1 10:25:58 2022 +0800

    feat(admin-cli): support nodes capacity balance using admin-cli (#969)
---
 admin-cli/cmd/nodes_balancer.go                    |  42 ++++
 admin-cli/executor/disk_info.go                    |   2 +-
 .../diskbalancer/{helper.go => migrator.go}        |   4 +-
 .../executor/toolkits/nodesbalancer/balancer.go    | 132 +++++++++++
 .../executor/toolkits/nodesbalancer/migrator.go    | 253 +++++++++++++++++++++
 5 files changed, 430 insertions(+), 3 deletions(-)

diff --git a/admin-cli/cmd/nodes_balancer.go b/admin-cli/cmd/nodes_balancer.go
new file mode 100644
index 00000000..7188a66f
--- /dev/null
+++ b/admin-cli/cmd/nodes_balancer.go
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package cmd
+
+import (
+	"github.com/apache/incubator-pegasus/admin-cli/executor/toolkits/nodesbalancer"
+	"github.com/apache/incubator-pegasus/admin-cli/shell"
+	"github.com/desertbit/grumble"
+)
+
+// init registers the "nodes-balancer" command with the admin-cli shell. The
+// command runs nodesbalancer.BalanceNodeCapacity against pegasusClient; the
+// --auto flag controls whether balancing repeats or stops after one migration.
+func init() {
+	shell.AddCommand(&grumble.Command{
+		Name: "nodes-balancer",
+		Help: "migrate replicas among the replica servers to balance the capacity of the cluster, please " +
+			"make sure the server config is right, see https://github.com/apache/incubator-pegasus/pull/969 for details",
+		Flags: func(a *grumble.Flags) {
+			a.BoolL("auto", false, "whether to keep migrating replicas until all nodes are balanced, false "+
+				"by default, which means only one replica is migrated")
+		},
+		Run: func(c *grumble.Context) error {
+			auto := c.Flags.Bool("auto")
+			return nodesbalancer.BalanceNodeCapacity(pegasusClient, auto)
+		},
+	})
+}
diff --git a/admin-cli/executor/disk_info.go b/admin-cli/executor/disk_info.go
index 45fe3876..a7793bb4 100644
--- a/admin-cli/executor/disk_info.go
+++ b/admin-cli/executor/disk_info.go
@@ -225,7 +225,7 @@ func ConvertReplicaCapacityStruct(replicaCapacityInfos []interface{}) ([]Replica
 		}
 	}
 	if replicas == nil {
-		return nil, fmt.Errorf("the disk has no replica")
+		return []ReplicaCapacityStruct{}, nil
 	}
 	return replicas, nil
 }
diff --git a/admin-cli/executor/toolkits/diskbalancer/helper.go b/admin-cli/executor/toolkits/diskbalancer/migrator.go
similarity index 98%
rename from admin-cli/executor/toolkits/diskbalancer/helper.go
rename to admin-cli/executor/toolkits/diskbalancer/migrator.go
index 55ba7494..ead7f4fa 100644
--- a/admin-cli/executor/toolkits/diskbalancer/helper.go
+++ b/admin-cli/executor/toolkits/diskbalancer/migrator.go
@@ -68,7 +68,7 @@ func changeDiskCleanerInterval(client *executor.Client, replicaServer string, cl
 }
 
 func getNextMigrateAction(client *executor.Client, replicaServer string, minSize int64) (*MigrateAction, error) {
-	disks, totalUsage, totalCapacity, err := queryDiskCapacityInfo(client, replicaServer)
+	disks, totalUsage, totalCapacity, err := QueryDiskCapacityInfo(client, replicaServer)
 	if err != nil {
 		return nil, err
 	}
@@ -84,7 +84,7 @@ func getNextMigrateAction(client *executor.Client, replicaServer string, minSize
 	return migrateAction, nil
 }
 
-func queryDiskCapacityInfo(client *executor.Client, replicaServer string) ([]executor.DiskCapacityStruct, int64, int64, error) {
+func QueryDiskCapacityInfo(client *executor.Client, replicaServer string) ([]executor.DiskCapacityStruct, int64, int64, error) {
 	diskCapacityOnNode, err := executor.GetDiskInfo(client, executor.CapacitySize, replicaServer, "", "", false)
 	if err != nil {
 		return nil, 0, 0, err
diff --git a/admin-cli/executor/toolkits/nodesbalancer/balancer.go b/admin-cli/executor/toolkits/nodesbalancer/balancer.go
new file mode 100644
index 00000000..207874c4
--- /dev/null
+++ b/admin-cli/executor/toolkits/nodesbalancer/balancer.go
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package nodesbalancer
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/apache/incubator-pegasus/admin-cli/executor"
+	"github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
+	"github.com/apache/incubator-pegasus/go-client/session"
+)
+
+// By default, the server updates its node capacity statistics only every 10 minutes.
+// Therefore, after a partition migration completes, the tool cannot immediately
+// obtain the latest capacity distribution. To speed up balancing, shorten the
+// server's capacity update intervals. The relevant configurations are as
+// follows:
+//
+//- disk_stat_interval_seconds = 600
+//+ disk_stat_interval_seconds = 60 # or less
+//
+//- gc_memory_replica_interval_ms = 600000
+//+ gc_memory_replica_interval_ms = 60000 # or less
+
+// BalanceNodeCapacity migrates replicas from the replica server with the highest
+// usage to the one with the lowest usage to even out capacity across the cluster.
+// When auto is false, at most one replica is migrated. When auto is true, the
+// migration repeats until selectNextAction reports that no further action applies
+// (for example, when the two nodes' usage ratios differ by 5 percentage points or
+// less), and that condition is returned as the error.
+//
+// Once initClusterEnv succeeds, resetClusterEnv is always run before returning,
+// including on error paths. This keeps the replica garbage-collection intervals
+// from being left at their shortened values.
+func BalanceNodeCapacity(client *executor.Client, auto bool) (err error) {
+	if err = initClusterEnv(client); err != nil {
+		return err
+	}
+	defer func() {
+		// Restore the GC intervals however the loop ended. A reset failure is
+		// reported only if nothing failed earlier; otherwise it is logged so it
+		// is not silently lost.
+		if resetErr := resetClusterEnv(client); resetErr != nil {
+			if err == nil {
+				err = resetErr
+			} else {
+				toolkits.LogWarn(fmt.Sprintf("reset cluster env err: %s", resetErr.Error()))
+			}
+		}
+	}()
+
+	balancer := &Migrator{}
+	for {
+		if loadErr := balancer.updateNodesLoad(client); loadErr != nil {
+			toolkits.LogInfo(fmt.Sprintf("retry update load, err = %s", loadErr.Error()))
+			time.Sleep(time.Second * 10)
+			continue
+		}
+
+		action, selectErr := balancer.selectNextAction(client)
+		if selectErr != nil {
+			return selectErr
+		}
+
+		if balanceErr := client.Meta.Balance(action.replica.Gpid, action.replica.Status, action.from.Node, action.to.Node); balanceErr != nil {
+			return fmt.Errorf("migrate action[%s] now is invalid: %w", action.toString(), balanceErr)
+		}
+		if waitErr := waitCompleted(client, action); waitErr != nil {
+			return fmt.Errorf("wait replica migrate err: %w", waitErr)
+		}
+		if !auto {
+			return nil
+		}
+		time.Sleep(time.Second * 10)
+	}
+}
+
+// initClusterEnv prepares the cluster for capacity-based balancing. After a
+// warning and a 3-second pause, it performs these steps in order:
+//   - sets the meta level to "steady";
+//   - sets meta.lb.only_move_primary and meta.lb.only_primary_balancer to true
+//     on the meta server;
+//   - sets gc_disk_error_replica_interval_seconds and
+//     gc_disk_garbage_replica_interval_seconds to 10 on the replica servers.
+//
+// It stops at the first failing step and returns that error.
+func initClusterEnv(client *executor.Client) error {
+	toolkits.LogWarn("This cluster will be balanced based capacity, please don't open count-balance in later")
+	time.Sleep(3 * time.Second)
+
+	// set meta level as steady
+	if err := executor.SetMetaLevel(client, "steady"); err != nil {
+		return err
+	}
+
+	// disable migrate replica base `lively`
+	metaSwitches := []string{"meta.lb.only_move_primary", "meta.lb.only_primary_balancer"}
+	for _, key := range metaSwitches {
+		toolkits.LogInfo(fmt.Sprintf("set %s true", key))
+		if err := executor.RemoteCommand(client, session.NodeTypeMeta, "", key, []string{"true"}); err != nil {
+			return err
+		}
+	}
+
+	// shorten the garbage replica clear interval
+	gcIntervals := []string{"gc_disk_error_replica_interval_seconds", "gc_disk_garbage_replica_interval_seconds"}
+	for _, key := range gcIntervals {
+		toolkits.LogInfo(fmt.Sprintf("set %s 10", key))
+		if err := executor.ConfigCommand(client, session.NodeTypeReplica, "", key, "set", "10"); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// resetClusterEnv sets the replica servers' gc_disk_error_replica_interval_seconds
+// and gc_disk_garbage_replica_interval_seconds back to 3600, in that order. It
+// returns the first error encountered. The meta-level and meta.lb.* settings
+// applied by initClusterEnv are not touched here.
+func resetClusterEnv(client *executor.Client) error {
+	toolkits.LogWarn("This cluster garbage interval will be reset default")
+
+	// reset garbage replica clear interval
+	gcIntervals := [...]string{"gc_disk_error_replica_interval_seconds", "gc_disk_garbage_replica_interval_seconds"}
+	for i := 0; i < len(gcIntervals); i++ {
+		key := gcIntervals[i]
+		toolkits.LogInfo(fmt.Sprintf("set %s 3600", key))
+		if err := executor.ConfigCommand(client, session.NodeTypeReplica, "", key, "set", "3600"); err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/admin-cli/executor/toolkits/nodesbalancer/migrator.go b/admin-cli/executor/toolkits/nodesbalancer/migrator.go
new file mode 100644
index 00000000..1024de69
--- /dev/null
+++ b/admin-cli/executor/toolkits/nodesbalancer/migrator.go
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package nodesbalancer
+
+import (
+	"fmt"
+	"math"
+	"time"
+
+	migrator "github.com/apache/incubator-pegasus/admin-cli/client"
+	"github.com/apache/incubator-pegasus/admin-cli/executor"
+	"github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
+	"github.com/apache/incubator-pegasus/admin-cli/executor/toolkits/diskbalancer"
+	"github.com/apache/incubator-pegasus/admin-cli/util"
+	"github.com/apache/incubator-pegasus/go-client/idl/base"
+)
+
+// NodesCapacity is the capacity snapshot of a single replica server, built by
+// Migrator.updateNodesLoad from diskbalancer.QueryDiskCapacityInfo.
+// Disks is the per-disk breakdown. Total and Usage are the node's total
+// capacity and usage as returned by that query, and Available is Total-Usage.
+// NOTE(review): units are not visible here; selectNextAction divides Usage by
+// 1024 and labels the result "GB", which suggests MB — confirm.
+type NodesCapacity struct {
+	Node      *util.PegasusNode `json:"node"`
+	Disks     []executor.DiskCapacityStruct
+	Total     int64 `json:"total"`
+	Usage     int64 `json:"usage"`
+	Available int64 `json:"available"`
+}
+
+// NodesReplica pairs a replica server with the replicas it hosts.
+// NOTE(review): not referenced elsewhere in this file — confirm whether it is still needed.
+type NodesReplica struct {
+	Node     *util.PegasusNode
+	Replicas []*executor.ReplicaCapacityStruct
+}
+
+// Migrator holds the most recent cluster-wide capacity snapshot used to choose
+// the next migration. CapacityLoad is sorted by Usage in updateNodesLoad.
+// Total and Usage are sums over all nodes, and Average is Usage divided by the
+// number of nodes.
+type Migrator struct {
+	CapacityLoad []NodesCapacity
+	Total        int64
+	Usage        int64
+	Average      int64
+}
+
+// reset clears all state derived from the previous capacity snapshot, including
+// the per-node load list. Without clearing CapacityLoad, updateNodesLoad would
+// append a fresh snapshot after the stale one on every call (as happens in auto
+// mode). The first and last entries would then no longer be the current
+// lowest- and highest-usage nodes.
+func (m *Migrator) reset() {
+	m.CapacityLoad = nil
+	m.Total = 0
+	m.Average = 0
+	m.Usage = 0
+}
+
+// updateNodesLoad refreshes the capacity snapshot of every replica server listed
+// by the meta server. On success, m.CapacityLoad holds exactly one entry per node,
+// ordered by util.SortStructsByField on "Usage". Total, Usage and Average are
+// then recomputed from that snapshot. Any previous snapshot is discarded, so
+// repeated calls never mix stale entries with fresh ones.
+//
+// It returns an error if listing nodes or querying any node's disk capacity
+// fails. It also returns an error if no node is found, instead of leaving an
+// empty snapshot that selectNextAction would index out of range. On error, m is
+// left unchanged.
+func (m *Migrator) updateNodesLoad(client *executor.Client) error {
+	nodes, err := client.Meta.ListNodes()
+	if err != nil {
+		return err
+	}
+
+	var nodesLoad []interface{}
+	for _, node := range nodes {
+		pegasusNode := client.Nodes.MustGetReplica(node.Address.GetAddress())
+		disksLoad, totalUsage, totalCapacity, err := diskbalancer.QueryDiskCapacityInfo(client, pegasusNode.TCPAddr())
+		if err != nil {
+			return err
+		}
+		nodesLoad = append(nodesLoad, NodesCapacity{
+			Node:      pegasusNode,
+			Disks:     disksLoad,
+			Total:     totalCapacity,
+			Usage:     totalUsage,
+			Available: totalCapacity - totalUsage,
+		})
+	}
+	if len(nodesLoad) == 0 {
+		return fmt.Errorf("no replica server found in the cluster")
+	}
+
+	// NOTE(review): selectNextAction treats the first entry as the lowest-usage node
+	// and the last as the highest, i.e. it assumes an ascending sort — confirm.
+	util.SortStructsByField(nodesLoad, "Usage")
+
+	m.reset()
+	// Build a fresh slice rather than appending to whatever was stored before.
+	m.CapacityLoad = make([]NodesCapacity, 0, len(nodesLoad))
+	for _, item := range nodesLoad {
+		capacity := item.(NodesCapacity)
+		m.CapacityLoad = append(m.CapacityLoad, capacity)
+		m.Total += capacity.Total
+		m.Usage += capacity.Usage
+	}
+	m.Average = m.Usage / int64(len(m.CapacityLoad))
+	return nil
+}
+
+// partition identifies the replica chosen for migration. Gpid is the partition
+// id, and Status selects which copy is balanced (migrator.BalanceCopyPri when
+// the selected replica is primary, otherwise migrator.BalanceCopySec).
+// NOTE(review): Size is intended to carry the replica's size — verify whether
+// every constructor populates it.
+type partition struct {
+	Gpid   *base.Gpid
+	Status migrator.BalanceType
+	Size   int64
+}
+
+// ActionProposal describes one proposed migration of replica from the node
+// `from` to the node `to`. It is passed to client.Meta.Balance and then to
+// waitCompleted by BalanceNodeCapacity.
+type ActionProposal struct {
+	replica *partition
+	from    *NodesCapacity
+	to      *NodesCapacity
+}
+
+// toString renders the proposal as "[status]gpid:from=>to" for logs and error messages.
+func (act *ActionProposal) toString() string {
+	status := act.replica.Status.String()
+	gpid := act.replica.Gpid.String()
+	source, target := act.from.Node.String(), act.to.Node.String()
+	return fmt.Sprintf("[%s]%s:%s=>%s", status, gpid, source, target)
+}
+
+// selectNextAction proposes migrating one replica from the highest-usage node
+// (the last entry of m.CapacityLoad) to the lowest-usage node (the first entry).
+// Candidates come from the high node's last disk.
+// NOTE(review): this assumes QueryDiskCapacityInfo returns disks ordered by
+// ascending usage — confirm.
+//
+// A replica is eligible when its size does not exceed
+// min(high.Usage-Average, Average-low.Usage) and the low node does not already
+// host a replica with the same gpid. Among eligible replicas, the last one in
+// iteration order is chosen.
+//
+// It returns an error when any of the following holds:
+//   - there is no snapshot;
+//   - the high node has no disks;
+//   - either node reports a non-positive total capacity;
+//   - the two nodes' usage ratios differ by 5 percentage points or less;
+//   - no eligible replica exists;
+//   - a replica query or gpid parse fails.
+func (m *Migrator) selectNextAction(client *executor.Client) (*ActionProposal, error) {
+	if len(m.CapacityLoad) == 0 {
+		return nil, fmt.Errorf("no nodes capacity load available, update nodes load first")
+	}
+	highNode := m.CapacityLoad[len(m.CapacityLoad)-1]
+	lowNode := m.CapacityLoad[0]
+
+	if len(highNode.Disks) == 0 {
+		return nil, fmt.Errorf("high node[%s] has no disk capacity info", highNode.Node.String())
+	}
+	highDiskOfHighNode := highNode.Disks[len(highNode.Disks)-1]
+	toolkits.LogInfo(fmt.Sprintf("expect_average = %dGB, high node = %s[%s][usage=%dGB], low node = %s[usage=%dGB]\n",
+		m.Average/1024, highNode.Node.String(), highDiskOfHighNode.Disk, highNode.Usage/1024, lowNode.Node.String(), lowNode.Usage/1024))
+
+	// A node reporting no capacity would make the ratio computation below divide by zero.
+	if highNode.Total <= 0 || lowNode.Total <= 0 {
+		return nil, fmt.Errorf("invalid total capacity: high node[%s] = %d, low node[%s] = %d",
+			highNode.Node.String(), highNode.Total, lowNode.Node.String(), lowNode.Total)
+	}
+	lowUsageRatio := lowNode.Usage * 100 / lowNode.Total
+	highUsageRatio := highNode.Usage * 100 / highNode.Total
+
+	if highUsageRatio-lowUsageRatio <= 5 {
+		return nil, fmt.Errorf("high node and low node has little diff: %d vs %d", highUsageRatio, lowUsageRatio)
+	}
+
+	// Moving more than this would push the high node below, or the low node
+	// above, the cluster average.
+	sizeAllowMoved := math.Min(float64(highNode.Usage-m.Average), float64(m.Average-lowNode.Usage))
+	highDiskReplicasOfHighNode, err := getDiskReplicas(client, &highNode, highDiskOfHighNode.Disk)
+	if err != nil {
+		return nil, fmt.Errorf("get high node[%s] high disk[%s] replicas err: %w", highNode.Node.String(), highDiskOfHighNode.Disk, err)
+	}
+
+	totalReplicasOfLowNode, err := getNodeReplicas(client, &lowNode)
+	if err != nil {
+		return nil, fmt.Errorf("get low node[%s] replicas err: %w", lowNode.Node.String(), err)
+	}
+
+	// Keep the last eligible replica: it must fit within sizeAllowMoved and must
+	// not already exist on the target node.
+	var selectReplica executor.ReplicaCapacityStruct
+	for _, replica := range highDiskReplicasOfHighNode {
+		if replica.Size > int64(sizeAllowMoved) {
+			toolkits.LogDebug(fmt.Sprintf("select next replica for the replica is too large(replica_size > allow_size): %d > %f", replica.Size, sizeAllowMoved))
+			continue
+		}
+
+		if totalReplicasOfLowNode.contain(replica.Gpid) {
+			toolkits.LogDebug(fmt.Sprintf("select next replica for the replica(%s) is has existed target node(%s)", replica.Gpid, lowNode.Node.String()))
+			continue
+		}
+
+		selectReplica = replica
+	}
+
+	if selectReplica.Gpid == "" {
+		return nil, fmt.Errorf("can't find valid replica to balance")
+	}
+
+	gpid, err := util.Str2Gpid(selectReplica.Gpid)
+	if err != nil {
+		return nil, err
+	}
+
+	status := migrator.BalanceCopySec
+	if selectReplica.Status == "primary" {
+		status = migrator.BalanceCopyPri
+	}
+	return &ActionProposal{
+		replica: &partition{
+			Gpid:   gpid,
+			Status: status,
+			Size:   selectReplica.Size,
+		},
+		from: &highNode,
+		to:   &lowNode,
+	}, nil
+}
+
+// replicas is a list of replica capacity records, as produced by
+// executor.ConvertReplicaCapacityStruct.
+type replicas []executor.ReplicaCapacityStruct
+
+// contain reports whether any record in r has a Gpid equal to selectReplica.
+// Callers in this file pass gpids as strings (waitCompleted formats them as
+// "appID.partitionIndex").
+func (r replicas) contain(selectReplica string) bool {
+	for i := range r {
+		if r[i].Gpid != selectReplica {
+			continue
+		}
+		return true
+	}
+	return false
+}
+
+// getDiskReplicas queries the capacity info of the disk tagged diskTag on the
+// given replica server and converts it into replica records. On any error, it
+// returns a nil list together with that error.
+func getDiskReplicas(client *executor.Client, replicaServer *NodesCapacity, diskTag string) (replicas, error) {
+	diskInfo, queryErr := executor.GetDiskInfo(client, executor.CapacitySize, replicaServer.Node.TCPAddr(), "", diskTag, false)
+	if queryErr != nil {
+		return nil, queryErr
+	}
+
+	diskReplicas, convertErr := executor.ConvertReplicaCapacityStruct(diskInfo)
+	if convertErr != nil {
+		return nil, convertErr
+	}
+	return diskReplicas, nil
+}
+
+// getNodeReplicas collects the replicas on every disk listed in replicaServer.Disks.
+// It queries each disk in order, concatenates the raw results, and converts them
+// into replica records in a single pass. It stops at the first query error and
+// returns a nil list with that error.
+func getNodeReplicas(client *executor.Client, replicaServer *NodesCapacity) (replicas, error) {
+	addr := replicaServer.Node.TCPAddr()
+
+	var allDiskInfo []interface{}
+	for i := range replicaServer.Disks {
+		info, err := executor.GetDiskInfo(client, executor.CapacitySize, addr, "", replicaServer.Disks[i].Disk, false)
+		if err != nil {
+			return nil, err
+		}
+		allDiskInfo = append(allDiskInfo, info...)
+	}
+
+	nodeReplicas, err := executor.ConvertReplicaCapacityStruct(allDiskInfo)
+	if err != nil {
+		return nil, err
+	}
+	return nodeReplicas, nil
+}
+
+// waitCompleted polls the target node every 10 seconds until it reports a
+// replica of the migrated partition. Polling errors are logged and retried.
+// NOTE(review): the polling loop has no timeout.
+// Once the replica appears, it sets the meta level to "lively" for 100 seconds
+// (per the log message, to let garbage be cleaned) and then back to "steady".
+// It returns the first meta-level error.
+func waitCompleted(client *executor.Client, action *ActionProposal) error {
+	target := fmt.Sprintf("%d.%d", action.replica.Gpid.Appid, action.replica.Gpid.PartitionIndex)
+	for {
+		current, err := getNodeReplicas(client, action.to)
+		if err == nil && current.contain(target) {
+			break
+		}
+		if err != nil {
+			toolkits.LogInfo(err.Error())
+		} else {
+			toolkits.LogInfo(fmt.Sprintf("%s is running", action.toString()))
+		}
+		time.Sleep(10 * time.Second)
+	}
+
+	toolkits.LogInfo(fmt.Sprintf("%s is completed and wait 100s to wait gc garbage", action.toString()))
+	// set meta level as lively to clean garbage
+	if err := executor.SetMetaLevel(client, "lively"); err != nil {
+		return err
+	}
+	// recover meta level as steady for the next action
+	time.Sleep(100 * time.Second)
+	if err := executor.SetMetaLevel(client, "steady"); err != nil {
+		return err
+	}
+	fmt.Println()
+	return nil
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org