You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by ti...@apache.org on 2022/10/18 01:46:15 UTC
[incubator-kvrocks] branch unstable updated: test: migrate part of slotmigrate tests (#1005)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new fcb3a193 test: migrate part of slotmigrate tests (#1005)
fcb3a193 is described below
commit fcb3a19389fbf29ea6f72678ed6923e7fbcdc717
Author: tison <wa...@gmail.com>
AuthorDate: Tue Oct 18 09:46:10 2022 +0800
test: migrate part of slotmigrate tests (#1005)
Signed-off-by: tison <wa...@gmail.com>
---
.../integration/slotmigrate/slotmigrate_test.go | 290 ++++++++++++++++-
tests/gocase/unit/type/zset/zset_test.go | 3 +-
tests/tcl/tests/integration/slotmigrate.tcl | 348 ---------------------
3 files changed, 287 insertions(+), 354 deletions(-)
diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index 67309d5e..49698566 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -22,11 +22,14 @@ package slotmigrate
import (
"context"
"fmt"
+ "strings"
"testing"
"time"
"github.com/apache/incubator-kvrocks/tests/gocase/util"
+ "github.com/go-redis/redis/v9"
"github.com/stretchr/testify/require"
+ "golang.org/x/exp/slices"
)
func TestSlotMigrateFromSlave(t *testing.T) {
@@ -60,7 +63,7 @@ func TestSlotMigrateFromSlave(t *testing.T) {
})
}
-func TestSlotMigrateServerFailed(t *testing.T) {
+func TestSlotMigrateDestServerKilled(t *testing.T) {
ctx := context.Background()
srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
@@ -101,8 +104,287 @@ func TestSlotMigrateServerFailed(t *testing.T) {
srv1Alive = false
require.NoError(t, rdb0.Do(ctx, "clusterx", "migrate", "1", id1).Err())
time.Sleep(50 * time.Millisecond)
- i := rdb0.ClusterInfo(ctx).Val()
- require.Contains(t, i, "migrating_slot: 1")
- require.Contains(t, i, "migrating_state: fail")
+ requireMigrateState(t, rdb0, "1", "fail")
})
}
+
+func TestSlotMigrateDestServerKilledAgain(t *testing.T) {
+ ctx := context.Background()
+
+ srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ defer func() { srv0.Close() }()
+ rdb0 := srv0.NewClient()
+ defer func() { require.NoError(t, rdb0.Close()) }()
+ id0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODEID", id0).Err())
+
+ srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ srv1Alive := true
+ defer func() {
+ if srv1Alive {
+ srv1.Close()
+ }
+ }()
+ rdb1 := srv1.NewClient()
+ defer func() { require.NoError(t, rdb1.Close()) }()
+ id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())
+
+ clusterNodes := fmt.Sprintf("%s %s %d master - 0-10000\n", id0, srv0.Host(), srv0.Port())
+ clusterNodes += fmt.Sprintf("%s %s %d master - 10001-16383", id1, srv1.Host(), srv1.Port())
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+
+ t.Run("MIGRATE - Migrate slot with empty string key or value", func(t *testing.T) {
+ require.NoError(t, rdb0.Set(ctx, "", "slot0", 0).Err())
+ require.NoError(t, rdb0.Del(ctx, util.SlotTable[0]).Err())
+ require.NoError(t, rdb0.Set(ctx, util.SlotTable[0], "", 0).Err())
+ time.Sleep(500 * time.Millisecond)
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "0", id1).Val())
+ waitForMigrateState(t, rdb0, "0", "success")
+ require.Equal(t, "slot0", rdb1.Get(ctx, "").Val())
+ require.Equal(t, "", rdb1.Get(ctx, util.SlotTable[0]).Val())
+ require.NoError(t, rdb1.Del(ctx, util.SlotTable[0]).Err())
+ })
+
+ t.Run("MIGRATE - Migrate binary key-value", func(t *testing.T) {
+ k1 := fmt.Sprintf("\x3a\x88{%s}\x3d\xaa", util.SlotTable[1])
+ cnt := 257
+ for i := 0; i < cnt; i++ {
+ require.NoError(t, rdb0.LPush(ctx, k1, "\0000\0001").Err())
+ }
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "1", id1).Val())
+ k2 := fmt.Sprintf("\x49\x1f\x7f{%s}\xaf", util.SlotTable[1])
+ require.NoError(t, rdb0.Set(ctx, k2, "\0000\0001", 0).Err())
+ time.Sleep(time.Second)
+ waitForImportSate(t, rdb1, "1", "success")
+ require.EqualValues(t, cnt, rdb1.LLen(ctx, k1).Val())
+ require.Equal(t, "\0000\0001", rdb1.LPop(ctx, k1).Val())
+ require.Equal(t, "\0000\0001", rdb1.Get(ctx, k2).Val())
+ })
+
+ t.Run("MIGRATE - Migrate empty slot", func(t *testing.T) {
+ require.NoError(t, rdb0.FlushDB(ctx).Err())
+ require.NoError(t, rdb1.FlushDB(ctx).Err())
+ time.Sleep(500 * time.Millisecond)
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "2", id1).Val())
+ waitForMigrateState(t, rdb0, "2", "success")
+ require.NoError(t, rdb1.Keys(ctx, "*").Err())
+ })
+
+ t.Run("MIGRATE - Fail to migrate slot because destination server is killed while migrating", func(t *testing.T) {
+ for i := 0; i < 20000; i++ {
+ require.NoError(t, rdb0.LPush(ctx, util.SlotTable[8], i).Err())
+ }
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "8", id1).Val())
+ requireMigrateState(t, rdb0, "8", "start")
+ srv1.Close()
+ srv1Alive = false
+ time.Sleep(time.Second)
+ requireMigrateState(t, rdb0, "8", "fail")
+ })
+}
+
+func TestSlotMigrateSourceServerFlushedOrKilled(t *testing.T) {
+ ctx := context.Background()
+
+ srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ srv0Alive := true
+ defer func() {
+ if srv0Alive {
+ srv0.Close()
+ }
+ }()
+ rdb0 := srv0.NewClient()
+ defer func() { require.NoError(t, rdb0.Close()) }()
+ id0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODEID", id0).Err())
+
+ srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ defer func() { srv1.Close() }()
+ rdb1 := srv1.NewClient()
+ defer func() { require.NoError(t, rdb1.Close()) }()
+ id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())
+
+ clusterNodes := fmt.Sprintf("%s %s %d master - 0-10000\n", id0, srv0.Host(), srv0.Port())
+ clusterNodes += fmt.Sprintf("%s %s %d master - 10001-16383", id1, srv1.Host(), srv1.Port())
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+
+ t.Run("MIGRATE - Fail to migrate slot because source server is flushed", func(t *testing.T) {
+ for i := 0; i < 20000; i++ {
+ require.NoError(t, rdb0.LPush(ctx, util.SlotTable[11], i).Err())
+ }
+ require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "32").Err())
+ require.Equal(t, map[string]string{"migrate-speed": "32"}, rdb0.ConfigGet(ctx, "migrate-speed").Val())
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "11", id1).Val())
+ waitForMigrateState(t, rdb0, "11", "start")
+ require.NoError(t, rdb0.FlushDB(ctx).Err())
+ time.Sleep(time.Second)
+ waitForMigrateState(t, rdb0, "11", "fail")
+ })
+
+ t.Run("MIGRATE - Fail to migrate slot because source server is killed while migrating", func(t *testing.T) {
+ for i := 0; i < 20000; i++ {
+ require.NoError(t, rdb0.LPush(ctx, util.SlotTable[20], i).Err())
+ }
+ require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "32").Err())
+ require.Equal(t, map[string]string{"migrate-speed": "32"}, rdb0.ConfigGet(ctx, "migrate-speed").Val())
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "20", id1).Val())
+ require.Eventually(t, func() bool {
+ return slices.Contains(rdb1.Keys(ctx, "*").Val(), util.SlotTable[20])
+ }, 5*time.Second, 100*time.Millisecond)
+
+ srv0.Close()
+ srv0Alive = false
+ time.Sleep(100 * time.Millisecond)
+ require.NotContains(t, rdb1.Keys(ctx, "*").Val(), util.SlotTable[20])
+ })
+}
+
+func TestSlotMigrateNewNodeAndAuth(t *testing.T) {
+ ctx := context.Background()
+
+ srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ defer func() { srv0.Close() }()
+ rdb0 := srv0.NewClient()
+ defer func() { require.NoError(t, rdb0.Close()) }()
+ id0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODEID", id0).Err())
+
+ srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ defer func() { srv1.Close() }()
+ rdb1 := srv1.NewClient()
+ defer func() { require.NoError(t, rdb1.Close()) }()
+ id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())
+
+ clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", id0, srv0.Host(), srv0.Port())
+ clusterNodes += fmt.Sprintf("%s %s %d master -", id1, srv1.Host(), srv1.Port())
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+
+ t.Run("MIGRATE - Migrate slot to newly added node", func(t *testing.T) {
+ require.NoError(t, rdb0.Del(ctx, util.SlotTable[21]).Err())
+ require.ErrorContains(t, rdb1.Set(ctx, util.SlotTable[21], "foobar", 0).Err(), "MOVED")
+
+ cnt := 100
+ for i := 0; i < cnt; i++ {
+ require.NoError(t, rdb0.LPush(ctx, util.SlotTable[21], i).Err())
+ }
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "21", id1).Val())
+ waitForMigrateState(t, rdb0, "21", "success")
+ require.EqualValues(t, cnt, rdb1.LLen(ctx, util.SlotTable[21]).Val())
+
+ k := fmt.Sprintf("{%s}_1", util.SlotTable[21])
+ require.ErrorContains(t, rdb0.Set(ctx, k, "slot21_value1", 0).Err(), "MOVED")
+ require.Equal(t, "OK", rdb1.Set(ctx, k, "slot21_value1", 0).Val())
+ })
+
+ t.Run("MIGRATE - Auth before migrating slot", func(t *testing.T) {
+ require.NoError(t, rdb1.ConfigSet(ctx, "requirepass", "password").Err())
+ cnt := 100
+ for i := 0; i < cnt; i++ {
+ require.NoError(t, rdb0.LPush(ctx, util.SlotTable[22], i).Err())
+ }
+
+ // migrating slot will fail if no auth
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "22", id1).Val())
+ waitForMigrateState(t, rdb0, "22", "fail")
+ require.ErrorContains(t, rdb1.Exists(ctx, util.SlotTable[22]).Err(), "MOVED")
+
+ // migrating slot will fail if auth with wrong password
+ require.NoError(t, rdb0.ConfigSet(ctx, "requirepass", "pass").Err())
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "22", id1).Val())
+ waitForMigrateState(t, rdb0, "22", "fail")
+ require.ErrorContains(t, rdb1.Exists(ctx, util.SlotTable[22]).Err(), "MOVED")
+
+ // migrating slot will succeed if auth with right password
+ require.NoError(t, rdb0.ConfigSet(ctx, "requirepass", "password").Err())
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "22", id1).Val())
+ waitForMigrateState(t, rdb0, "22", "success")
+ require.EqualValues(t, 1, rdb1.Exists(ctx, util.SlotTable[21]).Val())
+ require.EqualValues(t, cnt, rdb1.LLen(ctx, util.SlotTable[22]).Val())
+ })
+}
+
+func TestSlotMigrateThreeNodes(t *testing.T) {
+ ctx := context.Background()
+
+ srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ defer func() { srv0.Close() }()
+ rdb0 := srv0.NewClient()
+ defer func() { require.NoError(t, rdb0.Close()) }()
+ id0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODEID", id0).Err())
+
+ srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ defer func() { srv1.Close() }()
+ rdb1 := srv1.NewClient()
+ defer func() { require.NoError(t, rdb1.Close()) }()
+ id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())
+
+ srv2 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+ defer func() { srv2.Close() }()
+ rdb2 := srv2.NewClient()
+ defer func() { require.NoError(t, rdb2.Close()) }()
+ id2 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx02"
+ require.NoError(t, rdb2.Do(ctx, "clusterx", "SETNODEID", id2).Err())
+
+ clusterNodes := fmt.Sprintf("%s %s %d master - 0-10000\n", id0, srv0.Host(), srv0.Port())
+ clusterNodes += fmt.Sprintf("%s %s %d slave %s\n", id1, srv1.Host(), srv1.Port(), id0)
+ clusterNodes += fmt.Sprintf("%s %s %d master - 10001-16383", id2, srv2.Host(), srv2.Port())
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+ require.NoError(t, rdb2.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+
+ t.Run("MIGRATE - Fail to migrate slot because source server is changed to slave during migrating", func(t *testing.T) {
+ for i := 0; i < 10000; i++ {
+ require.NoError(t, rdb0.LPush(ctx, util.SlotTable[10], i).Err())
+ }
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "10", id2).Val())
+ requireMigrateState(t, rdb0, "10", "start")
+
+ // change source server to slave by set topology
+ clusterNodes := fmt.Sprintf("%s %s %d master - 0-10000\n", id1, srv1.Host(), srv1.Port())
+ clusterNodes += fmt.Sprintf("%s %s %d slave %s\n", id0, srv0.Host(), srv0.Port(), id1)
+ clusterNodes += fmt.Sprintf("%s %s %d master - 10001-16383", id2, srv2.Host(), srv2.Port())
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err())
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err())
+ require.NoError(t, rdb2.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err())
+ time.Sleep(time.Second)
+
+ // check destination importing status
+ requireImportState(t, rdb2, "10", "error")
+ })
+}
+
+func waitForMigrateState(t testing.TB, client *redis.Client, n, state string) {
+ require.Eventually(t, func() bool {
+ i := client.ClusterInfo(context.Background()).Val()
+ return strings.Contains(i, fmt.Sprintf("migrating_slot: %s", n)) &&
+ strings.Contains(i, fmt.Sprintf("migrating_state: %s", state))
+ }, 5*time.Second, 100*time.Millisecond)
+}
+
+func requireMigrateState(t testing.TB, client *redis.Client, n, state string) {
+ i := client.ClusterInfo(context.Background()).Val()
+ require.Contains(t, i, fmt.Sprintf("migrating_slot: %s", n))
+ require.Contains(t, i, fmt.Sprintf("migrating_state: %s", state))
+}
+
+func waitForImportSate(t testing.TB, client *redis.Client, n, state string) {
+ require.Eventually(t, func() bool {
+ i := client.ClusterInfo(context.Background()).Val()
+ return strings.Contains(i, fmt.Sprintf("importing_slot: %s", n)) &&
+ strings.Contains(i, fmt.Sprintf("import_state: %s", state))
+ }, 5*time.Second, 100*time.Millisecond)
+}
+
+func requireImportState(t testing.TB, client *redis.Client, n, state string) {
+ i := client.ClusterInfo(context.Background()).Val()
+ require.Contains(t, i, fmt.Sprintf("importing_slot: %s", n))
+ require.Contains(t, i, fmt.Sprintf("import_state: %s", state))
+}
diff --git a/tests/gocase/unit/type/zset/zset_test.go b/tests/gocase/unit/type/zset/zset_test.go
index d6ba6356..616387e1 100644
--- a/tests/gocase/unit/type/zset/zset_test.go
+++ b/tests/gocase/unit/type/zset/zset_test.go
@@ -29,11 +29,10 @@ import (
"strings"
"testing"
- "golang.org/x/exp/slices"
-
"github.com/apache/incubator-kvrocks/tests/gocase/util"
"github.com/go-redis/redis/v9"
"github.com/stretchr/testify/require"
+ "golang.org/x/exp/slices"
)
func createZset(rdb *redis.Client, ctx context.Context, key string, items []redis.Z) {
diff --git a/tests/tcl/tests/integration/slotmigrate.tcl b/tests/tcl/tests/integration/slotmigrate.tcl
index 7dd56346..e407917d 100644
--- a/tests/tcl/tests/integration/slotmigrate.tcl
+++ b/tests/tcl/tests/integration/slotmigrate.tcl
@@ -552,351 +552,3 @@ start_server {tags {"Src migration server"} overrides {cluster-enabled yes}} {
}
}
-
-start_server {tags {"Src migration server"} overrides {cluster-enabled yes}} {
- set r0 [srv 0 client]
- set node0_host [srv 0 host]
- set node0_port [srv 0 port]
- set node0_id "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
- $r0 clusterx setnodeid $node0_id
- start_server {tags {"Dst migration server"} overrides {cluster-enabled yes}} {
- set r1 [srv 0 client]
- set node1_host [srv 0 host]
- set node1_port [srv 0 port]
- set node1_pid [srv 0 pid]
- set node1_id "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
- $r1 clusterx setnodeid $node1_id
-
- set cluster_nodes "$node0_id $node0_host $node0_port master - 0-10000"
- set cluster_nodes "$cluster_nodes\n$node1_id $node1_host $node1_port master - 10001-16383"
- $r0 clusterx setnodes $cluster_nodes 1
- $r1 clusterx setnodes $cluster_nodes 1
-
- test {MIGRATE - Migrate slot with empty string key or value} {
- # Migrate slot 0
- $r0 set "" slot0
- set slot0_key [lindex $::CRC16_SLOT_TABLE 0]
- $r0 del $slot0_key
- $r0 set $slot0_key ""
- after 500
- set ret [$r0 clusterx migrate 0 $node1_id]
- assert { $ret == "OK"}
-
- # Check migration task
- wait_for_condition 50 100 {
- [string match "*0*success*" [$r0 cluster info]]
- } else {
- fail "Fail to migrate slot 0"
- }
-
- # Check migrated data
- assert {[$r1 get ""] eq "slot0"}
- assert {[$r1 get $slot0_key] eq {}}
- $r1 del $slot0_key
- }
-
- test {MIGRATE - Migrate binary key-value} {
- # Slot 1 key using hash tag
- set slot1_tag [lindex $::CRC16_SLOT_TABLE 1]
- set slot1_key "\x3a\x88{$slot1_tag}\x3d\xaa"
- set count 257
- for {set i 0} {$i < $count} {incr i} {
- $r0 lpush $slot1_key "\00\01"
- }
- set ret [$r0 clusterx migrate 1 $node1_id]
- assert {$ret == "OK"}
- set slot1_key_2 "\x49\x1f\x7f{$slot1_tag}\xaf"
- $r0 set $slot1_key_2 "\00\01"
- after 1000
- # Check if finish
- wait_for_condition 50 100 {
- [string match "*1*success*" [$r1 cluster info]]
- } else {
- fail "Slot 1 importing is not finished"
- }
- # Check result
- assert {[$r1 llen $slot1_key] == $count}
- assert {[$r1 lpop $slot1_key] == "\00\01"}
- assert {[$r1 get $slot1_key_2] == "\00\01"}
- }
-
- test {MIGRATE - Migrate empty slot} {
- # Clear data of src and dst server
- $r0 flushall
- $r1 flushall
- after 500
-
- # Migrate slot 2
- set ret [$r0 clusterx migrate 2 $node1_id]
- assert { $ret == "OK"}
- wait_for_condition 50 100 {
- [string match "*migrating_slot: 2*migrating_state: success*" [$r0 cluster info]]
- } else {
- fail "Fail to migrate slot 2"
- }
-
- # Check data of dst server
- catch {$r1 keys *} e
- assert_match {} $e
- }
-
- test {MIGRATE - Fail to migrate slot because destination server is killed while migrating} {
- set slot8_key [lindex $::CRC16_SLOT_TABLE 8]
- set count 20000
- for {set i 0} {$i < $count} {incr i} {
- $r0 lpush $slot8_key $i
- }
-
- # Migrate data
- set ret [$r0 clusterx migrate 8 $node1_id]
- assert {$ret == "OK"}
-
- # Check migrating start
- catch {[$r0 cluster info]} e
- assert_match {*migrating_slot: 8*start*} $e
-
- # Kill destination server itself
- exec kill -9 $node1_pid
- # Wait migration timeout
- after 1000
- # Can't success
- catch {[$r0 cluster info]} e
- assert_match {*migrating_slot: 8*migrating_state: fail*} $e
- }
- }
-}
-
-start_server {tags {"Source server will be changed to slave"} overrides {cluster-enabled yes}} {
- set r0 [srv 0 client]
- set node0_host [srv 0 host]
- set node0_port [srv 0 port]
- set node0_id "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
- $r0 clusterx SETNODEID $node0_id
- start_server {overrides {cluster-enabled yes}} {
- set r1 [srv 0 client]
- set node1_host [srv 0 host]
- set node1_port [srv 0 port]
- set node1_id "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
- $r1 clusterx SETNODEID $node1_id
- start_server {tags {"dst server"} overrides {cluster-enabled yes}} {
- set r2 [srv 0 client]
- set node2_host [srv 0 host]
- set node2_port [srv 0 port]
- set node2_id "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx02"
- $r2 clusterx SETNODEID $node2_id
-
- set cluster_nodes "$node0_id $node0_host $node0_port master - 0-10000"
- set cluster_nodes "$cluster_nodes\n$node1_id $node1_host $node1_port slave $node0_id"
- set cluster_nodes "$cluster_nodes\n$node2_id $node2_host $node2_port master - 10001-16383"
- $r0 clusterx SETNODES $cluster_nodes 1
- $r1 clusterx SETNODES $cluster_nodes 1
- $r2 clusterx SETNODES $cluster_nodes 1
-
- test {MIGRATE - Fail to migrate slot because source server is changed to slave during migrating} {
- set slot10_key [lindex $::CRC16_SLOT_TABLE 10]
- set count 10000
- for {set i 0} {$i < $count} {incr i} {
- $r0 lpush $slot10_key $i
- }
- # Start migrating
- set ret [$r0 clusterx migrate 10 $node2_id]
- assert {$ret == "OK"}
- catch {[$r0 cluster info]} e
- assert_match {*10*start*} $e
- # Change source server to slave by set topology
- set cluster_nodes "$node1_id $node1_host $node1_port master - 0-10000"
- set cluster_nodes "$cluster_nodes\n$node0_id $node0_host $node0_port slave $node1_id"
- set cluster_nodes "$cluster_nodes\n$node2_id $node2_host $node2_port master - 10001-16383"
- $r0 clusterx SETNODES $cluster_nodes 2
- $r1 clusterx SETNODES $cluster_nodes 2
- $r2 clusterx SETNODES $cluster_nodes 2
- after 1000
- # Check destination importing status
- catch {[$r2 cluster info]} e
- assert_match {*10*error*} $e
- }
-
- }
- }
-}
-
-start_server {tags {"Source server will be flushed"} overrides {cluster-enabled yes}} {
- set r0 [srv 0 client]
- set node0_host [srv 0 host]
- set node0_port [srv 0 port]
- set node0_pid [srv 0 pid]
- set node0_id "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
- $r0 clusterx SETNODEID $node0_id
- start_server {overrides {cluster-enabled yes}} {
- set r1 [srv 0 client]
- set node1_host [srv 0 host]
- set node1_port [srv 0 port]
- set node1_id "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
- $r1 clusterx SETNODEID $node1_id
-
- set cluster_nodes "$node0_id $node0_host $node0_port master - 0-10000"
- set cluster_nodes "$cluster_nodes\n$node1_id $node1_host $node1_port master - 10001-16383"
- $r0 clusterx SETNODES $cluster_nodes 1
- $r1 clusterx SETNODES $cluster_nodes 1
-
- test {MIGRATE - Fail to migrate slot because source server is flushed} {
- set slot11_key [lindex $::CRC16_SLOT_TABLE 11]
- set count 2000
- for {set i 0} {$i < $count} {incr i} {
- $r0 lpush $slot11_key $i
- }
- # Slow down migration speed
- $r0 config set migrate-speed 32
- catch {$r0 config get migrate-speed} e
- assert_match {*32*} $e
- # Migrate slot
- set ret [$r0 clusterx migrate 11 $node1_id]
- assert {$ret == "OK"}
- # Ensure migration started
- wait_for_condition 10 50 {
- [string match "*11*start*" [$r0 cluster info]]
- } else {
- fail "Fail to start migrating slot 11"
- }
- # Flush source server
- set ret [$r0 flushdb]
- assert {$ret == "OK"}
- after 1000
- wait_for_condition 10 100 {
- [string match "*11*fail*" [$r0 cluster info]]
- } else {
- fail "Fail to flush server while migrating slot 11"
- }
- }
-
- test {MIGRATE - Fail to migrate slot because source server is killed while migrating} {
- set slot20_key [lindex $::CRC16_SLOT_TABLE 20]
- set count 2000
- for {set i 0} {$i < $count} {incr i} {
- $r0 lpush $slot20_key $i
- }
-
- # Slow down migration speed
- $r0 config set migrate-speed 32
- catch {$r0 config get migrate-speed} e
- assert_match {*32*} $e
-
- # Migrate slot
- set ret [$r0 clusterx migrate 20 $node1_id]
- assert {$ret == "OK"}
- # Check key has been migrated successfully
- wait_for_condition 10 100 {
- [string match "*$slot20_key*" [$r1 keys *]]
- } else {
- fail "Fail to migrate slot 20"
- }
-
- # Kill source server
- exec kill -9 $node0_pid
- after 100
-
- # Check that migrated data have been deleted in dst server
- assert {[string match "*$slot20_key*" [$r1 keys *]] == 0}
- }
- }
-}
-
-start_server {tags {"Source server"} overrides {cluster-enabled yes}} {
- set r0 [srv 0 client]
- set node0_host [srv 0 host]
- set node0_port [srv 0 port]
- set node0_pid [srv 0 pid]
- set node0_id "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
- $r0 clusterx SETNODEID $node0_id
- start_server {overrides {cluster-enabled yes}} {
- set r1 [srv 0 client]
- set node1_host [srv 0 host]
- set node1_port [srv 0 port]
- set node1_id "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
- $r1 clusterx SETNODEID $node1_id
-
- set cluster_nodes "$node0_id $node0_host $node0_port master - 0-16383"
- set cluster_nodes "$cluster_nodes\n$node1_id $node1_host $node1_port master -"
- $r0 clusterx SETNODES $cluster_nodes 1
- $r1 clusterx SETNODES $cluster_nodes 1
-
- test {MIGRATE - Migrate slot to newly added node} {
- # Construct key
- set slot21_key [lindex $::CRC16_SLOT_TABLE 21]
- $r0 del $slot21_key
-
- # Write to newly added node1 will be moved
- catch {$r1 set $slot21_key foobar} ret
- assert_match {*MOVED*} $ret
-
- # Write data
- set count 100
- for {set i 0} {$i < $count} {incr i} {
- $r0 lpush $slot21_key $i
- }
- # Migrate slot 21 from node0 to node1
- set ret [$r0 clusterx migrate 21 $node1_id]
- assert {$ret == "OK"}
- # Check if finished
- wait_for_condition 50 1000 {
- [string match "*migrating_slot: 21*success*" [$r0 cluster info]]
- } else {
- fail "Slot 21 importing is not finished"
- }
- # Check data
- assert {[$r1 llen $slot21_key] == $count}
-
- # Write the migrated slot on source server
- set slot21_key1 "{$slot21_key}_1"
- catch {$r0 set $slot21_key1 slot21_value1} e
- assert_match {*MOVED*} $e
-
- # Write the migrated slot on destination server
- assert {[$r1 set $slot21_key1 slot21_value1] == "OK"}
- }
-
- test {MIGRATE - Auth before migrating slot} {
- $r1 config set requirepass password
- set slot22_key [lindex $::CRC16_SLOT_TABLE 22]
- set count 100
- for {set i 0} {$i < $count} {incr i} {
- $r0 lpush $slot22_key $i
- }
-
- # Migrating slot will fail if no auth
- set ret [$r0 clusterx migrate 22 $node1_id]
- assert {$ret == "OK"}
- wait_for_condition 50 1000 {
- [string match "*migrating_slot: 22*fail*" [$r0 cluster info]]
- } else {
- fail "Slot 22 importing is not finished"
- }
- catch {[$r1 exists $slot22_key]} e
- assert_match {*MOVED*} $e
-
- # Migrating slot will fail if auth with wrong password
- $r0 config set requirepass pass
- set ret [$r0 clusterx migrate 22 $node1_id]
- assert {$ret == "OK"}
- wait_for_condition 50 1000 {
- [string match "*migrating_slot: 22*fail*" [$r0 cluster info]]
- } else {
- fail "Slot 22 importing is not finished"
- }
- catch {[$r1 exists $slot22_key]} e
- assert_match {*MOVED*} $e
-
- # Migrating slot will succeed if auth with right password
- $r0 config set requirepass password
- set ret [$r0 clusterx migrate 22 $node1_id]
- assert {$ret == "OK"}
- wait_for_condition 50 1000 {
- [string match "*migrating_slot: 22*success*" [$r0 cluster info]]
- } else {
- fail "Slot 22 importing is not finished"
- }
- assert {[$r1 exists $slot22_key] == 1}
- assert {[$r1 llen $slot22_key] == $count}
- }
- }
-}