You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2021/12/01 00:44:15 UTC

[rocketmq-client-go] branch master updated: [ISSUE #744] check multiple topics in one batch

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 613ba90  [ISSUE #744] check multiple topics in one batch
613ba90 is described below

commit 613ba90f0c1fc04dccd8b84c966a4e9eb3ccda86
Author: Oreoreoreo <32...@users.noreply.github.com>
AuthorDate: Wed Dec 1 08:37:26 2021 +0800

    [ISSUE #744] check multiple topics in one batch
    
    Co-authored-by: maoruilei3120 <ma...@ipalfish.com>
---
 errors/errors.go          |  1 +
 producer/producer.go      |  8 ++++++++
 producer/producer_test.go | 34 ++++++++++++++++++++++++++++++++++
 3 files changed, 43 insertions(+)

diff --git a/errors/errors.go b/errors/errors.go
index 793fcda..43b49ca 100644
--- a/errors/errors.go
+++ b/errors/errors.go
@@ -47,4 +47,5 @@ var (
 	ErrMessageEmpty      = errors.New("message is nil")
 	ErrNotRunning        = errors.New("producer not started")
 	ErrPullConsumer      = errors.New("pull consumer has not supported")
+	ErrMultipleTopics    = errors.New("the topic of the messages in one batch should be the same")
 )
diff --git a/producer/producer.go b/producer/producer.go
index 2ab2445..226eedb 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -103,6 +103,14 @@ func (p *defaultProducer) checkMsg(msgs ...*primitive.Message) error {
 	if len(msgs[0].Topic) == 0 {
 		return errors2.ErrTopicEmpty
 	}
+
+	topic := msgs[0].Topic
+	for _, msg := range msgs {
+		if msg.Topic != topic {
+			return errors2.ErrMultipleTopics
+		}
+	}
+
 	return nil
 }
 
diff --git a/producer/producer_test.go b/producer/producer_test.go
index b6ec84d..a7c15c1 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -310,3 +310,37 @@ func TestSyncWithNamespace(t *testing.T) {
 	assert.Equal(t, expectedResp, resp)
 	assert.Equal(t, namespaceTopic, msg.Topic)
 }
+
+func TestBatchSendDifferentTopics(t *testing.T) {
+	p, _ := NewDefaultProducer(
+		WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+		WithRetry(2),
+		WithQueueSelector(NewManualQueueSelector()),
+	)
+
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+	client := internal.NewMockRMQClient(ctrl)
+	p.client = client
+
+	client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+	client.EXPECT().Start().Return()
+	err := p.Start()
+	assert.Nil(t, err)
+
+	ctx := context.Background()
+	msgToA := &primitive.Message{
+		Topic: "topic-A",
+		Body:  []byte("this is a message body"),
+	}
+
+	msgToB := &primitive.Message{
+		Topic: "topic-B",
+		Body:  []byte("this is a message body"),
+	}
+
+	resp, err := p.SendSync(ctx, []*primitive.Message{msgToA, msgToB}...)
+	assert.Nil(t, resp)
+	assert.NotNil(t, err)
+	assert.Equal(t, err, errors.ErrMultipleTopics)
+}