You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/03/12 12:50:24 UTC

[rocketmq-client-go] branch native updated: Adding consumer of original (#39)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new cb6cc2d  Adding consumer of original (#39)
cb6cc2d is described below

commit cb6cc2d3e1647994b5955c7e05e1e6ada7e0387b
Author: wenfeng <sx...@gmail.com>
AuthorDate: Tue Mar 12 20:50:20 2019 +0800

    Adding consumer of original (#39)
    
    * Adding more implementations of common
    
    * Adding consumer of original
    
    * fix compile error
---
 consumer.go                                   | 269 ++++++++++++++++++++++++++
 go.mod                                        |   1 -
 go.sum                                        |   2 -
 common/manager.go => kernel/client.go         | 121 ++++--------
 {common => kernel}/message.go                 |  15 +-
 kernel/model.go                               | 241 +++++++++++++++++++++++
 common/transaction.go => kernel/mq_version.go |  15 +-
 {common => kernel}/perm.go                    |  26 +--
 {common => kernel}/request.go                 |   5 +-
 {common => kernel}/response.go                |   2 +-
 {common => kernel}/route.go                   | 134 +++++++++----
 {common => kernel}/transaction.go             |   2 +-
 remote/client.go                              |  12 ++
 common/transaction.go => utils/helper.go      |  15 +-
 common/perm.go => utils/log.go                |  64 +++---
 15 files changed, 726 insertions(+), 198 deletions(-)

diff --git a/consumer.go b/consumer.go
new file mode 100644
index 0000000..896df74
--- /dev/null
+++ b/consumer.go
@@ -0,0 +1,269 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package rocketmq
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"github.com/apache/rocketmq-client-go/kernel"
+	"strconv"
+	"sync"
+	"time"
+)
+
+type Consumer interface {
+	Pull(topic, expression string, numbers int) (*kernel.PullResult, error)
+	SubscribeWithChan(topic, expression string) (chan *kernel.Message, error)
+	SubscribeWithFunc(topic, expression string, f func(msg *kernel.Message) ConsumeResult) error
+	ACK(msg *kernel.Message, result ConsumeResult)
+}
+
+type ConsumeResult int
+
+type ConsumerType int
+
+const (
+	Original ConsumerType = iota
+	Orderly
+	Transaction
+)
+
+type ConsumerConfig struct {
+	GroupName                  string
+	Model                      kernel.MessageModel
+	UnitMode                   bool
+	MaxReconsumeTimes          int
+	PullMessageTimeout         time.Duration
+	FromWhere                  kernel.ConsumeFromWhere
+	brokerSuspendMaxTimeMillis int64
+}
+
+func NewConsumer(config ConsumerConfig) Consumer {
+	return &defaultConsumer{
+		config: config,
+	}
+}
+
+type defaultConsumer struct {
+	state  kernel.ServiceState
+	config ConsumerConfig
+}
+
+func (c *defaultConsumer) Pull(topic, expression string, numbers int) (*kernel.PullResult, error) {
+	mq := getNextQueueOf(topic)
+
+	if mq == nil {
+		return nil, fmt.Errorf("prepard to pull topic: %s, but no queue is founded", topic)
+	}
+
+	data := getSubscriptionData(mq, expression)
+	result, err := c.pull(context.Background(), mq, data, c.nextOffsetOf(mq), numbers)
+
+	if err != nil {
+		return nil, err
+	}
+
+	processPullResult(mq, result, data)
+	return result, nil
+}
+
+// SubscribeWithChan ack manually
+func (c *defaultConsumer) SubscribeWithChan(topic, expression string) (chan *kernel.Message, error) {
+	return nil, nil
+}
+
+// SubscribeWithFunc ack automatic
+func (c *defaultConsumer) SubscribeWithFunc(topic, expression string,
+	f func(msg *kernel.Message) ConsumeResult) error {
+	return nil
+}
+
+func (c *defaultConsumer) ACK(msg *kernel.Message, result ConsumeResult) {
+
+}
+
+// pull sends a single pull request for mq to the broker currently serving it.
+//
+// It returns an error when the consumer is not running, when mq or data is
+// nil, when offset is negative, when no broker can be resolved for
+// mq.BrokerName, or when a non-TAG expression is requested from a broker whose
+// version is below kernel.V4_1_0. numbers <= 0 is treated as 1.
+func (c *defaultConsumer) pull(ctx context.Context, mq *kernel.MessageQueue, data *kernel.SubscriptionData,
+	offset int64, numbers int) (*kernel.PullResult, error) {
+	if err := c.makeSureStateOK(); err != nil {
+		return nil, err
+	}
+
+	if mq == nil {
+		return nil, errors.New("MessageQueue is nil")
+	}
+
+	// data is dereferenced several times below; fail cleanly instead of panicking.
+	if data == nil {
+		return nil, errors.New("SubscriptionData is nil")
+	}
+
+	if offset < 0 {
+		return nil, errors.New("offset < 0")
+	}
+
+	if numbers <= 0 {
+		numbers = 1
+	}
+	c.subscriptionAutomatically(mq.Topic)
+
+	brokerResult := tryFindBroker(mq)
+	if brokerResult == nil {
+		return nil, fmt.Errorf("the broker %s does not exist", mq.BrokerName)
+	}
+
+	// Only non-TAG expressions (e.g. SQL92) are gated on the broker version; an
+	// empty expression type counts as TAG, see kernel.IsTagType.
+	// NOTE(review): confirm this matches the intended broker compatibility rule.
+	if !kernel.IsTagType(string(data.ExpType)) && brokerResult.BrokerVersion < kernel.V4_1_0 {
+		return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to support for filter message by %v",
+			mq.BrokerName, brokerResult.BrokerVersion, data.ExpType)
+	}
+
+	sysFlag := buildSysFlag(false, true, true, false)
+
+	// The commit-offset bit is cleared whenever the request goes to a slave.
+	if brokerResult.Slave {
+		sysFlag = clearCommitOffsetFlag(sysFlag)
+	}
+	pullRequest := &kernel.PullMessageRequest{
+		ConsumerGroup:        c.config.GroupName,
+		Topic:                mq.Topic,
+		QueueId:              int32(mq.QueueId),
+		QueueOffset:          offset,
+		MaxMsgNums:           int32(numbers),
+		SysFlag:              sysFlag,
+		CommitOffset:         0,
+		SuspendTimeoutMillis: c.config.brokerSuspendMaxTimeMillis,
+		SubExpression:        data.SubString,
+		ExpressionType:       string(data.ExpType),
+	}
+
+	if data.ExpType == kernel.TAG {
+		pullRequest.SubVersion = 0
+	} else {
+		pullRequest.SubVersion = data.SubVersion
+	}
+
+	// TODO computePullFromWhichFilterServer
+	return kernel.PullMessage(ctx, brokerResult.BrokerAddr, pullRequest)
+}
+
+// makeSureStateOK returns an error unless the consumer state is kernel.Running.
+func (c *defaultConsumer) makeSureStateOK() error {
+	if c.state != kernel.Running {
+		// ServiceState is an integer type without a String method, so %s would
+		// render as "%!s(kernel.ServiceState=N)"; print the numeric value instead.
+		return fmt.Errorf("the consumer state is [%d], not running", c.state)
+	}
+	return nil
+}
+
+func (c *defaultConsumer) subscriptionAutomatically(topic string) {
+	// TODO
+}
+
+func (c *defaultConsumer) nextOffsetOf(queue *kernel.MessageQueue) int64 {
+	return 0
+}
+
+func toMessage(messageExts []*kernel.MessageExt) []*kernel.Message {
+	msgs := make([]*kernel.Message, 0)
+
+	return msgs
+}
+
+// processPullResult post-processes a pull response for mq.
+//
+// It always records result.SuggestWhichBrokerId for mq. When the status is
+// kernel.PullFound it optionally re-filters the messages by data.Tags, copies
+// the unique client message id into TransactionId for messages flagged as
+// prepared transactions, stamps every message with the min/max offsets of the
+// pull, and stores the resulting list back into result.
+func processPullResult(mq *kernel.MessageQueue, result *kernel.PullResult, data *kernel.SubscriptionData) {
+	updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
+	if result.Status != kernel.PullFound {
+		return
+	}
+
+	msgs := result.GetMessageExts()
+	msgListFilterAgain := msgs
+	// NOTE(review): filtering only when ClassFilterMode is set looks inverted — confirm.
+	if len(data.Tags) > 0 && data.ClassFilterMode {
+		// Start with length 0: a non-zero length would leave nil entries in
+		// front of the appended messages, and the loop below dereferences
+		// every element.
+		msgListFilterAgain = make([]*kernel.MessageExt, 0, len(msgs))
+		for _, msg := range msgs {
+			if _, exist := data.Tags[msg.GetTags()]; exist {
+				msgListFilterAgain = append(msgListFilterAgain, msg)
+			}
+		}
+	}
+
+	// TODO hook
+
+	minOffset := strconv.FormatInt(result.MinOffset, 10)
+	maxOffset := strconv.FormatInt(result.MaxOffset, 10)
+	for _, msg := range msgListFilterAgain {
+		// A missing or unparsable flag is treated as "not a prepared transaction".
+		traFlag, _ := strconv.ParseBool(msg.Properties[kernel.TransactionPrepared])
+		if traFlag {
+			msg.TransactionId = msg.Properties[kernel.UniqueClientMessageIdKeyIndex]
+		}
+
+		msg.Properties[kernel.MinOffset] = minOffset
+		msg.Properties[kernel.MaxOffset] = maxOffset
+	}
+
+	result.SetMessageExts(msgListFilterAgain)
+}
+
+func getSubscriptionData(mq *kernel.MessageQueue, exp string) *kernel.SubscriptionData {
+	return nil
+}
+
+func getNextQueueOf(topic string) *kernel.MessageQueue {
+	return nil
+}
+
+func buildSysFlag(commitOffset, suspend, subscription, classFilter bool) int32 {
+	var flag int32 = 0
+	if commitOffset {
+		flag |= 0x1 << 0
+	}
+
+	if suspend {
+		flag |= 0x1 << 1
+	}
+
+	if subscription {
+		flag |= 0x1 << 2
+	}
+
+	if classFilter {
+		flag |= 0x1 << 3
+	}
+
+	return flag
+}
+
+func clearCommitOffsetFlag(sysFlag int32) int32 {
+	return sysFlag & (^0x1 << 0)
+}
+
+// tryFindBroker looks up the broker that should serve pulls for mq. When the
+// first lookup misses, the topic route is refreshed and the lookup is retried
+// once. It returns nil when the broker is still unknown after the refresh.
+func tryFindBroker(mq *kernel.MessageQueue) *kernel.FindBrokerResult {
+	result := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
+	if result != nil {
+		// Already resolved: no need to query the route table a second time.
+		return result
+	}
+
+	kernel.UpdateTopicRouteInfo(mq.Topic)
+	return kernel.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
+}
+
+var (
+	pullFromWhichNodeTable sync.Map
+)
+
+func updatePullFromWhichNode(mq *kernel.MessageQueue, brokerId int64) {
+	pullFromWhichNodeTable.Store(mq.HashCode(), brokerId)
+}
+
+func recalculatePullFromWhichNode(mq *kernel.MessageQueue) int64 {
+	v, exist := pullFromWhichNodeTable.Load(mq.HashCode())
+	if exist {
+		return v.(int64)
+	}
+	return kernel.MasterId
+}
diff --git a/go.mod b/go.mod
index 809e20a..fa910f6 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,6 @@ go 1.12
 require (
 	github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
 	github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
-	github.com/pkg/errors v0.8.1
 	github.com/sirupsen/logrus v1.3.0
 	github.com/stretchr/testify v1.3.0
 	gopkg.in/alecthomas/kingpin.v2 v2.2.6
diff --git a/go.sum b/go.sum
index 62f4cad..0b0e0c1 100644
--- a/go.sum
+++ b/go.sum
@@ -7,8 +7,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
-github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
-github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
diff --git a/common/manager.go b/kernel/client.go
similarity index 71%
rename from common/manager.go
rename to kernel/client.go
index 83e2934..b8b6eee 100644
--- a/common/manager.go
+++ b/kernel/client.go
@@ -15,12 +15,16 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package common
+package kernel
 
 import (
 	"context"
-	"fmt"
+	"errors"
 	"github.com/apache/rocketmq-client-go/remote"
+	"github.com/apache/rocketmq-client-go/utils"
+	"os"
+	"strconv"
+	"sync"
 	"time"
 )
 
@@ -29,6 +33,23 @@ const (
 	tranceOff            = "false"
 )
 
+var (
+	log                           = utils.RLog
+	namesrvAddrs                  = os.Getenv("rocketmq.namesrv.addr")
+	clientIP                      = utils.LocalIP()
+	instanceName                  = os.Getenv("rocketmq.client.name")
+	pollNameServerInterval        = 30 * time.Second
+	heartbeatBrokerInterval       = 30 * time.Second
+	persistConsumerOffsetInterval = 5 * time.Second
+	unitMode                      = false
+	vipChannelEnabled, _          = strconv.ParseBool(os.Getenv("com.rocketmq.sendMessageWithVIPChannel"))
+	clientID                      = clientIP + "@" + instanceName
+)
+
+var (
+	ErrServiceState = errors.New("service state is not Running, please check")
+)
+
 type InnerProducer interface {
 	PublishTopicList() []string
 	UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
@@ -51,8 +72,9 @@ type InnerConsumer interface {
 func SendMessageSync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
 	msgs []*Message) (*SendResult, error) {
 	cmd := remote.NewRemotingCommand(SendBatchMessage, request, encodeMessages(msgs))
-	response, err := client.InvokeSync(brokerAddrs, cmd, 3*time.Second)
+	response, err := remote.InvokeSync(brokerAddrs, cmd, 3 * time.Second)
 	if err != nil {
+		log.Warningf("send messages with sync error: %v", err)
 		return nil, err
 	}
 
@@ -68,14 +90,13 @@ func SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, reque
 func SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
 	msgs []*Message) (*SendResult, error) {
 	cmd := remote.NewRemotingCommand(SendBatchMessage, request, encodeMessages(msgs))
-	err := client.InvokeOneWay(brokerAddrs, cmd)
+	err := remote.InvokeOneWay(brokerAddrs, cmd)
+	if err != nil {
+		log.Warningf("send messages with oneway error: %v", err)
+	}
 	return nil, err
 }
 
-func encodeMessages(message []*Message) []byte {
-	return nil
-}
-
 func processSendResponse(brokerName string, msgs []*Message, cmd *remote.RemotingCommand) *SendResult {
 	var status SendStatus
 	switch cmd.Code {
@@ -96,8 +117,7 @@ func processSendResponse(brokerName string, msgs []*Message, cmd *remote.Remotin
 
 	msgIDs := make([]string, 0)
 	for i := 0; i < len(msgs); i++ {
-		msgIDs = append(msgIDs, msgs[i].Properties[UniqueClientMessageIdKeyindex])
-
+		msgIDs = append(msgIDs, msgs[i].Properties[UniqueClientMessageIdKeyIndex])
 	}
 
 	regionId := cmd.ExtFields[MsgRegion]
@@ -153,77 +173,18 @@ func UpdateConsumerOffset(consumerGroup, topic string, queue int, offset int64)
 	return nil
 }
 
-//SendStatus message send result
-type SendStatus int
-
-const (
-	SendOK SendStatus = iota
-	SendFlushDiskTimeout
-	SendFlushSlaveTimeout
-	SendSlaveNotAvailable
-)
-
-// SendResult rocketmq send result
-type SendResult struct {
-	Status        SendStatus
-	MsgIDs        []string
-	MessageQueue  *MessageQueue
-	QueueOffset   int64
-	TransactionID string
-	OffsetMsgID   string
-	RegionID      string
-	TraceOn       bool
-}
-
-// SendResult send message result to string(detail result)
-func (result *SendResult) String() string {
-	return fmt.Sprintf("SendResult [sendStatus=%d, msgIds=%s, offsetMsgId=%s, queueOffset=%d, messageQueue=%s]",
-		result.Status, result.MsgIDs, result.OffsetMsgID, result.QueueOffset, result.MessageQueue.String())
-}
-
-// PullResult the pull result
-type PullResult struct {
-	NextBeginOffset int64
-	MinOffset       int64
-	MaxOffset       int64
-	Status          PullStatus
-	Messages        []*MessageExt
-}
-
-// PullStatus pull status
-type PullStatus int
+var (
+	// group -> InnerProducer
+	producerMap sync.Map
 
-// predefined pull status
-const (
-	PullFound PullStatus = iota
-	PullNoNewMsg
-	PullNoMatchedMsg
-	PullOffsetIllegal
-	PullBrokerTimeout
+	// group -> InnerConsumer
+	consumerMap sync.Map
 )
 
-// MessageQueue message queue
-type MessageQueue struct {
-	Topic      string `json:"topic"`
-	BrokerName string `json:"brokerName"`
-	QueueId    int    `json:"queueId"`
-}
-
-func (mq *MessageQueue) String() string {
-	return fmt.Sprintf("MessageQueue [topic=%s, brokerName=%s, queueId=%d]", mq.Topic, mq.BrokerName, mq.QueueId)
-}
-
 func CheckClientInBroker() {
 
 }
 
-func SendHeartbeatToAllBrokerWithLock() {
-
-}
-
-func UpdateTopicRouteInfoFromNameServer(topic string) {
-}
-
 func RegisterConsumer(group string, consumer InnerConsumer) {
 
 }
@@ -248,16 +209,10 @@ func SelectConsumer(group string) InnerConsumer {
 	return nil
 }
 
-func FindBrokerAddressInPublish(brokerName string) {
-
-}
-
-func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
+func encodeMessages(message []*Message) []byte {
 	return nil
 }
 
-type FindBrokerResult struct {
-	brokerAddr    string
-	slave         bool
-	brokerVersion int32
+func sendHeartbeatToAllBroker() {
+
 }
diff --git a/common/message.go b/kernel/message.go
similarity index 94%
rename from common/message.go
rename to kernel/message.go
index 08ead01..d65c48f 100644
--- a/common/message.go
+++ b/kernel/message.go
@@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package common
+package kernel
 
 import "fmt"
 
@@ -40,7 +40,7 @@ const (
 	ReconsumeTime                  = "RECONSUME_TIME"
 	MsgRegion                      = "MSG_REGION"
 	TraceSwitch                    = "TRACE_ON"
-	UniqueClientMessageIdKeyindex  = "UNIQ_KEY"
+	UniqueClientMessageIdKeyIndex  = "UNIQ_KEY"
 	MaxReconsumeTimes              = "MAX_RECONSUME_TIMES"
 	ConsumeStartTime               = "CONSUME_START_TIME"
 	TranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET"
@@ -69,9 +69,10 @@ func (msg *Message) String() string {
 		msg.Topic, string(msg.Body), msg.Flag, msg.Properties, msg.TransactionId)
 }
 
-func (msg *Message) SetTags(tags string) {
-	msg.Properties[tags] = tags
-}
+//
+//func (msg *Message) SetTags(tags string) {
+//	msg.Properties[tags] = tags
+//}
 
 func (msg *Message) PutProperty(key, value string) {
 	msg.Properties[key] = value
@@ -104,6 +105,10 @@ type MessageExt struct {
 	PreparedTransactionOffset int64
 }
 
+func (msgExt *MessageExt) GetTags() string {
+	return msgExt.Properties[Tags]
+}
+
 func (msgExt *MessageExt) String() string {
 	return fmt.Sprint("[Message=%s, MsgId=%s, QueueId=%d, StoreSize=%d, QueueOffset=%d, SysFlag=%d, "+
 		"BornTimestamp=%d, BornHost=%s, StoreTimestamp=%d, StoreHost=%s, CommitLogOffset=%d, BodyCRC=%d, "+
diff --git a/kernel/model.go b/kernel/model.go
new file mode 100644
index 0000000..9ee30cb
--- /dev/null
+++ b/kernel/model.go
@@ -0,0 +1,241 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+import (
+	"fmt"
+	"github.com/apache/rocketmq-client-go/utils"
+)
+
+// SendStatus of message
+type SendStatus int
+
+const (
+	SendOK SendStatus = iota
+	SendFlushDiskTimeout
+	SendFlushSlaveTimeout
+	SendSlaveNotAvailable
+)
+
+// SendResult RocketMQ send result
+type SendResult struct {
+	Status        SendStatus
+	MsgIDs        []string
+	MessageQueue  *MessageQueue
+	QueueOffset   int64
+	TransactionID string
+	OffsetMsgID   string
+	RegionID      string
+	TraceOn       bool
+}
+
+// SendResult send message result to string(detail result)
+func (result *SendResult) String() string {
+	return fmt.Sprintf("SendResult [sendStatus=%d, msgIds=%s, offsetMsgId=%s, queueOffset=%d, messageQueue=%s]",
+		result.Status, result.MsgIDs, result.OffsetMsgID, result.QueueOffset, result.MessageQueue.String())
+}
+
+// PullStatus pull status
+type PullStatus int
+
+// predefined pull status
+const (
+	PullFound PullStatus = iota
+	PullNoNewMsg
+	PullNoMatchedMsg
+	PullOffsetIllegal
+	PullBrokerTimeout
+)
+
+// PullResult the pull result
+type PullResult struct {
+	NextBeginOffset      int64
+	MinOffset            int64
+	MaxOffset            int64
+	Status               PullStatus
+	SuggestWhichBrokerId int64
+	messageBinary        []byte
+	messageExts          []*MessageExt
+}
+
+func (result *PullResult) GetMessageExts() []*MessageExt {
+	if result.messageExts != nil && len(result.messageExts) > 0 {
+		return result.messageExts
+	}
+
+	return result.messageExts
+}
+
+func (result *PullResult) SetMessageExts(msgExts []*MessageExt) {
+	result.messageBinary = nil
+	result.messageExts = msgExts
+}
+
+func (result *PullResult) GetMessages() []*Message {
+	if result.messageExts == nil || len(result.messageExts) == 0 {
+		return make([]*Message, 0)
+	}
+	return toMessages(result.messageExts)
+}
+
+func toMessages(messageExts []*MessageExt) []*Message {
+	msgs := make([]*Message, 0)
+
+	return msgs
+}
+
+// MessageQueue message queue
+type MessageQueue struct {
+	Topic      string `json:"topic"`
+	BrokerName string `json:"brokerName"`
+	QueueId    int    `json:"queueId"`
+}
+
+func (mq *MessageQueue) String() string {
+	return fmt.Sprintf("MessageQueue [topic=%s, brokerName=%s, queueId=%d]", mq.Topic, mq.BrokerName, mq.QueueId)
+}
+
+func (mq *MessageQueue) HashCode() int {
+	result := 1
+	result = 31*result + utils.HashString(mq.BrokerName)
+	result = 31*result + mq.QueueId
+	result = 31*result + utils.HashString(mq.Topic)
+
+	return result
+}
+
+type FindBrokerResult struct {
+	BrokerAddr    string
+	Slave         bool
+	BrokerVersion int
+}
+
+type (
+	// groupName of producer
+	producerData string
+
+	consumeType string
+
+	MessageModel     int
+	ConsumeFromWhere int
+	ServiceState int
+)
+
+// Consume types.
+const (
+	ConsumeActively  = consumeType("PULL")
+	ConsumePassively = consumeType("PUSH")
+)
+
+// Message models.
+const (
+	BroadCasting = MessageModel(1)
+	Clustering   = MessageModel(2)
+)
+
+// ConsumeFromWhere values. They live in their own const block because iota is
+// the index of the spec inside its enclosing block; sharing a block with the
+// constants above made the first value 4 instead of 0.
+const (
+	ConsumeFromLastOffset ConsumeFromWhere = iota
+	ConsumeFromFirstOffset
+	ConsumeFromTimestamp
+)
+
+// ServiceState values. CreateJust is 0, i.e. the zero value of ServiceState.
+const (
+	CreateJust ServiceState = iota
+	Running
+	Shutdown
+)
+
+func (mode MessageModel) String() string {
+	switch mode {
+	case BroadCasting:
+		return "BroadCasting"
+	case Clustering:
+		return "Clustering"
+	default:
+		return "Unknown"
+	}
+}
+
+type ExpressionType string
+
+const (
+	/**
+	 * <ul>
+	 * Keywords:
+	 * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li>
+	 * </ul>
+	 * <p/>
+	 * <ul>
+	 * Data type:
+	 * <li>Boolean, like: TRUE, FALSE</li>
+	 * <li>String, like: 'abc'</li>
+	 * <li>Decimal, like: 123</li>
+	 * <li>Float number, like: 3.1415</li>
+	 * </ul>
+	 * <p/>
+	 * <ul>
+	 * Grammar:
+	 * <li>{@code AND, OR}</li>
+	 * <li>{@code >, >=, <, <=, =}</li>
+	 * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li>
+	 * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li>
+	 * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this operation only support String type.</li>
+	 * <li>{@code IS NULL}, {@code IS NOT NULL}, check parameter whether is null, or not.</li>
+	 * <li>{@code =TRUE}, {@code =FALSE}, check parameter whether is true, or false.</li>
+	 * </ul>
+	 * <p/>
+	 * <p>
+	 * Example:
+	 * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
+	 * </p>
+	 */
+	SQL92 = ExpressionType("SQL92")
+
+	/**
+	 * Only support or operation such as
+	 * "tag1 || tag2 || tag3", <br>
+	 * If null or * expression, meaning subscribe all.
+	 */
+	TAG = ExpressionType("TAG")
+)
+
+func IsTagType(exp string) bool {
+	if exp == "" || exp == "TAG" {
+		return true
+	}
+	return false
+}
+
+var SubAll = "*"
+
+type SubscriptionData struct {
+	ClassFilterMode bool
+	Topic           string
+	SubString       string
+	Tags            map[string]bool
+	Codes           map[int32]bool
+	SubVersion      int64
+	ExpType         ExpressionType
+}
+
+type consumerData struct {
+	groupName         string
+	cType             consumeType
+	messageModel      MessageModel
+	where             ConsumeFromWhere
+	subscriptionDatas []SubscriptionData
+	unitMode          bool
+}
+
+type heartbeatData struct {
+	clientId      string
+	producerDatas []producerData
+	consumerDatas []consumerData
+}
diff --git a/common/transaction.go b/kernel/mq_version.go
similarity index 67%
copy from common/transaction.go
copy to kernel/mq_version.go
index e526af5..e7d7f32 100644
--- a/common/transaction.go
+++ b/kernel/mq_version.go
@@ -2,20 +2,21 @@
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
+The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at
 
     http://www.apache.org/licenses/LICENSE-2.0
 
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package common
+package kernel
 
-type TransactionListener interface {
-}
+const (
+	V4_1_0 = 0
+)
diff --git a/common/perm.go b/kernel/perm.go
similarity index 54%
copy from common/perm.go
copy to kernel/perm.go
index 13f1e7e..e78a876 100644
--- a/common/perm.go
+++ b/kernel/perm.go
@@ -1,21 +1,21 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
+ *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
 
-package common
+package kernel
 
 const (
 	permPriority = 0x1 << 3
@@ -24,15 +24,15 @@ const (
 	permInherit  = 0x1 << 0
 )
 
-func isReadable(perm int) bool {
+func queueIsReadable(perm int) bool {
 	return (perm & permRead) == permRead
 }
 
-func isWriteable(perm int) bool {
+func queueIsWriteable(perm int) bool {
 	return (perm & permWrite) == permWrite
 }
 
-func isInherited(perm int) bool {
+func queueIsInherited(perm int) bool {
 	return (perm & permInherit) == permInherit
 }
 
@@ -42,15 +42,15 @@ func perm2string(perm int) string {
 		bytes[i] = '-'
 	}
 
-	if isReadable(perm) {
+	if queueIsReadable(perm) {
 		bytes[0] = 'R'
 	}
 
-	if isWriteable(perm) {
+	if queueIsWriteable(perm) {
 		bytes[1] = 'W'
 	}
 
-	if isInherited(perm) {
+	if queueIsInherited(perm) {
 		bytes[2] = 'X'
 	}
 
diff --git a/common/request.go b/kernel/request.go
similarity index 96%
rename from common/request.go
rename to kernel/request.go
index 83d74a0..7967cca 100644
--- a/common/request.go
+++ b/kernel/request.go
@@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package common
+package kernel
 
 const (
 	GetRouteInfoByTopic = int16(105)
@@ -54,8 +54,9 @@ type PullMessageRequest struct {
 	SysFlag              int32  `json:"sysFlag"`
 	CommitOffset         int64  `json:"commitOffset"`
 	SuspendTimeoutMillis int64  `json:"suspendTimeoutMillis"`
-	Subscription         string `json:"subscription"`
+	SubExpression        string `json:"subscription"`
 	SubVersion           int64  `json:"subVersion"`
+	ExpressionType       string `json:"expressionType"`
 }
 
 type GetMaxOffsetRequest struct {
diff --git a/common/response.go b/kernel/response.go
similarity index 98%
rename from common/response.go
rename to kernel/response.go
index 0fc0101..df1cecc 100644
--- a/common/response.go
+++ b/kernel/response.go
@@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package common
+package kernel
 
 const (
 	Success           = int16(0)
diff --git a/common/route.go b/kernel/route.go
similarity index 78%
rename from common/route.go
rename to kernel/route.go
index 4ad3218..38deb93 100644
--- a/common/route.go
+++ b/kernel/route.go
@@ -15,13 +15,12 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package common
+package kernel
 
 import (
 	"encoding/json"
+	"errors"
 	"github.com/apache/rocketmq-client-go/remote"
-	"github.com/pkg/errors"
-	log "github.com/sirupsen/logrus"
 	"sort"
 	"strconv"
 	"strings"
@@ -34,7 +33,7 @@ const (
 	requestTimeout   = 3000
 	defaultTopic     = "TBW102"
 	defaultQueueNums = 4
-	masterId         = int64(0)
+	MasterId         = int64(0)
 )
 
 var (
@@ -42,10 +41,15 @@ var (
 )
 
 var (
+	// brokerName -> *BrokerData
 	brokerAddressesMap sync.Map
-	publishInfoMap     sync.Map
-	routeDataMap       sync.Map
-	lockNamesrv        sync.Mutex
+
+	// brokerName -> map[string]int (broker address -> version)
+	brokerVersionMap sync.Map
+
+	publishInfoMap sync.Map
+	routeDataMap   sync.Map
+	lockNamesrv    sync.Mutex
 )
 
 // key is topic, value is TopicPublishInfo
@@ -70,37 +74,7 @@ func (info *TopicPublishInfo) fetchQueueIndex() int {
 	return int(qIndex) % length
 }
 
-func tryToFindTopicPublishInfo(topic string) *TopicPublishInfo {
-	value, exist := publishInfoMap.Load(topic)
-
-	var info *TopicPublishInfo
-	if exist {
-		info = value.(*TopicPublishInfo)
-	}
-
-	if info == nil || !info.isOK() {
-		updateTopicRouteInfo(topic)
-		value, exist = publishInfoMap.Load(topic)
-		if !exist {
-			info = &TopicPublishInfo{HaveTopicRouterInfo: false}
-		} else {
-			info = value.(*TopicPublishInfo)
-		}
-	}
-
-	if info.HaveTopicRouterInfo || info.isOK() {
-		return info
-	}
-
-	value, exist = publishInfoMap.Load(topic)
-	if exist {
-		return value.(*TopicPublishInfo)
-	}
-
-	return nil
-}
-
-func updateTopicRouteInfo(topic string) {
+func UpdateTopicRouteInfo(topic string) {
 	// Todo process lock timeout
 	lockNamesrv.Lock()
 	defer lockNamesrv.Unlock()
@@ -151,13 +125,91 @@ func updateTopicRouteInfo(topic string) {
 	}
 }
 
+func FindBrokerAddressInPublish(brokerName string) string {
+	bd, exist := brokerAddressesMap.Load(brokerName)
+
+	if !exist {
+		return ""
+	}
+
+	return bd.(*BrokerData).brokerAddresses[MasterId]
+}
+
+func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
+	var (
+		brokerAddr = ""
+		slave      = false
+		found      = false
+	)
+
+	bd, exist := brokerAddressesMap.Load(brokerName)
+
+	if exist {
+		for k, v := range bd.(*BrokerData).brokerAddresses {
+			if v != "" {
+				found = true
+				if k != MasterId {
+					slave = true
+				}
+				break
+			}
+		}
+	}
+
+	var result *FindBrokerResult
+	if found {
+		result = &FindBrokerResult{
+			BrokerAddr:    brokerName,
+			Slave:         slave,
+			BrokerVersion: findBrokerVersion(brokerName, brokerAddr),
+		}
+	}
+
+	return result
+}
+
+func FetchSubscribeMessageQueues(topic string) ([]*MessageQueue, error) {
+	routeData, err := queryTopicRouteInfoFromServer(topic, 3*time.Second)
+
+	if err != nil {
+		return nil, err
+	}
+
+	mqs := make([]*MessageQueue, 0)
+
+	for _, qd := range routeData.queueDataList {
+		if queueIsReadable(qd.perm) {
+			for i := 0; i < qd.readQueueNums; i++ {
+				mqs = append(mqs, &MessageQueue{Topic: topic, BrokerName: qd.brokerName, QueueId: i})
+			}
+		}
+	}
+
+	return mqs, nil
+}
+
+// findBrokerVersion returns the version recorded for brokerAddr under
+// brokerName in brokerVersionMap, or 0 when nothing usable is recorded.
+func findBrokerVersion(brokerName, brokerAddr string) int {
+	stored, exist := brokerVersionMap.Load(brokerName)
+	if !exist {
+		return 0
+	}
+
+	// Checked assertion: an entry of an unexpected type is treated as
+	// "unknown version" instead of panicking.
+	versions, ok := stored.(map[string]int)
+	if !ok {
+		return 0
+	}
+
+	// A missing key yields the zero value, which is the "unknown" result.
+	return versions[brokerAddr]
+}
+
 func queryTopicRouteInfoFromServer(topic string, timeout time.Duration) (*topicRouteData, error) {
 	request := &GetRouteInfoRequest{
 		Topic: topic,
 	}
 	rc := remote.NewRemotingCommand(GetRouteInfoByTopic, request, nil)
 
-	response, err := client.InvokeSync(getNameServerAddress(), rc, timeout)
+	response, err := remote.InvokeSync(getNameServerAddress(), rc, timeout)
 
 	if err != nil {
 		return nil, err
@@ -242,7 +294,7 @@ func RouteData2PublishInfo(topic string, data *topicRouteData) *TopicPublishInfo
 	})
 
 	for _, qd := range qds {
-		if !isWriteable(qd.perm) {
+		if !queueIsWriteable(qd.perm) {
 			continue
 		}
 
@@ -254,7 +306,7 @@ func RouteData2PublishInfo(topic string, data *topicRouteData) *TopicPublishInfo
 			}
 		}
 
-		if bData == nil || bData.brokerAddresses[masterId] == "" {
+		if bData == nil || bData.brokerAddresses[MasterId] == "" {
 			continue
 		}
 
diff --git a/common/transaction.go b/kernel/transaction.go
similarity index 98%
copy from common/transaction.go
copy to kernel/transaction.go
index e526af5..4895875 100644
--- a/common/transaction.go
+++ b/kernel/transaction.go
@@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package common
+package kernel
 
 type TransactionListener interface {
 }
diff --git a/remote/client.go b/remote/client.go
index 7b1c528..94f5ed2 100644
--- a/remote/client.go
+++ b/remote/client.go
@@ -93,6 +93,18 @@ type RemotingClient interface {
 	InvokeOneWay(string, *RemotingCommand) error
 }
 
+// InvokeSync, InvokeAsync and InvokeOneWay are stubs for now: each ignores
+// its arguments (InvokeAsync never calls f) and returns nil results.
+func InvokeSync(addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) {
+	return nil, nil
+}
+
+func InvokeAsync(addr string, request *RemotingCommand, timeout time.Duration, f func(*RemotingCommand)) error {
+	return nil
+}
+
+func InvokeOneWay(addr string, request *RemotingCommand) error { return nil }
+
 //defaultRemotingClient for default RemotingClient implementation
 type defaultRemotingClient struct {
 	responseTable    map[int32]*ResponseFuture
diff --git a/common/transaction.go b/utils/helper.go
similarity index 77%
rename from common/transaction.go
rename to utils/helper.go
index e526af5..fe3dd62 100644
--- a/common/transaction.go
+++ b/utils/helper.go
@@ -15,7 +15,18 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package common
+package utils
 
-type TransactionListener interface {
+import "hash/crc32"
+
+// LocalIP is a stub for now: it always returns the empty string and does
+// not inspect any network interface.
+func LocalIP() string { return "" }
+
+// HashString returns the CRC-32 (IEEE) checksum of s converted to int.
+// Distinct strings may collide; the checksum of the empty string is 0.
+func HashString(s string) int {
+	// No empty-string special case is needed: CRC-32 of zero bytes is 0.
+	sum := crc32.ChecksumIEEE([]byte(s))
+	return int(sum)
 }
diff --git a/common/perm.go b/utils/log.go
similarity index 56%
rename from common/perm.go
rename to utils/log.go
index 13f1e7e..d684efd 100644
--- a/common/perm.go
+++ b/utils/log.go
@@ -15,44 +15,28 @@
  *  limitations under the License.
  */
 
-package common
-
-const (
-	permPriority = 0x1 << 3
-	permRead     = 0x1 << 2
-	permWrite    = 0x1 << 1
-	permInherit  = 0x1 << 0
-)
-
-func isReadable(perm int) bool {
-	return (perm & permRead) == permRead
-}
-
-func isWriteable(perm int) bool {
-	return (perm & permWrite) == permWrite
-}
-
-func isInherited(perm int) bool {
-	return (perm & permInherit) == permInherit
-}
-
-func perm2string(perm int) string {
-	bytes := make([]byte, 3)
-	for i := 0; i < 3; i++ {
-		bytes[i] = '-'
-	}
-
-	if isReadable(perm) {
-		bytes[0] = 'R'
-	}
-
-	if isWriteable(perm) {
-		bytes[1] = 'W'
-	}
-
-	if isInherited(perm) {
-		bytes[2] = 'X'
-	}
-
-	return string(bytes)
+package utils
+
+import "io"
+
+var RLog Logger // package-level logger; this file declares it but assigns no value
+// Logger is the method set a value must provide to be stored in RLog.
+type Logger interface {
+	Output() io.Writer // presumably the writer installed via SetOutput — confirm with the implementation
+	SetOutput(w io.Writer)
+	Prefix() string // presumably the prefix installed via SetPrefix — confirm with the implementation
+	SetPrefix(p string)
+	SetHeader(h string) // NOTE(review): header format is not defined in this file — confirm with the implementation
+	Print(i ...interface{})
+	Printf(format string, args ...interface{}) // every *f method takes a format string plus its arguments
+	Debug(i ...interface{})
+	Debugf(format string, args ...interface{})
+	Info(i ...interface{})
+	Infof(format string, args ...interface{})
+	Warning(i ...interface{})
+	Warningf(format string, args ...interface{})
+	Error(i ...interface{})
+	Errorf(format string, args ...interface{})
+	Fatal(i ...interface{})
+	Fatalf(format string, args ...interface{}) // NOTE(review): whether Fatal/Fatalf terminate the process is up to the implementation
 }