You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by ti...@apache.org on 2022/10/13 23:45:38 UTC
[incubator-kvrocks] branch unstable updated: test: migrate xstream ids (#986)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 84b8c28 test: migrate xstream ids (#986)
84b8c28 is described below
commit 84b8c28a17346e5f0ef5fac300c9b1c0944a4310
Author: tison <wa...@gmail.com>
AuthorDate: Fri Oct 14 07:45:33 2022 +0800
test: migrate xstream ids (#986)
Signed-off-by: tison <wa...@gmail.com>
---
tests/gocase/unit/type/stream/stream_test.go | 97 ++++++++++++++++++++++++++++
tests/tcl/tests/unit/type/stream.tcl | 85 ------------------------
2 files changed, 97 insertions(+), 85 deletions(-)
diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go
index 996cd96..9061bbc 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -29,6 +29,103 @@ import (
"github.com/stretchr/testify/require"
)
+func TestStream(t *testing.T) {
+ srv := util.StartServer(t, map[string]string{})
+ defer srv.Close()
+ ctx := context.Background()
+ rdb := srv.NewClient()
+ defer func() { require.NoError(t, rdb.Close()) }()
+
+ t.Run("XADD wrong number of args", func(t *testing.T) {
+ require.ErrorContains(t, rdb.Do(ctx, "XADD", "mystream").Err(), "wrong number of arguments")
+ require.ErrorContains(t, rdb.Do(ctx, "XADD", "mystream", "*").Err(), "wrong number of arguments")
+ require.ErrorContains(t, rdb.Do(ctx, "XADD", "mystream", "*", "field").Err(), "wrong number of arguments")
+ })
+
+ t.Run("XADD can add entries into a stream that XRANGE can fetch", func(t *testing.T) {
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "1", "value", "a"}}).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "2", "value", "b"}}).Err())
+
+ require.EqualValues(t, 2, rdb.XLen(ctx, "mystream").Val())
+
+ items := rdb.XRange(ctx, "mystream", "-", "+").Val()
+ require.Len(t, items, 2)
+ require.EqualValues(t, map[string]interface{}{"item": "1", "value": "a"}, items[0].Values)
+ require.EqualValues(t, map[string]interface{}{"item": "2", "value": "b"}, items[1].Values)
+ })
+
+ t.Run("XADD stores entry value with respect to case sensitivity", func(t *testing.T) {
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "myStream", Values: []string{"iTeM", "1", "vAluE", "a"}}).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "myStream", Values: []string{"ItEm", "2", "VaLUe", "B"}}).Err())
+
+ require.EqualValues(t, 2, rdb.XLen(ctx, "myStream").Val())
+
+ items := rdb.XRange(ctx, "myStream", "-", "+").Val()
+ require.Len(t, items, 2)
+ require.EqualValues(t, map[string]interface{}{"iTeM": "1", "vAluE": "a"}, items[0].Values)
+ require.EqualValues(t, map[string]interface{}{"ItEm": "2", "VaLUe": "B"}, items[1].Values)
+ })
+
+ t.Run("XADD IDs are incremental", func(t *testing.T) {
+ x1 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "1", "value", "a"}}).Val()
+ x2 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "2", "value", "b"}}).Val()
+ x3 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "3", "value", "c"}}).Val()
+ require.Less(t, x1, x2)
+ require.Less(t, x2, x3)
+ })
+
+ t.Run("XADD IDs are incremental when ms is the same as well", func(t *testing.T) {
+ require.NoError(t, rdb.Do(ctx, "MULTI").Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "1", "value", "a"}}).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "2", "value", "b"}}).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", Values: []string{"item", "3", "value", "c"}}).Err())
+ v := rdb.Do(ctx, "EXEC").Val().([]interface{})
+ require.Len(t, v, 3)
+ require.Less(t, v[0], v[1])
+ require.Less(t, v[1], v[2])
+ })
+
+ t.Run("XADD IDs correctly report an error when overflowing", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "mystream").Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "18446744073709551615-18446744073709551615", Values: []string{"a", "b"}}).Err())
+ require.ErrorContains(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "*", Values: []string{"c", "d"}}).Err(), "ERR")
+ })
+
+ t.Run("XADD auto-generated sequence is incremented for last ID", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "mystream").Err())
+ x1 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "123-456", Values: []string{"item", "1", "value", "a"}}).Val()
+ x2 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "123-*", Values: []string{"item", "2", "value", "b"}}).Val()
+ require.Equal(t, "123-457", x2)
+ require.Less(t, x1, x2)
+ })
+
+ t.Run("XADD auto-generated sequence is zero for future timestamp ID", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "mystream").Err())
+ x1 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "123-456", Values: []string{"item", "1", "value", "a"}}).Val()
+ x2 := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "789-*", Values: []string{"item", "2", "value", "b"}}).Val()
+ require.Equal(t, "789-0", x2)
+ require.Less(t, x1, x2)
+ })
+
+ t.Run("XADD auto-generated sequence can't be smaller than last ID", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "mystream").Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "123-456", Values: []string{"item", "1", "value", "a"}}).Err())
+ require.ErrorContains(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "42-*", Values: []string{"item", "2", "value", "b"}}).Err(), "ERR")
+ })
+
+ t.Run("XADD auto-generated sequence can't overflow", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "mystream").Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "1-18446744073709551615", Values: []string{"a", "b"}}).Err())
+ require.ErrorContains(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "1-*", Values: []string{"c", "d"}}).Err(), "ERR")
+ })
+
+ t.Run("XADD 0-* should succeed", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "mystream").Err())
+ x := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "mystream", ID: "0-*", Values: []string{"a", "b"}}).Val()
+ require.Equal(t, "0-1", x)
+ })
+}
+
func TestStreamOffset(t *testing.T) {
srv := util.StartServer(t, map[string]string{})
defer srv.Close()
diff --git a/tests/tcl/tests/unit/type/stream.tcl b/tests/tcl/tests/unit/type/stream.tcl
index a6c121c..629f65f 100644
--- a/tests/tcl/tests/unit/type/stream.tcl
+++ b/tests/tcl/tests/unit/type/stream.tcl
@@ -75,91 +75,6 @@ set content {} ;# Will be populated with Tcl side copy of the stream content.
start_server {
tags {"stream"}
} {
- test "XADD wrong number of args" {
- assert_error {*wrong number of arguments*} {r XADD mystream}
- assert_error {*wrong number of arguments*} {r XADD mystream *}
- assert_error {*wrong number of arguments*} {r XADD mystream * field}
- }
-
- test {XADD can add entries into a stream that XRANGE can fetch} {
- r XADD mystream * item 1 value a
- r XADD mystream * item 2 value b
- assert_equal 2 [r XLEN mystream]
- set items [r XRANGE mystream - +]
- assert_equal [lindex $items 0 1] {item 1 value a}
- assert_equal [lindex $items 1 1] {item 2 value b}
- }
-
- test {XADD stores entry value with respect to case sensitivity } {
- r XADD myStream * iTeM 1 vAluE a
- r XADD myStream * ItEm 2 VaLUe B
- assert_equal 2 [r XLEN myStream]
- set items [r XRANGE myStream - +]
- assert_equal [lindex $items 0 1] {iTeM 1 vAluE a}
- assert_equal [lindex $items 1 1] {ItEm 2 VaLUe B}
- }
-
- test {XADD IDs are incremental} {
- set id1 [r XADD mystream * item 1 value a]
- set id2 [r XADD mystream * item 2 value b]
- set id3 [r XADD mystream * item 3 value c]
- assert {[streamCompareID $id1 $id2] == -1}
- assert {[streamCompareID $id2 $id3] == -1}
- }
-
- test {XADD IDs are incremental when ms is the same as well} {
- r multi
- r XADD mystream * item 1 value a
- r XADD mystream * item 2 value b
- r XADD mystream * item 3 value c
- lassign [r exec] id1 id2 id3
- assert {[streamCompareID $id1 $id2] == -1}
- assert {[streamCompareID $id2 $id3] == -1}
- }
-
- test {XADD IDs correctly report an error when overflowing} {
- r DEL mystream
- r xadd mystream 18446744073709551615-18446744073709551615 a b
- assert_error ERR* {r xadd mystream * c d}
- }
-
- test {XADD auto-generated sequence is incremented for last ID} {
- r DEL mystream
- set id1 [r XADD mystream 123-456 item 1 value a]
- set id2 [r XADD mystream 123-* item 2 value b]
- lassign [split $id2 -] _ seq
- assert {$seq == 457}
- assert {[streamCompareID $id1 $id2] == -1}
- }
-
- test {XADD auto-generated sequence is zero for future timestamp ID} {
- r DEL mystream
- set id1 [r XADD mystream 123-456 item 1 value a]
- set id2 [r XADD mystream 789-* item 2 value b]
- lassign [split $id2 -] _ seq
- assert {$seq == 0}
- assert {[streamCompareID $id1 $id2] == -1}
- }
-
- test {XADD auto-generated sequence can't be smaller than last ID} {
- r DEL mystream
- r XADD mystream 123-456 item 1 value a
- assert_error ERR* {r XADD mystream 42-* item 2 value b}
- }
-
- test {XADD auto-generated sequence can't overflow} {
- r DEL mystream
- r xadd mystream 1-18446744073709551615 a b
- assert_error ERR* {r xadd mystream 1-* c d}
- }
-
- test {XADD 0-* should succeed} {
- r DEL mystream
- set id [r xadd mystream 0-* a b]
- lassign [split $id -] _ seq
- assert {$seq == 1}
- }
-
test {XADD with MAXLEN option} {
r DEL mystream
for {set j 0} {$j < 1000} {incr j} {