You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2021/12/01 00:44:15 UTC
[rocketmq-client-go] branch master updated: [ISSUE #744] check multiple topics in one batch
This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 613ba90 [ISSUE #744] check multiple topics in one batch
613ba90 is described below
commit 613ba90f0c1fc04dccd8b84c966a4e9eb3ccda86
Author: Oreoreoreo <32...@users.noreply.github.com>
AuthorDate: Wed Dec 1 08:37:26 2021 +0800
[ISSUE #744] check multiple topics in one batch
Co-authored-by: maoruilei3120 <ma...@ipalfish.com>
---
errors/errors.go | 1 +
producer/producer.go | 8 ++++++++
producer/producer_test.go | 34 ++++++++++++++++++++++++++++++++++
3 files changed, 43 insertions(+)
diff --git a/errors/errors.go b/errors/errors.go
index 793fcda..43b49ca 100644
--- a/errors/errors.go
+++ b/errors/errors.go
@@ -47,4 +47,5 @@ var (
ErrMessageEmpty = errors.New("message is nil")
ErrNotRunning = errors.New("producer not started")
ErrPullConsumer = errors.New("pull consumer has not supported")
+ ErrMultipleTopics = errors.New("the topic of the messages in one batch should be the same")
)
diff --git a/producer/producer.go b/producer/producer.go
index 2ab2445..226eedb 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -103,6 +103,14 @@ func (p *defaultProducer) checkMsg(msgs ...*primitive.Message) error {
if len(msgs[0].Topic) == 0 {
return errors2.ErrTopicEmpty
}
+
+ topic := msgs[0].Topic
+ for _, msg := range msgs {
+ if msg.Topic != topic {
+ return errors2.ErrMultipleTopics
+ }
+ }
+
return nil
}
diff --git a/producer/producer_test.go b/producer/producer_test.go
index b6ec84d..a7c15c1 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -310,3 +310,37 @@ func TestSyncWithNamespace(t *testing.T) {
assert.Equal(t, expectedResp, resp)
assert.Equal(t, namespaceTopic, msg.Topic)
}
+
+func TestBatchSendDifferentTopics(t *testing.T) {
+ p, _ := NewDefaultProducer(
+ WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ WithRetry(2),
+ WithQueueSelector(NewManualQueueSelector()),
+ )
+
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+ client := internal.NewMockRMQClient(ctrl)
+ p.client = client
+
+ client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+ client.EXPECT().Start().Return()
+ err := p.Start()
+ assert.Nil(t, err)
+
+ ctx := context.Background()
+ msgToA := &primitive.Message{
+ Topic: "topic-A",
+ Body: []byte("this is a message body"),
+ }
+
+ msgToB := &primitive.Message{
+ Topic: "topic-B",
+ Body: []byte("this is a message body"),
+ }
+
+ resp, err := p.SendSync(ctx, []*primitive.Message{msgToA, msgToB}...)
+ assert.Nil(t, resp)
+ assert.NotNil(t, err)
+ assert.Equal(t, err, errors.ErrMultipleTopics)
+}