You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2019/08/15 07:22:30 UTC

[rocketmq-client-go] branch native updated: update defaultConsumer.client init AND add push_consumer testing (#156)

This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new e48203d  update defaultConsumer.client init AND add push_consumer testing (#156)
e48203d is described below

commit e48203df5a6546e414657e470267f81199243828
Author: cloes <wa...@163.com>
AuthorDate: Thu Aug 15 15:22:26 2019 +0800

    update defaultConsumer.client init AND add push_consumer testing (#156)
    
    * update the mockgen command line to fix the "import cycle not allowed" problem in the generated file
    
    * update defaultConsumer.client init AND add push_consumer testing
    
    * fix bug: UpdateTopicRouteInfo() can now be called at any time
    
    * fix bug: defaultConsumer.client init problem
    
    * fix bug:recommit after go fmt
---
 consumer/consumer.go           |  2 +-
 consumer/push_consumer.go      |  1 +
 consumer/push_consumer_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 56836ae..b5bf22f 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -275,7 +275,7 @@ func (dc *defaultConsumer) start() error {
 		dc.subscriptionDataTable.Store(retryTopic, sub)
 	}
 
-	dc.client = internal.GetOrNewRocketMQClient(dc.option.ClientOptions, nil)
+	//dc.client = internal.GetOrNewRocketMQClient(dc.option.ClientOptions, nil)
 	if dc.model == Clustering {
 		dc.option.ChangeInstanceNameToPID()
 		dc.storage = NewRemoteOffsetStore(dc.consumerGroup, dc.client)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 88ab362..3b9747c 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -67,6 +67,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
 	internal.RegisterNamsrv(srvs)
 
 	dc := &defaultConsumer{
+		client:         internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
 		consumerGroup:  defaultOpts.GroupName,
 		cType:          _PushConsume,
 		state:          internal.StateCreateJust,
diff --git a/consumer/push_consumer_test.go b/consumer/push_consumer_test.go
index f7bc454..2f4dbeb 100644
--- a/consumer/push_consumer_test.go
+++ b/consumer/push_consumer_test.go
@@ -16,3 +16,58 @@ limitations under the License.
 */
 
 package consumer
+
+import (
+	"context"
+	"fmt"
+	"github.com/apache/rocketmq-client-go/internal"
+	"github.com/apache/rocketmq-client-go/primitive"
+	"github.com/golang/mock/gomock"
+	. "github.com/smartystreets/goconvey/convey"
+	"testing"
+)
+
+func mockB4Start(c *pushConsumer) { // test helper: seeds c's topic subscribe-info table; TestStart calls it before c.Start()
+	c.topicSubscribeInfoTable.Store("TopicTest", []*primitive.MessageQueue{}) // registers "TopicTest" with an empty (non-nil) queue slice
+}
+
+func TestStart(t *testing.T) { // TestStart exercises pushConsumer.Start with a gomock RMQClient, with and without seeded "TopicTest" info.
+	Convey("test Start method", t, func() {
+		c, _ := NewPushConsumer( // NOTE(review): the construction error is discarded and c is used unchecked below — confirm this is intended
+			WithGroupName("testGroup"),
+			WithNameServer([]string{"127.0.0.1:9876"}),
+		)
+
+		ctrl := gomock.NewController(t)
+		defer ctrl.Finish() // verifies the recorded expectations when this Convey function returns
+
+		client := internal.NewMockRMQClient(ctrl)
+		c.client = client // swap the client created by NewPushConsumer for the mock
+
+		err := c.Subscribe("TopicTest", MessageSelector{}, func(ctx context.Context,
+			msgs ...*primitive.MessageExt) (ConsumeResult, error) {
+			fmt.Printf("subscribe callback: %v \n", msgs)
+			return ConsumeSuccess, nil
+		}) // NOTE(review): this err is never asserted; it is overwritten by c.Start() in both sub-cases
+
+		client.EXPECT().ClientID().Return("127.0.0.1@DEFAULT") // expectations from here to UpdateTopicRouteInfo are shared by both sub-cases
+		client.EXPECT().Start().Return()
+		client.EXPECT().RegisterConsumer(gomock.Any(), gomock.Any()).Return(nil)
+		client.EXPECT().UpdateTopicRouteInfo().AnyTimes().Return() // AnyTimes: zero or more calls are accepted
+
+		Convey("test topic route info not found", func() {
+			client.EXPECT().Shutdown().Return() // Shutdown is expected only in this failing sub-case
+			err = c.Start()
+			So(err.Error(), ShouldContainSubstring, "route info not found")
+		})
+
+		Convey("test topic route info found", func() {
+			client.EXPECT().RebalanceImmediately().Return() // these three calls are expected only in this succeeding sub-case
+			client.EXPECT().CheckClientInBroker().Return()
+			client.EXPECT().SendHeartbeatToAllBrokerWithLock().Return()
+			mockB4Start(c) // seed the "TopicTest" entry that the not-found sub-case leaves absent
+			err = c.Start()
+			So(err, ShouldBeNil)
+		})
+	})
+}