You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@kvrocks.apache.org by GitBox <gi...@apache.org> on 2022/10/13 04:43:04 UTC

[GitHub] [incubator-kvrocks] tisonkun opened a new pull request, #982: Move TCL test unit/pubsub to Go case

tisonkun opened a new pull request, #982:
URL: https://github.com/apache/incubator-kvrocks/pull/982

   This closes #889.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] vmihailenco commented on a diff in pull request #982: Move TCL test unit/pubsub to Go case

Posted by GitBox <gi...@apache.org>.
vmihailenco commented on code in PR #982:
URL: https://github.com/apache/incubator-kvrocks/pull/982#discussion_r994426236


##########
tests/gocase/unit/pubsub/pubsub_test.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/apache/incubator-kvrocks/tests/gocase/util"
+	"github.com/go-redis/redis/v9"
+	"github.com/stretchr/testify/require"
+)
+
+func receiveType[T any](t *testing.T, pubsub *redis.PubSub, typ T) T {
+	msg, err := pubsub.Receive(context.Background())
+	require.NoError(t, err)
+	require.IsType(t, typ, msg)
+	return msg.(T)
+}
+
+func TestPubSub(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	t.Run("PUBLISH/SUBSCRIBE PING", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "somechannel")
+		receiveType(t, pubsub, &redis.Subscription{})
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Unsubscribe(ctx, "somechannel"))
+		require.Equal(t, "PONG", rdb.Ping(ctx).Val())
+		receiveType(t, pubsub, &redis.Pong{})
+		receiveType(t, pubsub, &redis.Pong{})
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE basics", func(t *testing.T) {
+		// subscribe to two channels
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		msg := receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan2", msg.Channel)
+		require.Equal(t, "world", msg.Payload)
+
+		// unsubscribe from one of the channels
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan1"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		// unsubscribe from the remaining channel
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan2"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "world").Val())
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE with two clients", func(t *testing.T) {
+		c1 := srv.NewClient()
+		defer func() { require.NoError(t, c1.Close()) }()
+		c2 := srv.NewClient()
+		defer func() { require.NoError(t, c2.Close()) }()
+
+		p1 := c1.Subscribe(ctx, "chan1")
+		p2 := c2.Subscribe(ctx, "chan1")
+		require.EqualValues(t, 1, receiveType(t, p1, &redis.Subscription{}).Count)
+		require.EqualValues(t, 1, receiveType(t, p2, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 2, rdb.Publish(ctx, "chan1", "hello").Val())
+		msg := receiveType(t, p1, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, p2, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 3, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.NoError(t, pubsub.Unsubscribe(ctx))

Review Comment:
   @tisonkun looks like you need to add channels here - "chan1", "chan2", "chan3". Not sure why the test is not failing earlier though and you still receive `unsubscribe` events... Perhaps some magic by kvrocks? I wonder if Redis behaves the same.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] tisonkun commented on a diff in pull request #982: Move TCL test unit/pubsub to Go case

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #982:
URL: https://github.com/apache/incubator-kvrocks/pull/982#discussion_r994410204


##########
tests/gocase/unit/pubsub/pubsub_test.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/apache/incubator-kvrocks/tests/gocase/util"
+	"github.com/go-redis/redis/v9"
+	"github.com/stretchr/testify/require"
+)
+
+func receiveType[T any](t *testing.T, pubsub *redis.PubSub, typ T) T {
+	msg, err := pubsub.Receive(context.Background())
+	require.NoError(t, err)
+	require.IsType(t, typ, msg)
+	return msg.(T)
+}
+
+func TestPubSub(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	t.Run("PUBLISH/SUBSCRIBE PING", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "somechannel")
+		receiveType(t, pubsub, &redis.Subscription{})
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Unsubscribe(ctx, "somechannel"))
+		require.Equal(t, "PONG", rdb.Ping(ctx).Val())
+		receiveType(t, pubsub, &redis.Pong{})
+		receiveType(t, pubsub, &redis.Pong{})
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE basics", func(t *testing.T) {
+		// subscribe to two channels
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		msg := receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan2", msg.Channel)
+		require.Equal(t, "world", msg.Payload)
+
+		// unsubscribe from one of the channels
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan1"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		// unsubscribe from the remaining channel
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan2"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "world").Val())
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE with two clients", func(t *testing.T) {
+		c1 := srv.NewClient()
+		defer func() { require.NoError(t, c1.Close()) }()
+		c2 := srv.NewClient()
+		defer func() { require.NoError(t, c2.Close()) }()
+
+		p1 := c1.Subscribe(ctx, "chan1")
+		p2 := c2.Subscribe(ctx, "chan1")
+		require.EqualValues(t, 1, receiveType(t, p1, &redis.Subscription{}).Count)
+		require.EqualValues(t, 1, receiveType(t, p2, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 2, rdb.Publish(ctx, "chan1", "hello").Val())
+		msg := receiveType(t, p1, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, p2, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 3, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.NoError(t, pubsub.Unsubscribe(ctx))
+		require.EqualValues(t, &redis.Subscription{
+			Kind:    "unsubscribe",
+			Channel: "chan1",
+			Count:   2,
+		}, receiveType(t, pubsub, &redis.Subscription{}))
+		require.EqualValues(t, &redis.Subscription{
+			Kind:    "unsubscribe",
+			Channel: "chan2",
+			Count:   1,
+		}, receiveType(t, pubsub, &redis.Subscription{}))
+		require.EqualValues(t, &redis.Subscription{
+			Kind:    "unsubscribe",
+			Channel: "chan3",
+			Count:   0,
+		}, receiveType(t, pubsub, &redis.Subscription{}))
+
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "hello").Val())

Review Comment:
   Check run: https://github.com/apache/incubator-kvrocks/actions/runs/3240746852/jobs/5311958362



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] tisonkun commented on a diff in pull request #982: Move TCL test unit/pubsub to Go case

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #982:
URL: https://github.com/apache/incubator-kvrocks/pull/982#discussion_r994411259


##########
tests/gocase/unit/pubsub/pubsub_test.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/apache/incubator-kvrocks/tests/gocase/util"
+	"github.com/go-redis/redis/v9"
+	"github.com/stretchr/testify/require"
+)
+
+func receiveType[T any](t *testing.T, pubsub *redis.PubSub, typ T) T {
+	msg, err := pubsub.Receive(context.Background())
+	require.NoError(t, err)
+	require.IsType(t, typ, msg)
+	return msg.(T)
+}
+
+func TestPubSub(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	t.Run("PUBLISH/SUBSCRIBE PING", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "somechannel")
+		receiveType(t, pubsub, &redis.Subscription{})
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Unsubscribe(ctx, "somechannel"))
+		require.Equal(t, "PONG", rdb.Ping(ctx).Val())
+		receiveType(t, pubsub, &redis.Pong{})
+		receiveType(t, pubsub, &redis.Pong{})
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE basics", func(t *testing.T) {
+		// subscribe to two channels
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		msg := receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan2", msg.Channel)
+		require.Equal(t, "world", msg.Payload)
+
+		// unsubscribe from one of the channels
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan1"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		// unsubscribe from the remaining channel
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan2"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "world").Val())
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE with two clients", func(t *testing.T) {
+		c1 := srv.NewClient()
+		defer func() { require.NoError(t, c1.Close()) }()
+		c2 := srv.NewClient()
+		defer func() { require.NoError(t, c2.Close()) }()
+
+		p1 := c1.Subscribe(ctx, "chan1")
+		p2 := c2.Subscribe(ctx, "chan1")
+		require.EqualValues(t, 1, receiveType(t, p1, &redis.Subscription{}).Count)
+		require.EqualValues(t, 1, receiveType(t, p2, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 2, rdb.Publish(ctx, "chan1", "hello").Val())
+		msg := receiveType(t, p1, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, p2, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 3, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.NoError(t, pubsub.Unsubscribe(ctx))
+		require.EqualValues(t, &redis.Subscription{
+			Kind:    "unsubscribe",
+			Channel: "chan1",
+			Count:   2,
+		}, receiveType(t, pubsub, &redis.Subscription{}))
+		require.EqualValues(t, &redis.Subscription{
+			Kind:    "unsubscribe",
+			Channel: "chan2",
+			Count:   1,
+		}, receiveType(t, pubsub, &redis.Subscription{}))
+		require.EqualValues(t, &redis.Subscription{
+			Kind:    "unsubscribe",
+			Channel: "chan3",
+			Count:   0,
+		}, receiveType(t, pubsub, &redis.Subscription{}))
+
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "hello").Val())

Review Comment:
   cc @git-hulk 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] tisonkun commented on a diff in pull request #982: Move TCL test unit/pubsub to Go case

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #982:
URL: https://github.com/apache/incubator-kvrocks/pull/982#discussion_r994409836


##########
tests/gocase/unit/pubsub/pubsub_test.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/apache/incubator-kvrocks/tests/gocase/util"
+	"github.com/go-redis/redis/v9"
+	"github.com/stretchr/testify/require"
+)
+
+func receiveType[T any](t *testing.T, pubsub *redis.PubSub, typ T) T {
+	msg, err := pubsub.Receive(context.Background())
+	require.NoError(t, err)
+	require.IsType(t, typ, msg)
+	return msg.(T)
+}
+
+func TestPubSub(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	t.Run("PUBLISH/SUBSCRIBE PING", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "somechannel")
+		receiveType(t, pubsub, &redis.Subscription{})
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Unsubscribe(ctx, "somechannel"))
+		require.Equal(t, "PONG", rdb.Ping(ctx).Val())
+		receiveType(t, pubsub, &redis.Pong{})
+		receiveType(t, pubsub, &redis.Pong{})
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE basics", func(t *testing.T) {
+		// subscribe to two channels
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		msg := receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan2", msg.Channel)
+		require.Equal(t, "world", msg.Payload)
+
+		// unsubscribe from one of the channels
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan1"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		// unsubscribe from the remaining channel
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan2"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "world").Val())
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE with two clients", func(t *testing.T) {
+		c1 := srv.NewClient()
+		defer func() { require.NoError(t, c1.Close()) }()
+		c2 := srv.NewClient()
+		defer func() { require.NoError(t, c2.Close()) }()
+
+		p1 := c1.Subscribe(ctx, "chan1")
+		p2 := c2.Subscribe(ctx, "chan1")
+		require.EqualValues(t, 1, receiveType(t, p1, &redis.Subscription{}).Count)
+		require.EqualValues(t, 1, receiveType(t, p2, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 2, rdb.Publish(ctx, "chan1", "hello").Val())
+		msg := receiveType(t, p1, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, p2, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 3, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.NoError(t, pubsub.Unsubscribe(ctx))
+		require.EqualValues(t, &redis.Subscription{
+			Kind:    "unsubscribe",
+			Channel: "chan1",
+			Count:   2,
+		}, receiveType(t, pubsub, &redis.Subscription{}))
+		require.EqualValues(t, &redis.Subscription{
+			Kind:    "unsubscribe",
+			Channel: "chan2",
+			Count:   1,
+		}, receiveType(t, pubsub, &redis.Subscription{}))
+		require.EqualValues(t, &redis.Subscription{
+			Kind:    "unsubscribe",
+			Channel: "chan3",
+			Count:   0,
+		}, receiveType(t, pubsub, &redis.Subscription{}))
+
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "hello").Val())

Review Comment:
   ```
   --- FAIL: TestPubSub (1.08s)
       --- FAIL: TestPubSub/PUBLISH/SUBSCRIBE_basics (0.00s)
           pubsub_test.go:76: 
               	Error Trace:	/home/runner/work/incubator-kvrocks/incubator-kvrocks/tests/gocase/unit/pubsub/pubsub_test.go:76
               	Error:      	Not equal: 
               	            	expected: int(0)
               	            	actual  : int64(1)
               	Test:       	TestPubSub/PUBLISH/SUBSCRIBE_basics
       --- FAIL: TestPubSub/PUBLISH/SUBSCRIBE_after_UNSUBSCRIBE_without_arguments (0.00s)
           pubsub_test.go:129: 
               	Error Trace:	/home/runner/work/incubator-kvrocks/incubator-kvrocks/tests/gocase/unit/pubsub/pubsub_test.go:129
               	Error:      	Not equal: 
               	            	expected: int(0)
               	            	actual  : int64(1)
               	Test:       	TestPubSub/PUBLISH/SUBSCRIBE_after_UNSUBSCRIBE_without_arguments
   ```
   
   @vmihailenco do you have ideas on this failure? Why can we still publish to a channel even if no one is subscribing to it? It seems like an unstable case instead of a steady one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] tisonkun commented on a diff in pull request #982: Move TCL test unit/pubsub to Go case

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #982:
URL: https://github.com/apache/incubator-kvrocks/pull/982#discussion_r994557692


##########
tests/gocase/unit/pubsub/pubsub_test.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/apache/incubator-kvrocks/tests/gocase/util"
+	"github.com/go-redis/redis/v9"
+	"github.com/stretchr/testify/require"
+)
+
+func receiveType[T any](t *testing.T, pubsub *redis.PubSub, typ T) T {
+	msg, err := pubsub.Receive(context.Background())
+	require.NoError(t, err)
+	require.IsType(t, typ, msg)
+	return msg.(T)
+}
+
+func TestPubSub(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	t.Run("PUBLISH/SUBSCRIBE PING", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "somechannel")
+		receiveType(t, pubsub, &redis.Subscription{})
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Unsubscribe(ctx, "somechannel"))
+		require.Equal(t, "PONG", rdb.Ping(ctx).Val())
+		receiveType(t, pubsub, &redis.Pong{})
+		receiveType(t, pubsub, &redis.Pong{})
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE basics", func(t *testing.T) {
+		// subscribe to two channels
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		msg := receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan2", msg.Channel)
+		require.Equal(t, "world", msg.Payload)
+
+		// unsubscribe from one of the channels
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan1"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		// unsubscribe from the remaining channel
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan2"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "world").Val())
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE with two clients", func(t *testing.T) {
+		c1 := srv.NewClient()
+		defer func() { require.NoError(t, c1.Close()) }()
+		c2 := srv.NewClient()
+		defer func() { require.NoError(t, c2.Close()) }()
+
+		p1 := c1.Subscribe(ctx, "chan1")
+		p2 := c2.Subscribe(ctx, "chan1")
+		require.EqualValues(t, 1, receiveType(t, p1, &redis.Subscription{}).Count)
+		require.EqualValues(t, 1, receiveType(t, p2, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 2, rdb.Publish(ctx, "chan1", "hello").Val())
+		msg := receiveType(t, p1, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, p2, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 3, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.NoError(t, pubsub.Unsubscribe(ctx))

Review Comment:
   Forgive my comment above, it's about stateful tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] tisonkun commented on a diff in pull request #982: Move TCL test unit/pubsub to Go case

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #982:
URL: https://github.com/apache/incubator-kvrocks/pull/982#discussion_r994409836


##########
tests/gocase/unit/pubsub/pubsub_test.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/apache/incubator-kvrocks/tests/gocase/util"
+	"github.com/go-redis/redis/v9"
+	"github.com/stretchr/testify/require"
+)
+
+func receiveType[T any](t *testing.T, pubsub *redis.PubSub, typ T) T {
+	msg, err := pubsub.Receive(context.Background())
+	require.NoError(t, err)
+	require.IsType(t, typ, msg)
+	return msg.(T)
+}
+
+func TestPubSub(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	t.Run("PUBLISH/SUBSCRIBE PING", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "somechannel")
+		receiveType(t, pubsub, &redis.Subscription{})
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Unsubscribe(ctx, "somechannel"))
+		require.Equal(t, "PONG", rdb.Ping(ctx).Val())
+		receiveType(t, pubsub, &redis.Pong{})
+		receiveType(t, pubsub, &redis.Pong{})
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE basics", func(t *testing.T) {
+		// subscribe to two channels
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		msg := receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan2", msg.Channel)
+		require.Equal(t, "world", msg.Payload)
+
+		// unsubscribe from one of the channels
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan1"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		// unsubscribe from the remaining channel
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan2"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "world").Val())
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE with two clients", func(t *testing.T) {
+		c1 := srv.NewClient()
+		defer func() { require.NoError(t, c1.Close()) }()
+		c2 := srv.NewClient()
+		defer func() { require.NoError(t, c2.Close()) }()
+
+		p1 := c1.Subscribe(ctx, "chan1")
+		p2 := c2.Subscribe(ctx, "chan1")
+		require.EqualValues(t, 1, receiveType(t, p1, &redis.Subscription{}).Count)
+		require.EqualValues(t, 1, receiveType(t, p2, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 2, rdb.Publish(ctx, "chan1", "hello").Val())
+		msg := receiveType(t, p1, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, p2, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 3, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.NoError(t, pubsub.Unsubscribe(ctx))
+		require.EqualValues(t, &redis.Subscription{
+			Kind:    "unsubscribe",
+			Channel: "chan1",
+			Count:   2,
+		}, receiveType(t, pubsub, &redis.Subscription{}))
+		require.EqualValues(t, &redis.Subscription{
+			Kind:    "unsubscribe",
+			Channel: "chan2",
+			Count:   1,
+		}, receiveType(t, pubsub, &redis.Subscription{}))
+		require.EqualValues(t, &redis.Subscription{
+			Kind:    "unsubscribe",
+			Channel: "chan3",
+			Count:   0,
+		}, receiveType(t, pubsub, &redis.Subscription{}))
+
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "hello").Val())

Review Comment:
   ```
   --- FAIL: TestPubSub (1.08s)
       --- FAIL: TestPubSub/PUBLISH/SUBSCRIBE_basics (0.00s)
           pubsub_test.go:76: 
               	Error Trace:	/home/runner/work/incubator-kvrocks/incubator-kvrocks/tests/gocase/unit/pubsub/pubsub_test.go:76
               	Error:      	Not equal: 
               	            	expected: int(0)
               	            	actual  : int64(1)
               	Test:       	TestPubSub/PUBLISH/SUBSCRIBE_basics
       --- FAIL: TestPubSub/PUBLISH/SUBSCRIBE_after_UNSUBSCRIBE_without_arguments (0.00s)
           pubsub_test.go:129: 
               	Error Trace:	/home/runner/work/incubator-kvrocks/incubator-kvrocks/tests/gocase/unit/pubsub/pubsub_test.go:129
               	Error:      	Not equal: 
               	            	expected: int(0)
               	            	actual  : int64(1)
               	Test:       	TestPubSub/PUBLISH/SUBSCRIBE_after_UNSUBSCRIBE_without_arguments
   ```
   
   @vmihailenco do you have ideas on this failure? Why we can still publish to a channel event if no one is subscribing to it? It seems an unstable case instead of steady one.



##########
tests/gocase/unit/pubsub/pubsub_test.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/apache/incubator-kvrocks/tests/gocase/util"
+	"github.com/go-redis/redis/v9"
+	"github.com/stretchr/testify/require"
+)
+
+func receiveType[T any](t *testing.T, pubsub *redis.PubSub, typ T) T {
+	msg, err := pubsub.Receive(context.Background())
+	require.NoError(t, err)
+	require.IsType(t, typ, msg)
+	return msg.(T)
+}
+
+func TestPubSub(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	t.Run("PUBLISH/SUBSCRIBE PING", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "somechannel")
+		receiveType(t, pubsub, &redis.Subscription{})
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Unsubscribe(ctx, "somechannel"))
+		require.Equal(t, "PONG", rdb.Ping(ctx).Val())
+		receiveType(t, pubsub, &redis.Pong{})
+		receiveType(t, pubsub, &redis.Pong{})
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE basics", func(t *testing.T) {
+		// subscribe to two channels
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		msg := receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan2", msg.Channel)
+		require.Equal(t, "world", msg.Payload)
+
+		// unsubscribe from one of the channels
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan1"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		// unsubscribe from the remaining channel
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan2"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "world").Val())
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE with two clients", func(t *testing.T) {
+		c1 := srv.NewClient()
+		defer func() { require.NoError(t, c1.Close()) }()
+		c2 := srv.NewClient()
+		defer func() { require.NoError(t, c2.Close()) }()
+
+		p1 := c1.Subscribe(ctx, "chan1")
+		p2 := c2.Subscribe(ctx, "chan1")
+		require.EqualValues(t, 1, receiveType(t, p1, &redis.Subscription{}).Count)
+		require.EqualValues(t, 1, receiveType(t, p2, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 2, rdb.Publish(ctx, "chan1", "hello").Val())
+		msg := receiveType(t, p1, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, p2, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 3, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.NoError(t, pubsub.Unsubscribe(ctx))
+		require.EqualValues(t, &redis.Subscription{
+			Kind:    "unsubscribe",
+			Channel: "chan1",
+			Count:   2,
+		}, receiveType(t, pubsub, &redis.Subscription{}))
+		require.EqualValues(t, &redis.Subscription{
+			Kind:    "unsubscribe",
+			Channel: "chan2",
+			Count:   1,
+		}, receiveType(t, pubsub, &redis.Subscription{}))
+		require.EqualValues(t, &redis.Subscription{
+			Kind:    "unsubscribe",
+			Channel: "chan3",
+			Count:   0,
+		}, receiveType(t, pubsub, &redis.Subscription{}))
+
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "hello").Val())

Review Comment:
   ```
   --- FAIL: TestPubSub (1.08s)
       --- FAIL: TestPubSub/PUBLISH/SUBSCRIBE_basics (0.00s)
           pubsub_test.go:76: 
               	Error Trace:	/home/runner/work/incubator-kvrocks/incubator-kvrocks/tests/gocase/unit/pubsub/pubsub_test.go:76
               	Error:      	Not equal: 
               	            	expected: int(0)
               	            	actual  : int64(1)
               	Test:       	TestPubSub/PUBLISH/SUBSCRIBE_basics
       --- FAIL: TestPubSub/PUBLISH/SUBSCRIBE_after_UNSUBSCRIBE_without_arguments (0.00s)
           pubsub_test.go:129: 
               	Error Trace:	/home/runner/work/incubator-kvrocks/incubator-kvrocks/tests/gocase/unit/pubsub/pubsub_test.go:129
               	Error:      	Not equal: 
               	            	expected: int(0)
               	            	actual  : int64(1)
               	Test:       	TestPubSub/PUBLISH/SUBSCRIBE_after_UNSUBSCRIBE_without_arguments
   ```
   
   @vmihailenco do you have ideas on this failure? Why we can still publish to a channel event if no one is subscribing to it? It seems an unstable case instead of a steady one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] tisonkun commented on a diff in pull request #982: Move TCL test unit/pubsub to Go case

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #982:
URL: https://github.com/apache/incubator-kvrocks/pull/982#discussion_r994511421


##########
tests/gocase/unit/pubsub/pubsub_test.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/apache/incubator-kvrocks/tests/gocase/util"
+	"github.com/go-redis/redis/v9"
+	"github.com/stretchr/testify/require"
+)
+
+func receiveType[T any](t *testing.T, pubsub *redis.PubSub, typ T) T {
+	msg, err := pubsub.Receive(context.Background())
+	require.NoError(t, err)
+	require.IsType(t, typ, msg)
+	return msg.(T)
+}
+
+func TestPubSub(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	t.Run("PUBLISH/SUBSCRIBE PING", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "somechannel")
+		receiveType(t, pubsub, &redis.Subscription{})
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Unsubscribe(ctx, "somechannel"))
+		require.Equal(t, "PONG", rdb.Ping(ctx).Val())
+		receiveType(t, pubsub, &redis.Pong{})
+		receiveType(t, pubsub, &redis.Pong{})
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE basics", func(t *testing.T) {
+		// subscribe to two channels
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		msg := receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan2", msg.Channel)
+		require.Equal(t, "world", msg.Payload)
+
+		// unsubscribe from one of the channels
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan1"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		// unsubscribe from the remaining channel
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan2"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "world").Val())
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE with two clients", func(t *testing.T) {
+		c1 := srv.NewClient()
+		defer func() { require.NoError(t, c1.Close()) }()
+		c2 := srv.NewClient()
+		defer func() { require.NoError(t, c2.Close()) }()
+
+		p1 := c1.Subscribe(ctx, "chan1")
+		p2 := c2.Subscribe(ctx, "chan1")
+		require.EqualValues(t, 1, receiveType(t, p1, &redis.Subscription{}).Count)
+		require.EqualValues(t, 1, receiveType(t, p2, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 2, rdb.Publish(ctx, "chan1", "hello").Val())
+		msg := receiveType(t, p1, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, p2, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 3, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.NoError(t, pubsub.Unsubscribe(ctx))

Review Comment:
   @vmihailenco from [Redis command docs](https://redis.io/commands/unsubscribe):
   
   > When no channels are specified, the client is unsubscribed from all the previously subscribed channels. In this case, a message for every unsubscribed channel will be sent to the client.
   
   ... from go-redis method docs:
   
   ```go
   // Unsubscribe the client from the given channels, or from all of
   // them if none is given.
   func (c *PubSub) Unsubscribe(ctx context.Context, channels ...string) error {
   ```
   
   So I think this command should behave as "unsubscribe all" otherwise it may be a bug.
   
   The interesting point is that, even we check the unsubscription event happened, we can still publish to those channels. @git-hulk this can be a multi-thread concurrency issue for Kvrocks specifically - I'm unsure about it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] vmihailenco commented on a diff in pull request #982: Move TCL test unit/pubsub to Go case

Posted by GitBox <gi...@apache.org>.
vmihailenco commented on code in PR #982:
URL: https://github.com/apache/incubator-kvrocks/pull/982#discussion_r994558202


##########
tests/gocase/unit/pubsub/pubsub_test.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/apache/incubator-kvrocks/tests/gocase/util"
+	"github.com/go-redis/redis/v9"
+	"github.com/stretchr/testify/require"
+)
+
+func receiveType[T any](t *testing.T, pubsub *redis.PubSub, typ T) T {
+	msg, err := pubsub.Receive(context.Background())
+	require.NoError(t, err)
+	require.IsType(t, typ, msg)
+	return msg.(T)
+}
+
+func TestPubSub(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	t.Run("PUBLISH/SUBSCRIBE PING", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "somechannel")
+		receiveType(t, pubsub, &redis.Subscription{})
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Unsubscribe(ctx, "somechannel"))
+		require.Equal(t, "PONG", rdb.Ping(ctx).Val())
+		receiveType(t, pubsub, &redis.Pong{})
+		receiveType(t, pubsub, &redis.Pong{})
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE basics", func(t *testing.T) {
+		// subscribe to two channels
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		msg := receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan2", msg.Channel)
+		require.Equal(t, "world", msg.Payload)
+
+		// unsubscribe from one of the channels
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan1"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		// unsubscribe from the remaining channel
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan2"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "world").Val())
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE with two clients", func(t *testing.T) {
+		c1 := srv.NewClient()
+		defer func() { require.NoError(t, c1.Close()) }()
+		c2 := srv.NewClient()
+		defer func() { require.NoError(t, c2.Close()) }()
+
+		p1 := c1.Subscribe(ctx, "chan1")
+		p2 := c2.Subscribe(ctx, "chan1")
+		require.EqualValues(t, 1, receiveType(t, p1, &redis.Subscription{}).Count)
+		require.EqualValues(t, 1, receiveType(t, p2, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 2, rdb.Publish(ctx, "chan1", "hello").Val())
+		msg := receiveType(t, p1, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, p2, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 3, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.NoError(t, pubsub.Unsubscribe(ctx))

Review Comment:
   You can add `defer pubsub.Close()` to the tests to properly cleanup pubsub and go-redis should fix https://github.com/go-redis/redis/issues/2248



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] tisonkun merged pull request #982: Move TCL test unit/pubsub to Go case

Posted by GitBox <gi...@apache.org>.
tisonkun merged PR #982:
URL: https://github.com/apache/incubator-kvrocks/pull/982


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] vmihailenco commented on a diff in pull request #982: Move TCL test unit/pubsub to Go case

Posted by GitBox <gi...@apache.org>.
vmihailenco commented on code in PR #982:
URL: https://github.com/apache/incubator-kvrocks/pull/982#discussion_r994532012


##########
tests/gocase/unit/pubsub/pubsub_test.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/apache/incubator-kvrocks/tests/gocase/util"
+	"github.com/go-redis/redis/v9"
+	"github.com/stretchr/testify/require"
+)
+
+func receiveType[T any](t *testing.T, pubsub *redis.PubSub, typ T) T {
+	msg, err := pubsub.Receive(context.Background())
+	require.NoError(t, err)
+	require.IsType(t, typ, msg)
+	return msg.(T)
+}
+
+func TestPubSub(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	t.Run("PUBLISH/SUBSCRIBE PING", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "somechannel")
+		receiveType(t, pubsub, &redis.Subscription{})
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Unsubscribe(ctx, "somechannel"))
+		require.Equal(t, "PONG", rdb.Ping(ctx).Val())
+		receiveType(t, pubsub, &redis.Pong{})
+		receiveType(t, pubsub, &redis.Pong{})
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE basics", func(t *testing.T) {
+		// subscribe to two channels
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		msg := receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan2", msg.Channel)
+		require.Equal(t, "world", msg.Payload)
+
+		// unsubscribe from one of the channels
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan1"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		// unsubscribe from the remaining channel
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan2"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "world").Val())
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE with two clients", func(t *testing.T) {
+		c1 := srv.NewClient()
+		defer func() { require.NoError(t, c1.Close()) }()
+		c2 := srv.NewClient()
+		defer func() { require.NoError(t, c2.Close()) }()
+
+		p1 := c1.Subscribe(ctx, "chan1")
+		p2 := c2.Subscribe(ctx, "chan1")
+		require.EqualValues(t, 1, receiveType(t, p1, &redis.Subscription{}).Count)
+		require.EqualValues(t, 1, receiveType(t, p2, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 2, rdb.Publish(ctx, "chan1", "hello").Val())
+		msg := receiveType(t, p1, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, p2, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 3, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.NoError(t, pubsub.Unsubscribe(ctx))

Review Comment:
   >from go-redis method docs:
   
   Indeed! I honestly can't find where that is implemented and can't check it right away so don't trust it blindly :)
   
   Perhaps it indeed unsubscribes thanks to Redis/kvrocks behavior, but does not reset the internal state of `PubSub` which can be a cause for surprises.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] vmihailenco commented on a diff in pull request #982: Move TCL test unit/pubsub to Go case

Posted by GitBox <gi...@apache.org>.
vmihailenco commented on code in PR #982:
URL: https://github.com/apache/incubator-kvrocks/pull/982#discussion_r994426236


##########
tests/gocase/unit/pubsub/pubsub_test.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/apache/incubator-kvrocks/tests/gocase/util"
+	"github.com/go-redis/redis/v9"
+	"github.com/stretchr/testify/require"
+)
+
+func receiveType[T any](t *testing.T, pubsub *redis.PubSub, typ T) T {
+	msg, err := pubsub.Receive(context.Background())
+	require.NoError(t, err)
+	require.IsType(t, typ, msg)
+	return msg.(T)
+}
+
+func TestPubSub(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	t.Run("PUBLISH/SUBSCRIBE PING", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "somechannel")
+		receiveType(t, pubsub, &redis.Subscription{})
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Unsubscribe(ctx, "somechannel"))
+		require.Equal(t, "PONG", rdb.Ping(ctx).Val())
+		receiveType(t, pubsub, &redis.Pong{})
+		receiveType(t, pubsub, &redis.Pong{})
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE basics", func(t *testing.T) {
+		// subscribe to two channels
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		msg := receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan2", msg.Channel)
+		require.Equal(t, "world", msg.Payload)
+
+		// unsubscribe from one of the channels
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan1"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		// unsubscribe from the remaining channel
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan2"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "world").Val())
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE with two clients", func(t *testing.T) {
+		c1 := srv.NewClient()
+		defer func() { require.NoError(t, c1.Close()) }()
+		c2 := srv.NewClient()
+		defer func() { require.NoError(t, c2.Close()) }()
+
+		p1 := c1.Subscribe(ctx, "chan1")
+		p2 := c2.Subscribe(ctx, "chan1")
+		require.EqualValues(t, 1, receiveType(t, p1, &redis.Subscription{}).Count)
+		require.EqualValues(t, 1, receiveType(t, p2, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 2, rdb.Publish(ctx, "chan1", "hello").Val())
+		msg := receiveType(t, p1, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, p2, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 3, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.NoError(t, pubsub.Unsubscribe(ctx))

Review Comment:
   @tisonkun looks like you need to add channels here - "chan1", "chan2", "chan3". Not sure why the test is not failing earlier though and you still receive `unsubscribe` events... Perhaps some magic by kvrocks when the list of channels is empty? I wonder if Redis behaves the same.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] tisonkun commented on a diff in pull request #982: Move TCL test unit/pubsub to Go case

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #982:
URL: https://github.com/apache/incubator-kvrocks/pull/982#discussion_r994561467


##########
tests/gocase/unit/pubsub/pubsub_test.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/apache/incubator-kvrocks/tests/gocase/util"
+	"github.com/go-redis/redis/v9"
+	"github.com/stretchr/testify/require"
+)
+
+func receiveType[T any](t *testing.T, pubsub *redis.PubSub, typ T) T {
+	msg, err := pubsub.Receive(context.Background())
+	require.NoError(t, err)
+	require.IsType(t, typ, msg)
+	return msg.(T)
+}
+
+func TestPubSub(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	t.Run("PUBLISH/SUBSCRIBE PING", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "somechannel")
+		receiveType(t, pubsub, &redis.Subscription{})
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Unsubscribe(ctx, "somechannel"))
+		require.Equal(t, "PONG", rdb.Ping(ctx).Val())
+		receiveType(t, pubsub, &redis.Pong{})
+		receiveType(t, pubsub, &redis.Pong{})
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE basics", func(t *testing.T) {
+		// subscribe to two channels
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		msg := receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan2", msg.Channel)
+		require.Equal(t, "world", msg.Payload)
+
+		// unsubscribe from one of the channels
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan1"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		// unsubscribe from the remaining channel
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan2"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "world").Val())
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE with two clients", func(t *testing.T) {
+		c1 := srv.NewClient()
+		defer func() { require.NoError(t, c1.Close()) }()
+		c2 := srv.NewClient()
+		defer func() { require.NoError(t, c2.Close()) }()
+
+		p1 := c1.Subscribe(ctx, "chan1")
+		p2 := c2.Subscribe(ctx, "chan1")
+		require.EqualValues(t, 1, receiveType(t, p1, &redis.Subscription{}).Count)
+		require.EqualValues(t, 1, receiveType(t, p2, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 2, rdb.Publish(ctx, "chan1", "hello").Val())
+		msg := receiveType(t, p1, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, p2, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 3, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.NoError(t, pubsub.Unsubscribe(ctx))

Review Comment:
   We may try to fail fast or reduce share state...But pubsub state are on the server side so the only approach is start a new server for each case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] tisonkun commented on a diff in pull request #982: Move TCL test unit/pubsub to Go case

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #982:
URL: https://github.com/apache/incubator-kvrocks/pull/982#discussion_r994558756


##########
tests/gocase/unit/pubsub/pubsub_test.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/apache/incubator-kvrocks/tests/gocase/util"
+	"github.com/go-redis/redis/v9"
+	"github.com/stretchr/testify/require"
+)
+
+func receiveType[T any](t *testing.T, pubsub *redis.PubSub, typ T) T {
+	msg, err := pubsub.Receive(context.Background())
+	require.NoError(t, err)
+	require.IsType(t, typ, msg)
+	return msg.(T)
+}
+
+func TestPubSub(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	t.Run("PUBLISH/SUBSCRIBE PING", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "somechannel")
+		receiveType(t, pubsub, &redis.Subscription{})
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Unsubscribe(ctx, "somechannel"))
+		require.Equal(t, "PONG", rdb.Ping(ctx).Val())
+		receiveType(t, pubsub, &redis.Pong{})
+		receiveType(t, pubsub, &redis.Pong{})
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE basics", func(t *testing.T) {
+		// subscribe to two channels
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		msg := receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan2", msg.Channel)
+		require.Equal(t, "world", msg.Payload)
+
+		// unsubscribe from one of the channels
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan1"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		// unsubscribe from the remaining channel
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan2"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "world").Val())
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE with two clients", func(t *testing.T) {
+		c1 := srv.NewClient()
+		defer func() { require.NoError(t, c1.Close()) }()
+		c2 := srv.NewClient()
+		defer func() { require.NoError(t, c2.Close()) }()
+
+		p1 := c1.Subscribe(ctx, "chan1")
+		p2 := c2.Subscribe(ctx, "chan1")
+		require.EqualValues(t, 1, receiveType(t, p1, &redis.Subscription{}).Count)
+		require.EqualValues(t, 1, receiveType(t, p2, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 2, rdb.Publish(ctx, "chan1", "hello").Val())
+		msg := receiveType(t, p1, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, p2, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 3, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.NoError(t, pubsub.Unsubscribe(ctx))

Review Comment:
   So now I get the point:
   
   Because we don't failed in L76, the "chan2" subscription will never be unsubscribed and leak.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] tisonkun commented on a diff in pull request #982: Move TCL test unit/pubsub to Go case

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #982:
URL: https://github.com/apache/incubator-kvrocks/pull/982#discussion_r994561916


##########
tests/gocase/unit/pubsub/pubsub_test.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/apache/incubator-kvrocks/tests/gocase/util"
+	"github.com/go-redis/redis/v9"
+	"github.com/stretchr/testify/require"
+)
+
+func receiveType[T any](t *testing.T, pubsub *redis.PubSub, typ T) T {
+	msg, err := pubsub.Receive(context.Background())
+	require.NoError(t, err)
+	require.IsType(t, typ, msg)
+	return msg.(T)
+}
+
+func TestPubSub(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	t.Run("PUBLISH/SUBSCRIBE PING", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "somechannel")
+		receiveType(t, pubsub, &redis.Subscription{})
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Unsubscribe(ctx, "somechannel"))
+		require.Equal(t, "PONG", rdb.Ping(ctx).Val())
+		receiveType(t, pubsub, &redis.Pong{})
+		receiveType(t, pubsub, &redis.Pong{})
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE basics", func(t *testing.T) {
+		// subscribe to two channels
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		msg := receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan2", msg.Channel)
+		require.Equal(t, "world", msg.Payload)
+
+		// unsubscribe from one of the channels
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan1"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		// unsubscribe from the remaining channel
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan2"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "world").Val())
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE with two clients", func(t *testing.T) {
+		c1 := srv.NewClient()
+		defer func() { require.NoError(t, c1.Close()) }()
+		c2 := srv.NewClient()
+		defer func() { require.NoError(t, c2.Close()) }()
+
+		p1 := c1.Subscribe(ctx, "chan1")
+		p2 := c2.Subscribe(ctx, "chan1")
+		require.EqualValues(t, 1, receiveType(t, p1, &redis.Subscription{}).Count)
+		require.EqualValues(t, 1, receiveType(t, p2, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 2, rdb.Publish(ctx, "chan1", "hello").Val())
+		msg := receiveType(t, p1, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, p2, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 3, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.NoError(t, pubsub.Unsubscribe(ctx))

Review Comment:
   @vmihailenco thank you! Let me think of it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] tisonkun commented on a diff in pull request #982: Move TCL test unit/pubsub to Go case

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #982:
URL: https://github.com/apache/incubator-kvrocks/pull/982#discussion_r994409836


##########
tests/gocase/unit/pubsub/pubsub_test.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/apache/incubator-kvrocks/tests/gocase/util"
+	"github.com/go-redis/redis/v9"
+	"github.com/stretchr/testify/require"
+)
+
+func receiveType[T any](t *testing.T, pubsub *redis.PubSub, typ T) T {
+	msg, err := pubsub.Receive(context.Background())
+	require.NoError(t, err)
+	require.IsType(t, typ, msg)
+	return msg.(T)
+}
+
+func TestPubSub(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	t.Run("PUBLISH/SUBSCRIBE PING", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "somechannel")
+		receiveType(t, pubsub, &redis.Subscription{})
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Unsubscribe(ctx, "somechannel"))
+		require.Equal(t, "PONG", rdb.Ping(ctx).Val())
+		receiveType(t, pubsub, &redis.Pong{})
+		receiveType(t, pubsub, &redis.Pong{})
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE basics", func(t *testing.T) {
+		// subscribe to two channels
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		msg := receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan2", msg.Channel)
+		require.Equal(t, "world", msg.Payload)
+
+		// unsubscribe from one of the channels
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan1"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		// unsubscribe from the remaining channel
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan2"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "world").Val())
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE with two clients", func(t *testing.T) {
+		c1 := srv.NewClient()
+		defer func() { require.NoError(t, c1.Close()) }()
+		c2 := srv.NewClient()
+		defer func() { require.NoError(t, c2.Close()) }()
+
+		p1 := c1.Subscribe(ctx, "chan1")
+		p2 := c2.Subscribe(ctx, "chan1")
+		require.EqualValues(t, 1, receiveType(t, p1, &redis.Subscription{}).Count)
+		require.EqualValues(t, 1, receiveType(t, p2, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 2, rdb.Publish(ctx, "chan1", "hello").Val())
+		msg := receiveType(t, p1, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, p2, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 3, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.NoError(t, pubsub.Unsubscribe(ctx))
+		require.EqualValues(t, &redis.Subscription{
+			Kind:    "unsubscribe",
+			Channel: "chan1",
+			Count:   2,
+		}, receiveType(t, pubsub, &redis.Subscription{}))
+		require.EqualValues(t, &redis.Subscription{
+			Kind:    "unsubscribe",
+			Channel: "chan2",
+			Count:   1,
+		}, receiveType(t, pubsub, &redis.Subscription{}))
+		require.EqualValues(t, &redis.Subscription{
+			Kind:    "unsubscribe",
+			Channel: "chan3",
+			Count:   0,
+		}, receiveType(t, pubsub, &redis.Subscription{}))
+
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "hello").Val())

Review Comment:
   ```
       --- FAIL: TestPubSub/PUBLISH/SUBSCRIBE_after_UNSUBSCRIBE_without_arguments (0.00s)
           pubsub_test.go:129: 
               	Error Trace:	/home/runner/work/incubator-kvrocks/incubator-kvrocks/tests/gocase/unit/pubsub/pubsub_test.go:129
               	Error:      	Not equal: 
               	            	expected: int(0)
               	            	actual  : int64(1)
               	Test:       	TestPubSub/PUBLISH/SUBSCRIBE_after_UNSUBSCRIBE_without_arguments
   ```
   
   @vmihailenco do you have ideas on this failure? Why we can still publish to a channel event if no one is subscribing it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] tisonkun commented on a diff in pull request #982: Move TCL test unit/pubsub to Go case

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #982:
URL: https://github.com/apache/incubator-kvrocks/pull/982#discussion_r994552835


##########
tests/gocase/unit/pubsub/pubsub_test.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/apache/incubator-kvrocks/tests/gocase/util"
+	"github.com/go-redis/redis/v9"
+	"github.com/stretchr/testify/require"
+)
+
+func receiveType[T any](t *testing.T, pubsub *redis.PubSub, typ T) T {
+	msg, err := pubsub.Receive(context.Background())
+	require.NoError(t, err)
+	require.IsType(t, typ, msg)
+	return msg.(T)
+}
+
+func TestPubSub(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	t.Run("PUBLISH/SUBSCRIBE PING", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "somechannel")
+		receiveType(t, pubsub, &redis.Subscription{})
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Unsubscribe(ctx, "somechannel"))
+		require.Equal(t, "PONG", rdb.Ping(ctx).Val())
+		receiveType(t, pubsub, &redis.Pong{})
+		receiveType(t, pubsub, &redis.Pong{})
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE basics", func(t *testing.T) {
+		// subscribe to two channels
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		msg := receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan2", msg.Channel)
+		require.Equal(t, "world", msg.Payload)
+
+		// unsubscribe from one of the channels
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan1"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		// unsubscribe from the remaining channel
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan2"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "world").Val())
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE with two clients", func(t *testing.T) {
+		c1 := srv.NewClient()
+		defer func() { require.NoError(t, c1.Close()) }()
+		c2 := srv.NewClient()
+		defer func() { require.NoError(t, c2.Close()) }()
+
+		p1 := c1.Subscribe(ctx, "chan1")
+		p2 := c2.Subscribe(ctx, "chan1")
+		require.EqualValues(t, 1, receiveType(t, p1, &redis.Subscription{}).Count)
+		require.EqualValues(t, 1, receiveType(t, p2, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 2, rdb.Publish(ctx, "chan1", "hello").Val())
+		msg := receiveType(t, p1, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, p2, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 3, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.NoError(t, pubsub.Unsubscribe(ctx))

Review Comment:
   Interesting. I can reproduce it locally and when I rename the channel name from "chan2" to "xx" the error vanished.
   
   ```diff
   diff --git a/tests/gocase/unit/pubsub/pubsub_test.go b/tests/gocase/unit/pubsub/pubsub_test.go
   index b48b10a..97b807b 100644
   --- a/tests/gocase/unit/pubsub/pubsub_test.go
   +++ b/tests/gocase/unit/pubsub/pubsub_test.go
   @@ -73,11 +73,26 @@ func TestPubSub(t *testing.T) {
    
    		// unsubscribe from one of the channels
    		require.NoError(t, pubsub.Unsubscribe(ctx, "chan1"))
   +		require.EqualValues(t, &redis.Subscription{
   +			Kind:    "unsubscribe",
   +			Channel: "chan1",
   +			Count:   1,
   +		}, receiveType(t, pubsub, &redis.Subscription{}))
    		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
    		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
    
   +		// drain incoming messages
   +		// TODO: we should receive once here, but this bug cause the below unstable test steady to fail.
   +		receiveType(t, pubsub, &redis.Message{})
   +		receiveType(t, pubsub, &redis.Message{})
   +
    		// unsubscribe from the remaining channel
    		require.NoError(t, pubsub.Unsubscribe(ctx, "chan2"))
   +		require.EqualValues(t, &redis.Subscription{
   +			Kind:    "unsubscribe",
   +			Channel: "chan2",
   +			Count:   0,
   +		}, receiveType(t, pubsub, &redis.Subscription{}))
    		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
    		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "world").Val())
    	})
   @@ -103,7 +118,7 @@ func TestPubSub(t *testing.T) {
    	})
    
    	t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
   -		pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
   +		pubsub := rdb.Subscribe(ctx, "chan1", "xx", "chan3")
    		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
    		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
    		require.EqualValues(t, 3, receiveType(t, pubsub, &redis.Subscription{}).Count)
   @@ -116,7 +131,7 @@ func TestPubSub(t *testing.T) {
    		}, receiveType(t, pubsub, &redis.Subscription{}))
    		require.EqualValues(t, &redis.Subscription{
    			Kind:    "unsubscribe",
   -			Channel: "chan2",
   +			Channel: "xx",
    			Count:   1,
    		}, receiveType(t, pubsub, &redis.Subscription{}))
    		require.EqualValues(t, &redis.Subscription{
   @@ -126,7 +141,7 @@ func TestPubSub(t *testing.T) {
    		}, receiveType(t, pubsub, &redis.Subscription{}))
    
    		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
   -		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "hello").Val())
   +		require.EqualValues(t, 0, rdb.Publish(ctx, "xx", "hello").Val())
    		require.EqualValues(t, 0, rdb.Publish(ctx, "chan3", "hello").Val())
    	})
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] tisonkun commented on a diff in pull request #982: Move TCL test unit/pubsub to Go case

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #982:
URL: https://github.com/apache/incubator-kvrocks/pull/982#discussion_r994558756


##########
tests/gocase/unit/pubsub/pubsub_test.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/apache/incubator-kvrocks/tests/gocase/util"
+	"github.com/go-redis/redis/v9"
+	"github.com/stretchr/testify/require"
+)
+
+func receiveType[T any](t *testing.T, pubsub *redis.PubSub, typ T) T {
+	msg, err := pubsub.Receive(context.Background())
+	require.NoError(t, err)
+	require.IsType(t, typ, msg)
+	return msg.(T)
+}
+
+func TestPubSub(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	t.Run("PUBLISH/SUBSCRIBE PING", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "somechannel")
+		receiveType(t, pubsub, &redis.Subscription{})
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Ping(ctx))
+		require.NoError(t, pubsub.Unsubscribe(ctx, "somechannel"))
+		require.Equal(t, "PONG", rdb.Ping(ctx).Val())
+		receiveType(t, pubsub, &redis.Pong{})
+		receiveType(t, pubsub, &redis.Pong{})
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE basics", func(t *testing.T) {
+		// subscribe to two channels
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		msg := receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, pubsub, &redis.Message{})
+		require.Equal(t, "chan2", msg.Channel)
+		require.Equal(t, "world", msg.Payload)
+
+		// unsubscribe from one of the channels
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan1"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 1, rdb.Publish(ctx, "chan2", "world").Val())
+
+		// unsubscribe from the remaining channel
+		require.NoError(t, pubsub.Unsubscribe(ctx, "chan2"))
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan1", "hello").Val())
+		require.EqualValues(t, 0, rdb.Publish(ctx, "chan2", "world").Val())
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE with two clients", func(t *testing.T) {
+		c1 := srv.NewClient()
+		defer func() { require.NoError(t, c1.Close()) }()
+		c2 := srv.NewClient()
+		defer func() { require.NoError(t, c2.Close()) }()
+
+		p1 := c1.Subscribe(ctx, "chan1")
+		p2 := c2.Subscribe(ctx, "chan1")
+		require.EqualValues(t, 1, receiveType(t, p1, &redis.Subscription{}).Count)
+		require.EqualValues(t, 1, receiveType(t, p2, &redis.Subscription{}).Count)
+
+		require.EqualValues(t, 2, rdb.Publish(ctx, "chan1", "hello").Val())
+		msg := receiveType(t, p1, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+		msg = receiveType(t, p2, &redis.Message{})
+		require.Equal(t, "chan1", msg.Channel)
+		require.Equal(t, "hello", msg.Payload)
+	})
+
+	t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
+		pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
+		require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 2, receiveType(t, pubsub, &redis.Subscription{}).Count)
+		require.EqualValues(t, 3, receiveType(t, pubsub, &redis.Subscription{}).Count)
+
+		require.NoError(t, pubsub.Unsubscribe(ctx))

Review Comment:
   So now I get the point:
   
   Because we don't wait for unsubscription in L76, the "chan2" subscription will never be unsubscribed and leak.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org