You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2019/08/15 07:22:30 UTC
[rocketmq-client-go] branch native updated: update
defaultConsumer.client init AND add push_consumer testing (#156)
This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new e48203d update defaultConsumer.client init AND add push_consumer testing (#156)
e48203d is described below
commit e48203df5a6546e414657e470267f81199243828
Author: cloes <wa...@163.com>
AuthorDate: Thu Aug 15 15:22:26 2019 +0800
update defaultConsumer.client init AND add push_consumer testing (#156)
* update mockgen command line,in order to fix generate file "import cycle not allowed" problem
* update defaultConsumer.client init AND add push_consumer testing
* fix bug:UpdateTopicRouteInfo() now can be called anytime
* fix bug:fix defaultConsumer.client init problem
* fix bug:recommit after go fmt
---
consumer/consumer.go | 2 +-
consumer/push_consumer.go | 1 +
consumer/push_consumer_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++
3 files changed, 57 insertions(+), 1 deletion(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 56836ae..b5bf22f 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -275,7 +275,7 @@ func (dc *defaultConsumer) start() error {
dc.subscriptionDataTable.Store(retryTopic, sub)
}
- dc.client = internal.GetOrNewRocketMQClient(dc.option.ClientOptions, nil)
+ //dc.client = internal.GetOrNewRocketMQClient(dc.option.ClientOptions, nil)
if dc.model == Clustering {
dc.option.ChangeInstanceNameToPID()
dc.storage = NewRemoteOffsetStore(dc.consumerGroup, dc.client)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 88ab362..3b9747c 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -67,6 +67,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
internal.RegisterNamsrv(srvs)
dc := &defaultConsumer{
+ client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
consumerGroup: defaultOpts.GroupName,
cType: _PushConsume,
state: internal.StateCreateJust,
diff --git a/consumer/push_consumer_test.go b/consumer/push_consumer_test.go
index f7bc454..2f4dbeb 100644
--- a/consumer/push_consumer_test.go
+++ b/consumer/push_consumer_test.go
@@ -16,3 +16,58 @@ limitations under the License.
*/
package consumer
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/rocketmq-client-go/internal"
+ "github.com/apache/rocketmq-client-go/primitive"
+ "github.com/golang/mock/gomock"
+ . "github.com/smartystreets/goconvey/convey"
+ "testing"
+)
+
+func mockB4Start(c *pushConsumer) {
+ c.topicSubscribeInfoTable.Store("TopicTest", []*primitive.MessageQueue{})
+}
+
+func TestStart(t *testing.T) {
+ Convey("test Start method", t, func() {
+ c, _ := NewPushConsumer(
+ WithGroupName("testGroup"),
+ WithNameServer([]string{"127.0.0.1:9876"}),
+ )
+
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+
+ client := internal.NewMockRMQClient(ctrl)
+ c.client = client
+
+ err := c.Subscribe("TopicTest", MessageSelector{}, func(ctx context.Context,
+ msgs ...*primitive.MessageExt) (ConsumeResult, error) {
+ fmt.Printf("subscribe callback: %v \n", msgs)
+ return ConsumeSuccess, nil
+ })
+
+ client.EXPECT().ClientID().Return("127.0.0.1@DEFAULT")
+ client.EXPECT().Start().Return()
+ client.EXPECT().RegisterConsumer(gomock.Any(), gomock.Any()).Return(nil)
+ client.EXPECT().UpdateTopicRouteInfo().AnyTimes().Return()
+
+ Convey("test topic route info not found", func() {
+ client.EXPECT().Shutdown().Return()
+ err = c.Start()
+ So(err.Error(), ShouldContainSubstring, "route info not found")
+ })
+
+ Convey("test topic route info found", func() {
+ client.EXPECT().RebalanceImmediately().Return()
+ client.EXPECT().CheckClientInBroker().Return()
+ client.EXPECT().SendHeartbeatToAllBrokerWithLock().Return()
+ mockB4Start(c)
+ err = c.Start()
+ So(err, ShouldBeNil)
+ })
+ })
+}