You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by ti...@apache.org on 2022/10/18 13:36:11 UTC
[incubator-kvrocks] branch unstable updated: Move TCL test integration/slotmigrate to Go case (#1008)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new cc492897 Move TCL test integration/slotmigrate to Go case (#1008)
cc492897 is described below
commit cc492897668aa3654dac5ea7d7f076a895d25809
Author: tison <wa...@gmail.com>
AuthorDate: Tue Oct 18 21:36:05 2022 +0800
Move TCL test integration/slotmigrate to Go case (#1008)
Signed-off-by: tison <wa...@gmail.com>
---
.../integration/slotmigrate/slotmigrate_test.go | 340 ++++++++++++-
tests/tcl/tests/integration/slotmigrate.tcl | 554 ---------------------
tests/tcl/tests/test_helper.tcl | 3 +-
3 files changed, 340 insertions(+), 557 deletions(-)
diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index 49698566..2b125608 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -22,6 +22,7 @@ package slotmigrate
import (
"context"
"fmt"
+ "strconv"
"strings"
"testing"
"time"
@@ -361,12 +362,349 @@ func TestSlotMigrateThreeNodes(t *testing.T) {
})
}
+func TestSlotMigrateDataType(t *testing.T) {
+ ctx := context.Background()
+
+ srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ defer func() { srv0.Close() }()
+ rdb0 := srv0.NewClient()
+ defer func() { require.NoError(t, rdb0.Close()) }()
+ id0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODEID", id0).Err())
+
+ srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ defer func() { srv1.Close() }()
+ rdb1 := srv1.NewClient()
+ defer func() { require.NoError(t, rdb1.Close()) }()
+ id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())
+
+ clusterNodes := fmt.Sprintf("%s %s %d master - 0-10000\n", id0, srv0.Host(), srv0.Port())
+ clusterNodes += fmt.Sprintf("%s %s %d master - 10001-16383", id1, srv1.Host(), srv1.Port())
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+
+ t.Run("MIGRATE - Cannot migrate two slot at the same time", func(t *testing.T) {
+ cnt := 20000
+ for i := 0; i < cnt; i++ {
+ require.NoError(t, rdb0.LPush(ctx, util.SlotTable[0], i).Err())
+ }
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "0", id1).Val())
+ require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", "2", id1).Err(), "There is already a migrating slot")
+ waitForMigrateState(t, rdb0, "0", "success")
+ require.EqualValues(t, cnt, rdb1.LLen(ctx, util.SlotTable[0]).Val())
+ })
+
+ t.Run("MIGRATE - Slot migrate all types of existing data", func(t *testing.T) {
+ keys := make(map[string]string, 0)
+ for _, typ := range []string{"string", "string2", "list", "hash", "set", "zset", "bitmap", "sortint"} {
+ keys[typ] = fmt.Sprintf("%s_{%s}", typ, util.SlotTable[1])
+ require.NoError(t, rdb0.Del(ctx, keys[typ]).Err())
+ }
+ // type string
+ require.NoError(t, rdb0.Set(ctx, keys["string"], keys["string"], 0).Err())
+ require.NoError(t, rdb0.Expire(ctx, keys["string"], 10*time.Second).Err())
+ // type expired string
+ require.NoError(t, rdb0.Set(ctx, keys["string2"], keys["string2"], time.Second).Err())
+ time.Sleep(3 * time.Second)
+ require.Empty(t, rdb0.Get(ctx, keys["string2"]).Val())
+ // type list
+ require.NoError(t, rdb0.RPush(ctx, keys["list"], 0, 1, 2, 3, 4, 5).Err())
+ require.NoError(t, rdb0.LPush(ctx, keys["list"], 9, 3, 7, 3, 5, 4).Err())
+ require.NoError(t, rdb0.LSet(ctx, keys["list"], 5, 0).Err())
+ require.NoError(t, rdb0.LInsert(ctx, keys["list"], "before", 9, 3).Err())
+ require.NoError(t, rdb0.LTrim(ctx, keys["list"], 3, -3).Err())
+ require.NoError(t, rdb0.RPop(ctx, keys["list"]).Err())
+ require.NoError(t, rdb0.LPop(ctx, keys["list"]).Err())
+ require.NoError(t, rdb0.LRem(ctx, keys["list"], 4, 3).Err())
+ require.NoError(t, rdb0.Expire(ctx, keys["list"], 10*time.Second).Err())
+ // type hash
+ require.NoError(t, rdb0.HMSet(ctx, keys["hash"], 0, 1, 2, 3, 4, 5, 6, 7).Err())
+ require.NoError(t, rdb0.HDel(ctx, keys["hash"], "2").Err())
+ require.NoError(t, rdb0.Expire(ctx, keys["hash"], 10*time.Second).Err())
+ // type set
+ require.NoError(t, rdb0.SAdd(ctx, keys["set"], 0, 1, 2, 3, 4, 5).Err())
+ require.NoError(t, rdb0.SRem(ctx, keys["set"], 1, 3).Err())
+ require.NoError(t, rdb0.Expire(ctx, keys["set"], 10*time.Second).Err())
+ // type zset
+ require.NoError(t, rdb0.ZAdd(ctx, keys["zset"], []redis.Z{{0, 1}, {2, 3}, {4, 5}}...).Err())
+ require.NoError(t, rdb0.ZRem(ctx, keys["zset"], 1, 3).Err())
+ require.NoError(t, rdb0.Expire(ctx, keys["zset"], 10*time.Second).Err())
+ // type bitmap
+ for i := 1; i < 20; i += 2 {
+ require.NoError(t, rdb0.SetBit(ctx, keys["bitmap"], int64(i), 1).Err())
+ }
+ for i := 10000; i < 11000; i += 2 {
+ require.NoError(t, rdb0.SetBit(ctx, keys["bitmap"], int64(i), 1).Err())
+ }
+ require.NoError(t, rdb0.Expire(ctx, keys["bitmap"], 10*time.Second).Err())
+ // type sortint
+ require.NoError(t, rdb0.Do(ctx, "SIADD", keys["sortint"], 2, 4, 1, 3).Err())
+ require.NoError(t, rdb0.Do(ctx, "SIREM", keys["sortint"], 1).Err())
+ require.NoError(t, rdb0.Expire(ctx, keys["sortint"], 10*time.Second).Err())
+ // check source data existence
+ for _, typ := range []string{"string", "list", "hash", "set", "zset", "bitmap", "sortint"} {
+ require.EqualValues(t, 1, rdb0.Exists(ctx, keys[typ]).Val())
+ }
+ // get source data
+ lv := rdb0.LRange(ctx, keys["list"], 0, -1).Val()
+ hv := rdb0.HGetAll(ctx, keys["hash"]).Val()
+ sv := rdb0.SMembers(ctx, keys["set"]).Val()
+ zv := rdb0.ZRangeWithScores(ctx, keys["zset"], 0, -1).Val()
+ siv := rdb0.Do(ctx, "SIRANGE", keys["sortint"], 0, -1).Val()
+ // migrate slot 1, all keys above belong to slot 1
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "1", id1).Val())
+ waitForMigrateState(t, rdb0, "1", "success")
+ // check destination data
+ // type string
+ require.Equal(t, keys["string"], rdb1.Get(ctx, keys["string"]).Val())
+ util.BetweenValues(t, rdb1.TTL(ctx, keys["string"]).Val(), time.Second, 10*time.Second)
+ require.Empty(t, rdb1.Get(ctx, keys["string2"]).Val())
+ // type list
+ require.EqualValues(t, lv, rdb1.LRange(ctx, keys["list"], 0, -1).Val())
+ util.BetweenValues(t, rdb1.TTL(ctx, keys["list"]).Val(), time.Second, 10*time.Second)
+ // type hash
+ require.EqualValues(t, hv, rdb1.HGetAll(ctx, keys["hash"]).Val())
+ util.BetweenValues(t, rdb1.TTL(ctx, keys["hash"]).Val(), time.Second, 10*time.Second)
+ // type set
+ require.EqualValues(t, sv, rdb1.SMembers(ctx, keys["set"]).Val())
+ util.BetweenValues(t, rdb1.TTL(ctx, keys["set"]).Val(), time.Second, 10*time.Second)
+ // type zset
+ require.EqualValues(t, zv, rdb1.ZRangeWithScores(ctx, keys["zset"], 0, -1).Val())
+ util.BetweenValues(t, rdb1.TTL(ctx, keys["zset"]).Val(), time.Second, 10*time.Second)
+ // type bitmap
+ for i := 1; i < 20; i += 2 {
+ require.EqualValues(t, 1, rdb1.GetBit(ctx, keys["bitmap"], int64(i)).Val())
+ }
+ for i := 10000; i < 11000; i += 2 {
+ require.EqualValues(t, 1, rdb1.GetBit(ctx, keys["bitmap"], int64(i)).Val())
+ }
+ for i := 0; i < 20; i += 2 {
+ require.EqualValues(t, 0, rdb1.GetBit(ctx, keys["bitmap"], int64(i)).Val())
+ }
+ util.BetweenValues(t, rdb1.TTL(ctx, keys["bitmap"]).Val(), time.Second, 10*time.Second)
+ // type sortint
+ require.EqualValues(t, siv, rdb1.Do(ctx, "SIRANGE", keys["sortint"], 0, -1).Val())
+ util.BetweenValues(t, rdb1.TTL(ctx, keys["sortint"]).Val(), time.Second, 10*time.Second)
+ // topology is changed on source server
+ for _, typ := range []string{"string", "list", "hash", "set", "zset", "bitmap", "sortint"} {
+ require.ErrorContains(t, rdb0.Exists(ctx, keys[typ]).Err(), "MOVED")
+ }
+ })
+
+ t.Run("MIGRATE - Accessing slot is forbidden on source server but not on destination server", func(t *testing.T) {
+ require.NoError(t, rdb0.Set(ctx, util.SlotTable[3], 3, 0).Err())
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "3", id1).Val())
+ waitForMigrateState(t, rdb0, "3", "success")
+ require.ErrorContains(t, rdb0.Set(ctx, util.SlotTable[3], "slot3", 0).Err(), "MOVED")
+ require.ErrorContains(t, rdb0.Del(ctx, util.SlotTable[3]).Err(), "MOVED")
+ require.ErrorContains(t, rdb0.Exists(ctx, util.SlotTable[3]).Err(), "MOVED")
+ require.NoError(t, rdb0.Set(ctx, util.SlotTable[4], "slot4", 0).Err())
+ })
+
+ t.Run("MIGRATE - Slot isn't forbidden writing when starting migrating", func(t *testing.T) {
+ cnt := 20000
+ for i := 0; i < cnt; i++ {
+ require.NoError(t, rdb0.LPush(ctx, util.SlotTable[5], i).Err())
+ }
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "5", id1).Val())
+ requireMigrateState(t, rdb0, "5", "start")
+ // write during migrating
+ require.EqualValues(t, cnt+1, rdb0.LPush(ctx, util.SlotTable[5], cnt).Val())
+ waitForMigrateState(t, rdb0, "5", "success")
+ require.Equal(t, strconv.Itoa(cnt), rdb1.LPop(ctx, util.SlotTable[5]).Val())
+ })
+
+ t.Run("MIGRATE - Slot keys are not cleared after migration but cleared after setslot", func(t *testing.T) {
+ require.NoError(t, rdb0.Set(ctx, util.SlotTable[6], "slot6", 0).Err())
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "6", id1).Val())
+ waitForMigrateState(t, rdb0, "6", "success")
+ require.Equal(t, "slot6", rdb1.Get(ctx, util.SlotTable[6]).Val())
+ require.Contains(t, rdb0.Keys(ctx, "*").Val(), util.SlotTable[6])
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "setslot", "6", "node", id1, "2").Err())
+ require.NotContains(t, rdb0.Keys(ctx, "*").Val(), util.SlotTable[6])
+ })
+
+ t.Run("MIGRATE - Migrate incremental data via parsing and filtering data in WAL", func(t *testing.T) {
+ keys := []string{
+ // slot15 key for slowing migrate-speed when migrating existing data
+ util.SlotTable[15],
+ // slot15 all types keys string/hash/set/zset/list/sortint
+ "key:000042915392",
+ "key:000043146202",
+ "key:000044434182",
+ "key:000045189446",
+ "key:000047413016",
+ "key:000049190069",
+ "key:000049930003",
+ "key:000049980785",
+ "key:000056730838",
+ }
+ for _, key := range keys {
+ require.NoError(t, rdb0.Del(ctx, key).Err())
+ }
+ require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "64").Err())
+ require.Equal(t, map[string]string{"migrate-speed": "64"}, rdb0.ConfigGet(ctx, "migrate-speed").Val())
+
+ cnt := 2000
+ for i := 0; i < cnt; i++ {
+ require.NoError(t, rdb0.LPush(ctx, keys[0], i).Err())
+ }
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "15", id1).Val())
+
+ // write key that doesn't belong to this slot
+ require.NoError(t, rdb0.Del(ctx, util.SlotTable[12]).Err())
+ require.NoError(t, rdb0.Set(ctx, util.SlotTable[12], "slot12", 0).Err())
+
+ // write increment operations include all kinds of types
+ // 1. type string
+ require.NoError(t, rdb0.SetEx(ctx, keys[1], 15, 10000*time.Second).Err())
+ require.NoError(t, rdb0.IncrBy(ctx, keys[1], 2).Err())
+ require.NoError(t, rdb0.DecrBy(ctx, keys[1], 1).Err())
+ require.NoError(t, rdb0.Set(ctx, keys[2], "val", 0).Err())
+ require.NoError(t, rdb0.Del(ctx, keys[2]).Err())
+ require.NoError(t, rdb0.SetBit(ctx, keys[3], 10086, 1).Err())
+ require.NoError(t, rdb0.Expire(ctx, keys[3], 10000*time.Second).Err())
+ require.NoError(t, rdb0.Del(ctx, util.SlotTable[13]).Err())
+ // verify expireat binlog could be parsed
+ require.NoError(t, rdb0.Set(ctx, util.SlotTable[13], "slot13", 0).Err())
+ require.NoError(t, rdb0.ExpireAt(ctx, util.SlotTable[13], time.Now().Add(100*time.Second)).Err())
+ // verify del command
+ require.NoError(t, rdb0.Set(ctx, util.SlotTable[14], "slot14", 0).Err())
+ require.NoError(t, rdb0.Del(ctx, util.SlotTable[14]).Err())
+ // 2. type hash
+ require.NoError(t, rdb0.HMSet(ctx, keys[4], "f1", "1", "f2", "2").Err())
+ require.NoError(t, rdb0.HDel(ctx, keys[4], "f1").Err())
+ require.NoError(t, rdb0.HIncrBy(ctx, keys[4], "f2", 2).Err())
+ require.NoError(t, rdb0.HIncrBy(ctx, keys[4], "f2", -1).Err())
+ // 3. type set
+ require.NoError(t, rdb0.SAdd(ctx, keys[5], 1, 2).Err())
+ require.NoError(t, rdb0.SRem(ctx, keys[5], 1).Err())
+ // 4. type zset
+ require.NoError(t, rdb0.ZAdd(ctx, keys[6], []redis.Z{{2, "m1"}}...).Err())
+ require.NoError(t, rdb0.ZIncrBy(ctx, keys[6], 2, "m1").Err())
+ require.NoError(t, rdb0.ZIncrBy(ctx, keys[6], -1, "m1").Err())
+ require.NoError(t, rdb0.ZAdd(ctx, keys[6], []redis.Z{{3, "m3"}}...).Err())
+ require.NoError(t, rdb0.ZRem(ctx, keys[6], "m3").Err())
+ require.NoError(t, rdb0.ZAdd(ctx, keys[6], []redis.Z{{1, ""}}...).Err())
+ require.NoError(t, rdb0.ZRem(ctx, keys[6], "").Err())
+ // 5. type list
+ require.NoError(t, rdb0.LPush(ctx, keys[7], "item1").Err())
+ require.NoError(t, rdb0.RPush(ctx, keys[7], "item2").Err())
+ require.NoError(t, rdb0.LPush(ctx, keys[7], "item3").Err())
+ require.NoError(t, rdb0.RPush(ctx, keys[7], "item4").Err())
+ require.Equal(t, "item3", rdb0.LPop(ctx, keys[7]).Val())
+ require.Equal(t, "item4", rdb0.RPop(ctx, keys[7]).Val())
+ require.NoError(t, rdb0.LPush(ctx, keys[7], "item7").Err())
+ require.NoError(t, rdb0.RPush(ctx, keys[7], "item8").Err())
+ require.NoError(t, rdb0.LSet(ctx, keys[7], 0, "item5").Err())
+ require.NoError(t, rdb0.LInsert(ctx, keys[7], "before", "item2", "item6").Err())
+ require.NoError(t, rdb0.LRem(ctx, keys[7], 1, "item7").Err())
+ require.NoError(t, rdb0.LTrim(ctx, keys[7], 1, -1).Err())
+ // 6. type bitmap
+ for i := 1; i < 20; i += 2 {
+ require.NoError(t, rdb0.SetBit(ctx, keys[8], int64(i), 1).Err())
+ }
+ for i := 10000; i < 11000; i += 2 {
+ require.NoError(t, rdb0.SetBit(ctx, keys[8], int64(i), 1).Err())
+ }
+ // 7. type sortint
+ require.NoError(t, rdb0.Do(ctx, "SIADD", keys[9], 2, 4, 1, 3).Err())
+ require.NoError(t, rdb0.Do(ctx, "SIREM", keys[9], 2).Err())
+ // check data in source server
+ require.EqualValues(t, cnt, rdb0.LLen(ctx, keys[0]).Val())
+ strv := rdb0.Get(ctx, keys[1]).Val()
+ strt := rdb0.TTL(ctx, keys[1]).Val()
+ bv := rdb0.GetBit(ctx, keys[3], 10086).Val()
+ bt := rdb0.TTL(ctx, keys[3]).Val()
+ hv := rdb0.HGetAll(ctx, keys[4]).Val()
+ sv := rdb0.SMembers(ctx, keys[5]).Val()
+ zv := rdb0.ZRangeWithScores(ctx, keys[6], 0, -1).Val()
+ lv := rdb0.LRange(ctx, keys[7], 0, -1).Val()
+ siv := rdb0.Do(ctx, "SIRANGE", keys[9], 0, -1).Val()
+ waitForMigrateStateInDuration(t, rdb0, "15", "success", time.Minute)
+ waitForImportSate(t, rdb1, "15", "success")
+ // check if the data is consistent
+ // 1. type string
+ require.EqualValues(t, cnt, rdb1.LLen(ctx, keys[0]).Val())
+ require.EqualValues(t, strv, rdb1.Get(ctx, keys[1]).Val())
+ require.Less(t, rdb1.TTL(ctx, keys[1]).Val()-strt, 100*time.Second)
+ require.Empty(t, rdb1.Get(ctx, keys[2]).Val())
+ require.EqualValues(t, bv, rdb1.GetBit(ctx, keys[3], 10086).Val())
+ require.Less(t, rdb1.TTL(ctx, keys[3]).Val()-bt, 100*time.Second)
+ require.ErrorContains(t, rdb1.Exists(ctx, util.SlotTable[13]).Err(), "MOVED")
+ // 2. type hash
+ require.EqualValues(t, hv, rdb1.HGetAll(ctx, keys[4]).Val())
+ require.EqualValues(t, "3", rdb1.HGet(ctx, keys[4], "f2").Val())
+ // 3. type set
+ require.EqualValues(t, sv, rdb1.SMembers(ctx, keys[5]).Val())
+ // 4. type zset
+ require.EqualValues(t, zv, rdb1.ZRangeWithScores(ctx, keys[6], 0, -1).Val())
+ require.EqualValues(t, 3, rdb1.ZScore(ctx, keys[6], "m1").Val())
+ // 5. type list
+ require.EqualValues(t, lv, rdb1.LRange(ctx, keys[7], 0, -1).Val())
+ // 6. type bitmap
+ for i := 1; i < 20; i += 2 {
+ require.EqualValues(t, 1, rdb1.GetBit(ctx, keys[8], int64(i)).Val())
+ }
+ for i := 10000; i < 11000; i += 2 {
+ require.EqualValues(t, 1, rdb1.GetBit(ctx, keys[8], int64(i)).Val())
+ }
+ for i := 0; i < 20; i += 2 {
+ require.EqualValues(t, 0, rdb1.GetBit(ctx, keys[8], int64(i)).Val())
+ }
+ // 7. type sortint
+ require.EqualValues(t, siv, rdb1.Do(ctx, "SIRANGE", keys[9], 0, -1).Val())
+
+ // not migrated if the key doesn't belong to slot 15
+ require.Equal(t, "slot12", rdb0.Get(ctx, util.SlotTable[12]).Val())
+ require.ErrorContains(t, rdb1.Exists(ctx, util.SlotTable[12]).Err(), "MOVED")
+ require.EqualValues(t, 0, rdb0.Exists(ctx, util.SlotTable[14]).Val())
+ })
+
+ t.Run("MIGRATE - Slow migrate speed", func(t *testing.T) {
+ require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "16").Err())
+ require.Equal(t, map[string]string{"migrate-speed": "16"}, rdb0.ConfigGet(ctx, "migrate-speed").Val())
+ require.NoError(t, rdb0.Del(ctx, util.SlotTable[16]).Err())
+ // more than pipeline size(16) and max items(16) in command
+ cnt := 1000
+ for i := 0; i < cnt; i++ {
+ require.NoError(t, rdb0.LPush(ctx, util.SlotTable[16], i).Err())
+ }
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "16", id1).Val())
+ // should not finish in 1.5s
+ time.Sleep(1500 * time.Millisecond)
+ requireMigrateState(t, rdb0, "16", "start")
+ waitForMigrateState(t, rdb0, "16", "success")
+ })
+
+ t.Run("MIGRATE - Data of migrated slot can't be written to source but can be written to destination", func(t *testing.T) {
+ require.NoError(t, rdb0.Del(ctx, util.SlotTable[17]).Err())
+ cnt := 100
+ for i := 0; i < cnt; i++ {
+ require.NoError(t, rdb0.LPush(ctx, util.SlotTable[17], i).Err())
+ }
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "17", id1).Val())
+ waitForMigrateState(t, rdb0, "17", "success")
+ require.EqualValues(t, cnt, rdb1.LLen(ctx, util.SlotTable[17]).Val())
+ // write the migrated slot to source server
+ k := fmt.Sprintf("{%s}_1", util.SlotTable[17])
+ require.ErrorContains(t, rdb0.Set(ctx, k, "slot17_value1", 0).Err(), "MOVED")
+ // write the migrated slot to destination server
+ require.NoError(t, rdb1.Set(ctx, k, "slot17_value1", 0).Err())
+ })
+}
+
func waitForMigrateState(t testing.TB, client *redis.Client, n, state string) {
+ waitForMigrateStateInDuration(t, client, n, state, 5*time.Second)
+}
+
+func waitForMigrateStateInDuration(t testing.TB, client *redis.Client, n, state string, d time.Duration) {
require.Eventually(t, func() bool {
i := client.ClusterInfo(context.Background()).Val()
return strings.Contains(i, fmt.Sprintf("migrating_slot: %s", n)) &&
strings.Contains(i, fmt.Sprintf("migrating_state: %s", state))
- }, 5*time.Second, 100*time.Millisecond)
+ }, d, 100*time.Millisecond)
}
func requireMigrateState(t testing.TB, client *redis.Client, n, state string) {
diff --git a/tests/tcl/tests/integration/slotmigrate.tcl b/tests/tcl/tests/integration/slotmigrate.tcl
deleted file mode 100644
index e407917d..00000000
--- a/tests/tcl/tests/integration/slotmigrate.tcl
+++ /dev/null
@@ -1,554 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Import crc16 table
-source "tests/helpers/crc16_slottable.tcl"
-
-start_server {tags {"Src migration server"} overrides {cluster-enabled yes}} {
- set r0 [srv 0 client]
- set node0_host [srv 0 host]
- set node0_port [srv 0 port]
- set node0_id "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
- $r0 clusterx setnodeid $node0_id
- start_server {tags {"Dst migration server"} overrides {cluster-enabled yes}} {
- set r1 [srv 0 client]
- set node1_host [srv 0 host]
- set node1_port [srv 0 port]
- set node1_id "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
- $r1 clusterx setnodeid $node1_id
-
- set cluster_nodes "$node0_id $node0_host $node0_port master - 0-10000"
- set cluster_nodes "$cluster_nodes\n$node1_id $node1_host $node1_port master - 10001-16383"
- $r0 clusterx setnodes $cluster_nodes 1
- $r1 clusterx setnodes $cluster_nodes 1
-
- test {MIGRATE - Cannot migrate two slot at the same time} {
- # Write some keys
- set slot0_key [lindex $::CRC16_SLOT_TABLE 0]
- set count 2000
- for {set i 0} {$i < $count} {incr i} {
- $r0 lpush $slot0_key $i
- }
-
- # Migrate slot 0
- set ret [$r0 clusterx migrate 0 $node1_id]
- assert { $ret == "OK"}
-
- # Migrate slot 2
- catch {[$r0 clusterx migrate 2 $node1_id]} e
- assert_match {*There is already a migrating slot*} $e
-
- # Migrate slot 0 success
- wait_for_condition 50 100 {
- [string match "*migrating_slot: 0*migrating_state: success*" [$r0 cluster info]]
- } else {
- fail "Fail to migrate slot 0"
- }
-
- # Check migrated data on destination server
- assert {[$r1 llen $slot0_key] == $count}
- }
-
- test {MIGRATE - Slot migrate all types of existing data} {
- # Set keys
- set slot1_tag [lindex $::CRC16_SLOT_TABLE 1]
- set slot1_key_string string_{$slot1_tag}
- set slot1_key_string2 string2_{$slot1_tag}
- set slot1_key_list list_{$slot1_tag}
- set slot1_key_hash hash_{$slot1_tag}
- set slot1_key_set set_{$slot1_tag}
- set slot1_key_zset zset_{$slot1_tag}
- set slot1_key_bitmap bitmap_{$slot1_tag}
- set slot1_key_sortint sortint_{$slot1_tag}
-
- # Clear keys
- $r0 del $slot1_key_string
- $r0 del $slot1_key_list
- $r0 del $slot1_key_hash
- $r0 del $slot1_key_set
- $r0 del $slot1_key_zset
- $r0 del $slot1_key_bitmap
- $r0 del $slot1_key_sortint
-
- # All keys belong to slot 1
- # Type: string
- $r0 set $slot1_key_string $slot1_key_string
- $r0 expire $slot1_key_string 10000
-
- # Expired string key
- $r0 set $slot1_key_string2 $slot1_key_string2 ex 1
- after 3000
- assert_equal [$r0 get $slot1_key_string2] {}
-
- # Type: list
- $r0 rpush $slot1_key_list 0 1 2 3 4 5
- $r0 lpush $slot1_key_list 9 3 7 3 5 4
- $r0 lset $slot1_key_list 5 0
- $r0 linsert $slot1_key_list before 9 3
- $r0 ltrim $slot1_key_list 3 -3
- $r0 rpop $slot1_key_list
- $r0 lpop $slot1_key_list
- $r0 lrem $slot1_key_list 4 3
- $r0 expire $slot1_key_list 10000
-
- # Type: hash
- $r0 hmset $slot1_key_hash 0 1 2 3 4 5 6 7
- $r0 hdel $slot1_key_hash 2
- $r0 expire $slot1_key_hash 10000
-
- # Type: set
- $r0 sadd $slot1_key_set 0 1 2 3 4 5
- $r0 srem $slot1_key_set 1 3
- $r0 expire $slot1_key_set 10000
-
- # Type: zset
- $r0 zadd $slot1_key_zset 0 1 2 3 4 5 6 7
- $r0 zrem $slot1_key_zset 1 3
- $r0 expire $slot1_key_zset 10000
-
- # Type: bitmap
- for {set i 1} {$i < 20} {incr i 2} {
- $r0 setbit $slot1_key_bitmap $i 1
- }
- for {set i 10000} {$i < 11000} {incr i 2} {
- $r0 setbit $slot1_key_bitmap $i 1
- }
- $r0 expire $slot1_key_bitmap 10000
-
- # Type: sortint
- $r0 siadd $slot1_key_sortint 2 4 1 3
- $r0 sirem $slot1_key_sortint 1
- $r0 expire $slot1_key_sortint 10000
-
- # Check src data
- assert {[$r0 exists $slot1_key_string] == 1}
- assert {[$r0 exists $slot1_key_list] == 1}
- assert {[$r0 exists $slot1_key_hash] == 1}
- assert {[$r0 exists $slot1_key_set] == 1}
- assert {[$r0 exists $slot1_key_zset] == 1}
- assert {[$r0 exists $slot1_key_bitmap] == 1}
- assert {[$r0 exists $slot1_key_sortint] == 1}
-
- # Get src data
- set lvalue [$r0 lrange $slot1_key_list 0 -1]
- set havlue [$r0 hgetall $slot1_key_hash]
- set svalue [$r0 smembers $slot1_key_set]
- set zvalue [$r0 zrange $slot1_key_zset 0 -1 WITHSCORES]
- set sivalue [$r0 sirange $slot1_key_sortint 0 -1]
-
- # Migrate slot 1, all keys above are belong to slot 1
- set ret [$r0 clusterx migrate 1 $node1_id]
- assert {$ret == "OK"}
-
- # Wait finish slot migrating
- wait_for_condition 50 100 {
- [string match "*migrating_slot: 1*migrating_state: success*" [$r0 cluster info]]
- } else {
- fail "Fail to migrate slot 1"
- }
- after 10
-
- # Check dst data
- # Check string and expired time
- assert {[$r1 get $slot1_key_string] == "$slot1_key_string"}
- set expire_time [$r1 ttl $slot1_key_string]
- assert {$expire_time > 1000 && $expire_time <= 10000}
-
- assert_equal [$r1 get $slot1_key_string2] {}
-
- # Check list and expired time
- assert {[$r1 lrange $slot1_key_list 0 -1] eq $lvalue}
- set expire_time [$r1 ttl $slot1_key_list]
- assert {$expire_time > 1000 && $expire_time <= 10000}
-
- # Check hash and expired time
- assert {[$r1 hgetall $slot1_key_hash] eq $havlue}
- set expire_time [$r1 ttl $slot1_key_hash]
- assert {$expire_time > 1000 && $expire_time <= 10000}
-
- # Check set and expired time
- assert {[$r1 smembers $slot1_key_set] eq $svalue}
- set expire_time [$r1 ttl $slot1_key_set]
- assert {$expire_time > 1000 && $expire_time <= 10000}
-
- # Check zset and expired time
- assert {[$r1 zrange $slot1_key_zset 0 -1 WITHSCORES] eq $zvalue}
- set expire_time [$r1 ttl $slot1_key_zset]
- assert {$expire_time > 1000 && $expire_time <= 10000}
-
- # Check bitmap and expired time
- for {set i 1} {$i < 20} {incr i 2} {
- assert {[$r1 getbit $slot1_key_bitmap $i] == {1}}
- }
- for {set i 10000} {$i < 11000} {incr i 2} {
- assert {[$r1 getbit $slot1_key_bitmap $i] == {1}}
- }
- for {set i 0} {$i < 20} {incr i 2} {
- assert {[$r1 getbit $slot1_key_bitmap $i] == {0}}
- }
- set expire_time [$r1 ttl $slot1_key_bitmap]
- assert {$expire_time > 1000 && $expire_time <= 10000}
-
- # Check sortint and expired time
- assert {[$r1 sirange $slot1_key_sortint 0 -1] eq $sivalue}
- set expire_time [$r1 ttl $slot1_key_sortint]
- assert {$expire_time > 1000 && $expire_time <= 10000}
-
- # Topology is changed on src server
- catch {$r0 exists $slot1_key_string} e
- assert_match {*MOVED*} $e
- catch {$r0 exists $slot1_key_list} e
- assert_match {*MOVED*} $e
- catch {$r0 exists $slot1_key_hash} e
- assert_match {*MOVED*} $e
- catch {$r0 exists $slot1_key_set} e
- assert_match {*MOVED*} $e
- catch {$r0 exists $slot1_key_zset} e
- assert_match {*MOVED*} $e
- catch {$r0 exists $slot1_key_bitmap} e
- assert_match {*MOVED*} $e
- catch {$r0 exists $slot1_key_sortint} e
- assert_match {*MOVED*} $e
- }
-
-
- test {MIGRATE - Accessing slot is forbidden on source server but not on destination server} {
- # migrate slot 3
- set slot3_key [lindex $::CRC16_SLOT_TABLE 3]
- $r0 set $slot3_key 3
-
- set ret [$r0 clusterx migrate 3 $node1_id]
- assert {$ret == "OK"}
- wait_for_condition 50 100 {
- [string match "*migrating_slot: 3*migrating_state: success*" [$r0 cluster info]]
- } else {
- fail "Slot 3 can't migrate"
- }
- after 10
-
- # Writing the key belongs to slot 3 should be MOVED
- catch {$r0 set $slot3_key slot3} e
- assert_match {*MOVED*} $e
-
- catch {$r0 del $slot3_key} e
- assert_match {*MOVED*} $e
-
- # Reading the key belongs to slot 3 should be MOVED
- catch {$r0 exists $slot3_key} e
- assert_match {*MOVED*} $e
-
- # Writing the key belongs to slot 4 must be ok
- set slot4_key [lindex $::CRC16_SLOT_TABLE 4]
- set ret [$r0 set $slot4_key slot4]
- assert {$ret == "OK"}
- }
-
- test {MIGRATE - Slot isn't forbidden writing when starting migrating} {
- # Write much data for slot 5
- set slot5_key [lindex $::CRC16_SLOT_TABLE 5]
- set count 10000
- for {set i 0} {$i < $count} {incr i} {
- $r0 lpush $slot5_key $i
- }
-
- # Migrate slot 5
- set ret [$r0 clusterx migrate 5 $node1_id]
- assert {$ret == "OK"}
-
- # Migrate status START(migrating)
- if {[string match "*migrating_slot: 5*migrating_state: start*" [$r0 cluster info]]} {
- # Write during migrating
- set num [$r0 lpush $slot5_key $count]
- assert {$num == [expr $count + 1]}
- } else {
- puts "Migrating too quickly? Please retry."
- }
-
- # Check dst server receiving all data when migrate slot snapshot
- wait_for_condition 50 1000 {
- [string match "*migrating_slot: 5*migrating_state: success*" [$r0 cluster info]]
- } else {
- fail "Slot 5 migrating is not finished"
- }
-
- # Check value which is written during migrating
- set lastval [$r1 lpop $slot5_key]
- assert {$lastval == $count}
- }
-
- test {MIGRATE - Slot keys are not cleared after migration but cleared after setslot} {
- set slot6_key [lindex $::CRC16_SLOT_TABLE 6]
- assert {[$r0 set $slot6_key "slot6"] == "OK"}
- # Check key in src server
- assert {[$r0 get $slot6_key] == "slot6"}
- set ret [$r0 clusterx migrate 6 $node1_id]
- assert {$ret == "OK"}
-
- # Migrate slot
- wait_for_condition 50 100 {
- [string match "*migrating_slot: 6*migrating_state: success*" [$r0 cluster info]]
- } else {
- fail "Fail to migrate slot 6"
- }
- # Check key in destination server
- assert {[$r1 get $slot6_key] == "slot6"}
- # Check key in source server
- assert {[string match "*$slot6_key*" [$r0 keys *]] != 0}
-
- # Change topology by 'setslot'
- $r0 clusterx setslot 6 node $node1_id 2
- # Check key is cleared after 'setslot'
- assert {[string match "*$slot6_key*" [$r0 keys *]] == 0}
- }
-
- test {MIGRATE - Migrate incremental data via parsing and filtering data in WAL} {
- # Slot15 keys
- set slot15_key_1 [lindex $::CRC16_SLOT_TABLE 15]
- set slot15_key_2 key:000042915392
- set slot15_key_3 key:000043146202
- set slot15_key_4 key:000044434182
- set slot15_key_5 key:000045189446
- set slot15_key_6 key:000047413016
- set slot15_key_7 key:000049190069
- set slot15_key_8 key:000049930003
- set slot15_key_9 key:000049980785
- set slot15_key_10 key:000056730838
-
- # Slot15 key for slowing migrate-speed when migrating existing data
- $r0 del $slot15_key_1
- # Slot15 all types keys string/hash/set/zset/list/sortint
- $r0 del $slot15_key_2
- $r0 del $slot15_key_3
- $r0 del $slot15_key_4
- $r0 del $slot15_key_5
- $r0 del $slot15_key_6
- $r0 del $slot15_key_7
- $r0 del $slot15_key_8
- $r0 del $slot15_key_9
- $r0 del $slot15_key_10
-
- # Set slow migrate
- $r0 config set migrate-speed 64
- catch {$r0 config get migrate-speed} e
- assert_match {*64*} $e
-
- set count 2000
- for {set i 0} {$i < $count} {incr i} {
- $r0 lpush $slot15_key_1 $i
- }
- set ret [$r0 clusterx migrate 15 $node1_id]
- assert {$ret == "OK"}
-
- # Write key that doesn't belong to this slot
- set slot12_key [lindex $::CRC16_SLOT_TABLE 12]
- $r0 del $slot12_key
- $r0 set $slot12_key slot12
-
- # Write increment operations include all kinds of types
- # Type: string
- # Use 'setex' to replace 'set' and 'expire', 'setex' will generate
- # set key and expire key
- $r0 setex $slot15_key_2 10000 15
- $r0 incrby $slot15_key_2 2
- $r0 decrby $slot15_key_2 1
- $r0 set $slot15_key_3 val
- $r0 del $slot15_key_3
- $r0 setbit $slot15_key_4 10086 1
- $r0 expire $slot15_key_4 1000
- set slot13_key [lindex $::CRC16_SLOT_TABLE 13]
- $r0 del $slot13_key
- # Just verify expireat binlog could be parsed
- $r0 set $slot13_key slot13
- $r0 expireat $slot13_key [expr 100 + [clock seconds]]
- # Verify del command
- set slot14_key [lindex $::CRC16_SLOT_TABLE 14]
- $r0 set $slot14_key slot14
- $r0 del $slot14_key
-
- # Type: hash
- $r0 hmset $slot15_key_5 f1 1 f2 2
- $r0 hdel $slot15_key_5 f1
- $r0 hincrby $slot15_key_5 f2 2
- $r0 hincrby $slot15_key_5 f2 -1
-
- # Type: set
- $r0 sadd $slot15_key_6 1 2
- $r0 srem $slot15_key_6 1
-
- # Type: zset
- $r0 zadd $slot15_key_7 2 m1
- $r0 zincrby $slot15_key_7 2 m1
- $r0 zincrby $slot15_key_7 -1 m1
- $r0 zadd $slot15_key_7 3 m3
- $r0 zrem $slot15_key_7 m3
- $r0 zadd $slot15_key_7 1 ""
- $r0 zrem $slot15_key_7 ""
-
- # Type: list
- $r0 lpush $slot15_key_8 item1
- $r0 rpush $slot15_key_8 item2
- $r0 lpush $slot15_key_8 item3
- $r0 rpush $slot15_key_8 item4
- assert {[$r0 lpop $slot15_key_8] == {item3}}
- assert {[$r0 rpop $slot15_key_8] == {item4}}
- $r0 lpush $slot15_key_8 item7
- $r0 rpush $slot15_key_8 item8
- $r0 lset $slot15_key_8 0 item5
- $r0 linsert $slot15_key_8 before item2 item6
- $r0 lrem $slot15_key_8 1 item7
- $r0 ltrim $slot15_key_8 1 -1
-
- # Type: bitmap
- for {set i 1} {$i < 20} {incr i 2} {
- $r0 setbit $slot15_key_9 $i 1
- }
- for {set i 10000} {$i < 11000} {incr i 2} {
- $r0 setbit $slot15_key_9 $i 1
- }
-
- # Type: sortint
- $r0 siadd $slot15_key_10 2 4 1 3
- $r0 sirem $slot15_key_10 2
-
- # Chek data in src server
- assert {[$r0 llen $slot15_key_1] == $count}
- set strvalue [$r0 get $slot15_key_2]
- set strttl [$r0 ttl $slot15_key_2]
- set bitvalue [$r0 getbit $slot15_key_4 10086]
- set bitttl [$r0 ttl $slot15_key_4]
- set hvalue [$r0 hgetall $slot15_key_5]
- set svalue [$r0 smembers $slot15_key_6]
- set zvalue [$r0 zrange $slot15_key_7 0 -1 withscores]
- set lvalue [$r0 lrange $slot15_key_8 0 -1]
- set sivalue [$r0 sirange $slot15_key_10 0 -1]
-
- # Wait for finishing
- while 1 {
- if {[string match "*migrating_slot: 15*migrating_state: success*" [$r0 cluster info]]} {
- break
- }
- }
- while 1 {
- if {[string match "*15*success*" [$r1 cluster info]]} {
- break
- }
- }
-
- # Check if the data is consistent
- # List key of existing data
- assert {[$r1 llen $slot15_key_1] == $count}
- # String
- assert {[$r1 get $slot15_key_2] eq $strvalue}
- assert {[expr $strttl - [$r1 ttl $slot15_key_2]] < 100}
- assert {[$r1 get $slot15_key_3] eq {}}
- assert {[$r1 getbit $slot15_key_4 10086] == $bitvalue}
- assert {[expr $bitttl - [$r1 ttl $slot15_key_4]] < 100}
- catch {$r1 exists $slot13_key} e
- assert_match {*MOVED*} $e
- # Hash
- assert {[$r1 hgetall $slot15_key_5] eq $hvalue}
- assert {[$r1 hget $slot15_key_5 f2] == 3}
- # Set
- assert {[$r1 smembers $slot15_key_6] eq $svalue}
- # Zset
- assert {[$r1 zrange $slot15_key_7 0 -1 withscores] eq $zvalue}
- assert {[$r1 zscore $slot15_key_7 m1] == 3}
- # List
- assert {[$r1 lrange $slot15_key_8 0 -1] eq $lvalue}
- # Bitmap
- for {set i 1} {$i < 20} {incr i 2} {
- assert {[$r1 getbit $slot15_key_9 $i] == {1}}
- }
- for {set i 10000} {$i < 11000} {incr i 2} {
- assert {[$r1 getbit $slot15_key_9 $i] == {1}}
- }
- for {set i 0} {$i < 20} {incr i 2} {
- assert {[$r1 getbit $slot15_key_9 $i] == {0}}
- }
- # Sortint
- assert {[$r1 sirange $slot15_key_10 0 -1] eq $sivalue}
- # Not migrate if the key doesn't belong to slot 1
- assert {[$r0 get $slot12_key] eq "slot12"}
- catch {$r1 exists $slot12_key} e
- assert_match {*MOVED*} $e
- assert {[$r0 exists $slot14_key] == 0}
- }
-
- test {MIGRATE - Slow migrate speed} {
- # Set slow speed
- $r0 config set migrate-speed 16
- catch {$r0 config get migrate-speed} e
- assert_match {*16*} $e
-
- # Construct key
- set slot16_key [lindex $::CRC16_SLOT_TABLE 16]
- $r0 del $slot16_key
-
- # More than pipeline size(16) and max items(16) in command
- set count 1000
- for {set i 0} {$i < $count} {incr i} {
- $r0 lpush $slot16_key $i
- }
- # Migrate slot 16 from node1 to node0
- set ret [$r0 clusterx migrate 16 $node1_id]
- assert {$ret == "OK"}
-
- # Should not finish 1.5s
- after 1500
- catch {[$r0 cluster info]} e
- assert_match {*migrating_slot: 16*start*} $e
-
- # Check if finish
- wait_for_condition 50 1000 {
- [string match "*migrating_slot: 16*success*" [$r0 cluster info]]
- } else {
- fail "Slot 16 importing is not finished"
- }
- }
-
- test {MIGRATE - Data of migrated slot can't be written to source but can be written to destination} {
- # Construct key
- set slot17_key [lindex $::CRC16_SLOT_TABLE 17]
- $r0 del $slot17_key
-
- # Write data
- set count 100
- for {set i 0} {$i < $count} {incr i} {
- $r0 lpush $slot17_key $i
- }
- # Migrate slot 17 from node0 to node1
- set ret [$r0 clusterx migrate 17 $node1_id]
- assert {$ret == "OK"}
- # Check if finished
- wait_for_condition 50 1000 {
- [string match "*migrating_slot: 17*success*" [$r0 cluster info]]
- } else {
- fail "Slot 17 importing is not finished"
- }
- # Check data
- assert {[$r1 llen $slot17_key] == $count}
- # Write the migrated slot to source server
- set slot17_key1 "{$slot17_key}_1"
- catch {$r0 set $slot17_key1 slot17_value1} e
- assert_match {*MOVED*} $e
-
- # Write the migrated slot to destination server
- assert {[$r1 set $slot17_key1 slot17_value1] == "OK"}
- }
- }
-
-}
diff --git a/tests/tcl/tests/test_helper.tcl b/tests/tcl/tests/test_helper.tcl
index 22ac3530..2d99606e 100644
--- a/tests/tcl/tests/test_helper.tcl
+++ b/tests/tcl/tests/test_helper.tcl
@@ -34,7 +34,6 @@ source tests/support/util.tcl
set ::all_tests {
unit/type/list
- integration/slotmigrate
}
# Index to the next test to run in the ::all_tests list.
@@ -686,7 +685,7 @@ if {[llength $::skipunits] > 0} {
# Override the list of tests with the specific tests we want to run
# in case there was some filter, that is --single, -skipunit or --skip-till options.
-if {[llength $filtered_tests] < [llength $::all_tests]} {
+if {[llength $filtered_tests] > 0} {
set ::all_tests $filtered_tests
}