Posted to issues@kvrocks.apache.org by GitBox <gi...@apache.org> on 2022/10/17 13:34:18 UTC

[GitHub] [incubator-kvrocks] git-hulk commented on a diff in pull request #1005: test: migrate part of slotmigrate tests

git-hulk commented on code in PR #1005:
URL: https://github.com/apache/incubator-kvrocks/pull/1005#discussion_r997068463


##########
tests/gocase/integration/slotmigrate/slotmigrate_test.go:
##########
@@ -106,3 +108,157 @@ func TestSlotMigrateServerFailed(t *testing.T) {
 		require.Contains(t, i, "migrating_state: fail")
 	})
 }
+
+func TestSlotMigrateDestServerKilledAgain(t *testing.T) {
+	ctx := context.Background()
+
+	srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+	defer func() { srv0.Close() }()
+	rdb0 := srv0.NewClient()
+	defer func() { require.NoError(t, rdb0.Close()) }()
+	id0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+	require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODEID", id0).Err())
+
+	srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+	srv1Alive := true
+	defer func() {
+		if srv1Alive {
+			srv1.Close()
+		}
+	}()
+	rdb1 := srv1.NewClient()
+	defer func() { require.NoError(t, rdb1.Close()) }()
+	id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+	require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())
+
+	clusterNodes := fmt.Sprintf("%s %s %d master - 0-10000\n", id0, srv0.Host(), srv0.Port())
+	clusterNodes += fmt.Sprintf("%s %s %d master - 10001-16383", id1, srv1.Host(), srv1.Port())
+	require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+	require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+
+	t.Run("MIGRATE - Migrate slot with empty string key or value", func(t *testing.T) {
+		require.NoError(t, rdb0.Set(ctx, "", "slot0", 0).Err())
+		require.NoError(t, rdb0.Del(ctx, util.SlotTable[0]).Err())
+		require.NoError(t, rdb0.Set(ctx, util.SlotTable[0], "", 0).Err())
+		time.Sleep(500 * time.Millisecond)
+		require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "0", id1).Val())
+		require.Eventually(t, func() bool {
+			i := rdb0.ClusterInfo(ctx).Val()
+			return strings.Contains(i, "migrating_slot: 0") && strings.Contains(i, "migrating_state: success")
+		}, 5*time.Second, 100*time.Millisecond)
+		require.Equal(t, "slot0", rdb1.Get(ctx, "").Val())
+		require.Equal(t, "", rdb1.Get(ctx, util.SlotTable[0]).Val())
+		require.NoError(t, rdb1.Del(ctx, util.SlotTable[0]).Err())
+	})
+
+	t.Run("MIGRATE - Migrate binary key-value", func(t *testing.T) {
+		k1 := fmt.Sprintf("\x3a\x88{%s}\x3d\xaa", util.SlotTable[1])
+		cnt := 257
+		for i := 0; i < cnt; i++ {
+			require.NoError(t, rdb0.LPush(ctx, k1, "\0000\0001").Err())
+		}
+		require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "1", id1).Val())
+		k2 := fmt.Sprintf("\x49\x1f\x7f{%s}\xaf", util.SlotTable[1])
+		require.NoError(t, rdb0.Set(ctx, k2, "\0000\0001", 0).Err())
+		time.Sleep(time.Second)
+		require.Eventually(t, func() bool {
+			i := rdb1.ClusterInfo(ctx).Val()
+			return strings.Contains(i, "importing_slot: 1") && strings.Contains(i, "import_state: success")
+		}, 5*time.Second, 100*time.Millisecond)
+		require.EqualValues(t, cnt, rdb1.LLen(ctx, k1).Val())
+		require.Equal(t, "\0000\0001", rdb1.LPop(ctx, k1).Val())
+		require.Equal(t, "\0000\0001", rdb1.Get(ctx, k2).Val())
+	})
+
+	t.Run("MIGRATE - Migrate empty slot", func(t *testing.T) {
+		require.NoError(t, rdb0.FlushDB(ctx).Err())
+		require.NoError(t, rdb1.FlushDB(ctx).Err())
+		time.Sleep(500 * time.Millisecond)
+		require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "2", id1).Val())
+		require.Eventually(t, func() bool {
+			i := rdb0.ClusterInfo(ctx).Val()
+			return strings.Contains(i, "migrating_slot: 2") && strings.Contains(i, "migrating_state: success")
+		}, 5*time.Second, 100*time.Millisecond)
+		require.NoError(t, rdb1.Keys(ctx, "*").Err())
+	})
+
+	t.Run("MIGRATE - Fail to migrate slot because destination server is killed while migrating", func(t *testing.T) {
+		for i := 0; i < 20000; i++ {

Review Comment:
   The original count is 2000; are we expected to change it?
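
   If the bump to 20000 was deliberate, hoisting the count into a named constant would make the change visible at a glance. A minimal sketch in the style of the surrounding test, where keyCount is a hypothetical name, not something this PR defines:

       // keyCount controls how many list elements are pushed before migrating.
       // The original test used 2000; keep whichever value is actually intended.
       const keyCount = 2000

       for i := 0; i < keyCount; i++ {
           require.NoError(t, rdb0.LPush(ctx, util.SlotTable[8], i).Err())
       }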



##########
tests/gocase/integration/slotmigrate/slotmigrate_test.go:
##########
@@ -106,3 +108,157 @@ func TestSlotMigrateServerFailed(t *testing.T) {
+	t.Run("MIGRATE - Fail to migrate slot because destination server is killed while migrating", func(t *testing.T) {
+		for i := 0; i < 20000; i++ {
+			require.NoError(t, rdb0.LPush(ctx, util.SlotTable[8], i).Err())
+		}
+		require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "8", id1).Val())
+		i := rdb0.ClusterInfo(ctx).Val()
+		require.Contains(t, i, "migrating_slot: 8")
+		require.Contains(t, i, "migrating_state: start")
+		srv1.Close()
+		srv1Alive = false
+		time.Sleep(time.Second)
+		i = rdb0.ClusterInfo(ctx).Val()
+		require.Contains(t, i, "migrating_slot: 8")
+		require.Contains(t, i, "migrating_state: fail")
+	})
+}
+
+func TestSlotMigrateSourceServerFlushedOrKilled(t *testing.T) {
+	ctx := context.Background()
+
+	srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+	srv0Alive := true
+	defer func() {
+		if srv0Alive {
+			srv0.Close()
+		}
+	}()
+	rdb0 := srv0.NewClient()
+	defer func() { require.NoError(t, rdb0.Close()) }()
+	id0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+	require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODEID", id0).Err())
+
+	srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+	defer func() { srv1.Close() }()
+	rdb1 := srv1.NewClient()
+	defer func() { require.NoError(t, rdb1.Close()) }()
+	id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+	require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())
+
+	clusterNodes := fmt.Sprintf("%s %s %d master - 0-10000\n", id0, srv0.Host(), srv0.Port())
+	clusterNodes += fmt.Sprintf("%s %s %d master - 10001-16383", id1, srv1.Host(), srv1.Port())
+	require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+	require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+
+	t.Run("MIGRATE - Fail to migrate slot because source server is flushed", func(t *testing.T) {
+		for i := 0; i < 20000; i++ {

Review Comment:
   Ditto: the original count here is also 2000.
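
   Since the same populate loop now appears in both test functions, the duplication could also be factored into a shared helper. A hedged sketch, assuming the go-redis *redis.Client returned by srv.NewClient() and the existing util.SlotTable; populateSlotWithList is a hypothetical name, not part of this PR:

       // populateSlotWithList fills the hash-tagged key for the given slot
       // with cnt list elements, mirroring the setup loop both tests start with.
       func populateSlotWithList(t *testing.T, ctx context.Context, rdb *redis.Client, slot, cnt int) {
           t.Helper()
           for i := 0; i < cnt; i++ {
               require.NoError(t, rdb.LPush(ctx, util.SlotTable[slot], i).Err())
           }
       }

   Each call site would then pass the slot and count explicitly, e.g. populateSlotWithList(t, ctx, rdb0, 8, 2000), so whichever count is intended becomes a single reviewable argument rather than a repeated literal.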



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org