You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/03/12 12:50:24 UTC
[rocketmq-client-go] branch native updated: Adding consumer of
original (#39)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new cb6cc2d Adding consumer of original (#39)
cb6cc2d is described below
commit cb6cc2d3e1647994b5955c7e05e1e6ada7e0387b
Author: wenfeng <sx...@gmail.com>
AuthorDate: Tue Mar 12 20:50:20 2019 +0800
Adding consumer of original (#39)
* Adding more implementations of common
* Adding consumer of original
* fix compile error
---
consumer.go | 269 ++++++++++++++++++++++++++
go.mod | 1 -
go.sum | 2 -
common/manager.go => kernel/client.go | 121 ++++--------
{common => kernel}/message.go | 15 +-
kernel/model.go | 241 +++++++++++++++++++++++
common/transaction.go => kernel/mq_version.go | 15 +-
{common => kernel}/perm.go | 26 +--
{common => kernel}/request.go | 5 +-
{common => kernel}/response.go | 2 +-
{common => kernel}/route.go | 134 +++++++++----
{common => kernel}/transaction.go | 2 +-
remote/client.go | 12 ++
common/transaction.go => utils/helper.go | 15 +-
common/perm.go => utils/log.go | 64 +++---
15 files changed, 726 insertions(+), 198 deletions(-)
diff --git a/consumer.go b/consumer.go
new file mode 100644
index 0000000..896df74
--- /dev/null
+++ b/consumer.go
@@ -0,0 +1,269 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package rocketmq
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "github.com/apache/rocketmq-client-go/kernel"
+ "strconv"
+ "sync"
+ "time"
+)
+
+// Consumer is the client-facing API for receiving messages.
+type Consumer interface {
+ // Pull fetches up to numbers messages of topic filtered by expression.
+ Pull(topic, expression string, numbers int) (*kernel.PullResult, error)
+ // SubscribeWithChan subscribes to topic and returns a channel of messages.
+ SubscribeWithChan(topic, expression string) (chan *kernel.Message, error)
+ // SubscribeWithFunc subscribes to topic and passes each message to f.
+ SubscribeWithFunc(topic, expression string, f func(msg *kernel.Message) ConsumeResult) error
+ // ACK reports the consume result of msg.
+ ACK(msg *kernel.Message, result ConsumeResult)
+}
+
+// ConsumeResult is the outcome a handler reports for a consumed message.
+// NOTE(review): no ConsumeResult values are declared in this file yet.
+type ConsumeResult int
+
+// ConsumerType enumerates the kinds of consumer.
+type ConsumerType int
+
+// ConsumerType values, numbered from 0 in declaration order.
+const (
+ Original ConsumerType = iota
+ Orderly
+ Transaction
+)
+
+// ConsumerConfig holds the settings a consumer is created with.
+type ConsumerConfig struct {
+ // GroupName is sent to the broker as the consumer group of pull requests.
+ GroupName string
+ Model kernel.MessageModel
+ UnitMode bool
+ MaxReconsumeTimes int
+ PullMessageTimeout time.Duration
+ FromWhere kernel.ConsumeFromWhere
+ // brokerSuspendMaxTimeMillis is sent as SuspendTimeoutMillis of pull requests.
+ // NOTE(review): unexported and never assigned in this file, so it is always 0 — confirm intent.
+ brokerSuspendMaxTimeMillis int64
+}
+
+// NewConsumer returns a Consumer backed by defaultConsumer that uses config.
+// The consumer's state is left at its zero value.
+func NewConsumer(config ConsumerConfig) Consumer {
+	consumer := new(defaultConsumer)
+	consumer.config = config
+	return consumer
+}
+
+// defaultConsumer is the Consumer implementation returned by NewConsumer.
+type defaultConsumer struct {
+ // state must equal kernel.Running for pull to proceed (see makeSureStateOK).
+ state kernel.ServiceState
+ config ConsumerConfig
+}
+
+// Pull fetches up to numbers messages of topic that match expression.
+//
+// It selects a queue of the topic, pulls from the offset reported by
+// nextOffsetOf and post-processes the result before returning it. An error is
+// returned when no queue or subscription data is available, when the pull
+// itself fails, or when the pull yields no result.
+func (c *defaultConsumer) Pull(topic, expression string, numbers int) (*kernel.PullResult, error) {
+	mq := getNextQueueOf(topic)
+	if mq == nil {
+		return nil, fmt.Errorf("prepared to pull topic: %s, but no queue was found", topic)
+	}
+
+	// pull and processPullResult both dereference data, so reject nil here
+	// instead of panicking further down.
+	data := getSubscriptionData(mq, expression)
+	if data == nil {
+		return nil, fmt.Errorf("no subscription data for topic: %s, expression: %s", topic, expression)
+	}
+
+	result, err := c.pull(context.Background(), mq, data, c.nextOffsetOf(mq), numbers)
+	if err != nil {
+		return nil, err
+	}
+	if result == nil {
+		return nil, fmt.Errorf("pull topic: %s returned no result", topic)
+	}
+
+	processPullResult(mq, result, data)
+	return result, nil
+}
+
+// SubscribeWithChan is intended for subscriptions whose messages are acked manually.
+// It is not implemented yet: it returns a nil channel together with a nil error.
+func (c *defaultConsumer) SubscribeWithChan(topic, expression string) (chan *kernel.Message, error) {
+ return nil, nil
+}
+
+// SubscribeWithFunc is intended for subscriptions whose messages are acked automatically.
+// It is not implemented yet: f is never called and nil is returned.
+func (c *defaultConsumer) SubscribeWithFunc(topic, expression string,
+ f func(msg *kernel.Message) ConsumeResult) error {
+ return nil
+}
+
+// ACK is not implemented yet; it currently does nothing with msg or result.
+func (c *defaultConsumer) ACK(msg *kernel.Message, result ConsumeResult) {
+
+}
+
+// pull builds and sends one pull request for mq, starting at offset.
+//
+// It fails when the consumer is not running, when mq or data is nil, when
+// offset is negative, when no broker can be resolved for mq, or when a
+// non-TAG filter expression is used against a broker older than
+// kernel.V4_1_0. A non-positive numbers is treated as 1.
+func (c *defaultConsumer) pull(ctx context.Context, mq *kernel.MessageQueue, data *kernel.SubscriptionData,
+	offset int64, numbers int) (*kernel.PullResult, error) {
+	if err := c.makeSureStateOK(); err != nil {
+		return nil, err
+	}
+
+	if mq == nil {
+		return nil, errors.New("MessageQueue is nil")
+	}
+
+	if data == nil {
+		return nil, errors.New("SubscriptionData is nil")
+	}
+
+	if offset < 0 {
+		return nil, errors.New("offset < 0")
+	}
+
+	if numbers <= 0 {
+		numbers = 1
+	}
+	c.subscriptionAutomatically(mq.Topic)
+
+	brokerResult := tryFindBroker(mq)
+	if brokerResult == nil {
+		return nil, fmt.Errorf("the broker %s does not exist", mq.BrokerName)
+	}
+
+	// Only non-TAG expressions (e.g. SQL92) need a recent broker; rejecting
+	// TAG expressions here, as the code previously did, was inverted.
+	isTagType := kernel.IsTagType(string(data.ExpType))
+	if !isTagType && brokerResult.BrokerVersion < kernel.V4_1_0 {
+		return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to support for filter message by %v",
+			mq.BrokerName, brokerResult.BrokerVersion, data.ExpType)
+	}
+
+	// commitOffset=false, suspend=true, subscription=true, classFilter=false.
+	sysFlag := buildSysFlag(false, true, true, false)
+	if brokerResult.Slave {
+		sysFlag = clearCommitOffsetFlag(sysFlag)
+	}
+
+	pullRequest := &kernel.PullMessageRequest{
+		ConsumerGroup:        c.config.GroupName,
+		Topic:                mq.Topic,
+		QueueId:              int32(mq.QueueId),
+		QueueOffset:          offset,
+		MaxMsgNums:           int32(numbers),
+		SysFlag:              sysFlag,
+		CommitOffset:         0,
+		SuspendTimeoutMillis: c.config.brokerSuspendMaxTimeMillis,
+		SubExpression:        data.SubString,
+		ExpressionType:       string(data.ExpType),
+	}
+
+	// TAG subscriptions always send version 0; other expression types send
+	// the subscription's own version.
+	if !isTagType {
+		pullRequest.SubVersion = data.SubVersion
+	}
+
+	// TODO computePullFromWhichFilterServer
+	return kernel.PullMessage(ctx, brokerResult.BrokerAddr, pullRequest)
+}
+
+// makeSureStateOK returns an error unless the consumer state is kernel.Running.
+func (c *defaultConsumer) makeSureStateOK() error {
+	if c.state != kernel.Running {
+		// %v rather than %s: ServiceState is an integer type, so %s would
+		// render as "%!s(kernel.ServiceState=N)".
+		return fmt.Errorf("the consumer state is [%v], not running", c.state)
+	}
+	return nil
+}
+
+// subscriptionAutomatically is called by pull with the queue's topic before a
+// broker is looked up. It is not implemented yet and does nothing.
+func (c *defaultConsumer) subscriptionAutomatically(topic string) {
+ // TODO
+}
+
+// nextOffsetOf supplies the offset Pull starts from for queue.
+// It is a placeholder that always returns 0.
+func (c *defaultConsumer) nextOffsetOf(queue *kernel.MessageQueue) int64 {
+ return 0
+}
+
+// toMessage is a placeholder: it ignores messageExts and returns an empty,
+// non-nil slice.
+func toMessage(messageExts []*kernel.MessageExt) []*kernel.Message {
+	return make([]*kernel.Message, 0)
+}
+
+// processPullResult post-processes a pull result in place.
+//
+// It records the broker id suggested by the result for mq. When the status is
+// kernel.PullFound it also re-filters the messages by the subscription's tags
+// (skipped in class-filter mode), stamps each message with the result's
+// min/max offsets, sets the transaction id on prepared transactional
+// messages, and stores the filtered list back into result.
+func processPullResult(mq *kernel.MessageQueue, result *kernel.PullResult, data *kernel.SubscriptionData) {
+	if result == nil {
+		return
+	}
+
+	updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
+	if result.Status != kernel.PullFound {
+		return
+	}
+
+	msgs := result.GetMessageExts()
+	filtered := msgs
+	if data != nil && len(data.Tags) > 0 && !data.ClassFilterMode {
+		// Length 0 with capacity len(msgs): allocating with length len(msgs)
+		// and then appending left nil entries at the front, which the loop
+		// below dereferenced.
+		filtered = make([]*kernel.MessageExt, 0, len(msgs))
+		for _, msg := range msgs {
+			if _, ok := data.Tags[msg.GetTags()]; ok {
+				filtered = append(filtered, msg)
+			}
+		}
+	}
+
+	// TODO hook
+
+	minOffset := strconv.FormatInt(result.MinOffset, 10)
+	maxOffset := strconv.FormatInt(result.MaxOffset, 10)
+	for _, msg := range filtered {
+		// Writing to a nil map panics; reads from one are fine.
+		if msg.Properties == nil {
+			msg.Properties = make(map[string]string)
+		}
+
+		// An absent or malformed flag parses as false, i.e. not a prepared message.
+		if prepared, _ := strconv.ParseBool(msg.Properties[kernel.TransactionPrepared]); prepared {
+			msg.TransactionId = msg.Properties[kernel.UniqueClientMessageIdKeyIndex]
+		}
+
+		msg.Properties[kernel.MinOffset] = minOffset
+		msg.Properties[kernel.MaxOffset] = maxOffset
+	}
+
+	result.SetMessageExts(filtered)
+}
+
+// getSubscriptionData is meant to supply the subscription data Pull uses for
+// mq and exp. It is a placeholder that always returns nil.
+func getSubscriptionData(mq *kernel.MessageQueue, exp string) *kernel.SubscriptionData {
+ return nil
+}
+
+// getNextQueueOf is meant to pick the queue of topic that Pull reads from.
+// It is a placeholder that always returns nil, which makes Pull fail.
+func getNextQueueOf(topic string) *kernel.MessageQueue {
+ return nil
+}
+
+// buildSysFlag packs the four options into a bit field: commitOffset is bit 0,
+// suspend bit 1, subscription bit 2 and classFilter bit 3.
+func buildSysFlag(commitOffset, suspend, subscription, classFilter bool) int32 {
+	var flag int32
+	// The array position of each option is its bit position.
+	for bit, enabled := range [...]bool{commitOffset, suspend, subscription, classFilter} {
+		if enabled {
+			flag |= 0x1 << uint(bit)
+		}
+	}
+	return flag
+}
+
+// clearCommitOffsetFlag returns sysFlag with bit 0 (the commit-offset flag set
+// by buildSysFlag) cleared; all other bits are preserved.
+func clearCommitOffsetFlag(sysFlag int32) int32 {
+	return sysFlag &^ 0x1
+}
+
+// tryFindBroker resolves the broker to pull mq from.
+//
+// If the first lookup finds nothing, the topic's route info is refreshed and
+// the lookup is retried once. It returns nil when the broker is still unknown.
+func tryFindBroker(mq *kernel.MessageQueue) *kernel.FindBrokerResult {
+	// Return the first hit directly instead of always looking up twice.
+	if result := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false); result != nil {
+		return result
+	}
+
+	kernel.UpdateTopicRouteInfo(mq.Topic)
+	return kernel.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
+}
+
+var (
+ // pullFromWhichNodeTable maps MessageQueue.HashCode() to the broker id
+ // (int64) last stored by updatePullFromWhichNode.
+ pullFromWhichNodeTable sync.Map
+)
+
+// updatePullFromWhichNode stores brokerId for mq, keyed by mq.HashCode().
+func updatePullFromWhichNode(mq *kernel.MessageQueue, brokerId int64) {
+ pullFromWhichNodeTable.Store(mq.HashCode(), brokerId)
+}
+
+// recalculatePullFromWhichNode returns the broker id stored for mq, or
+// kernel.MasterId when nothing has been stored yet.
+func recalculatePullFromWhichNode(mq *kernel.MessageQueue) int64 {
+	if brokerId, ok := pullFromWhichNodeTable.Load(mq.HashCode()); ok {
+		return brokerId.(int64)
+	}
+	return kernel.MasterId
+}
diff --git a/go.mod b/go.mod
index 809e20a..fa910f6 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,6 @@ go 1.12
require (
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
- github.com/pkg/errors v0.8.1
github.com/sirupsen/logrus v1.3.0
github.com/stretchr/testify v1.3.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
diff --git a/go.sum b/go.sum
index 62f4cad..0b0e0c1 100644
--- a/go.sum
+++ b/go.sum
@@ -7,8 +7,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
-github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
-github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
diff --git a/common/manager.go b/kernel/client.go
similarity index 71%
rename from common/manager.go
rename to kernel/client.go
index 83e2934..b8b6eee 100644
--- a/common/manager.go
+++ b/kernel/client.go
@@ -15,12 +15,16 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package common
+package kernel
import (
"context"
- "fmt"
+ "errors"
"github.com/apache/rocketmq-client-go/remote"
+ "github.com/apache/rocketmq-client-go/utils"
+ "os"
+ "strconv"
+ "sync"
"time"
)
@@ -29,6 +33,23 @@ const (
tranceOff = "false"
)
+// Package-level client settings, evaluated once at package initialization.
+var (
+ // NOTE(review): this copies the value utils.RLog holds at initialization time;
+ // utils.RLog is declared without an initializer, so confirm it is non-nil by then.
+ log = utils.RLog
+ namesrvAddrs = os.Getenv("rocketmq.namesrv.addr")
+ clientIP = utils.LocalIP()
+ instanceName = os.Getenv("rocketmq.client.name")
+ pollNameServerInterval = 30 * time.Second
+ heartbeatBrokerInterval = 30 * time.Second
+ persistConsumerOffsetInterval = 5 * time.Second
+ unitMode = false
+ // The parse error is discarded, so an unset or malformed value yields false.
+ vipChannelEnabled, _ = strconv.ParseBool(os.Getenv("com.rocketmq.sendMessageWithVIPChannel"))
+ clientID = clientIP + "@" + instanceName
+)
+
+var (
+ // ErrServiceState is the sentinel error for a service that is not in the Running state.
+ ErrServiceState = errors.New("service state is not Running, please check")
+)
+
type InnerProducer interface {
PublishTopicList() []string
UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
@@ -51,8 +72,9 @@ type InnerConsumer interface {
func SendMessageSync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
msgs []*Message) (*SendResult, error) {
cmd := remote.NewRemotingCommand(SendBatchMessage, request, encodeMessages(msgs))
- response, err := client.InvokeSync(brokerAddrs, cmd, 3*time.Second)
+ response, err := remote.InvokeSync(brokerAddrs, cmd, 3 * time.Second)
if err != nil {
+ log.Warningf("send messages with sync error: %v", err)
return nil, err
}
@@ -68,14 +90,13 @@ func SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, reque
func SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
msgs []*Message) (*SendResult, error) {
cmd := remote.NewRemotingCommand(SendBatchMessage, request, encodeMessages(msgs))
- err := client.InvokeOneWay(brokerAddrs, cmd)
+ err := remote.InvokeOneWay(brokerAddrs, cmd)
+ if err != nil {
+ log.Warningf("send messages with oneway error: %v", err)
+ }
return nil, err
}
-func encodeMessages(message []*Message) []byte {
- return nil
-}
-
func processSendResponse(brokerName string, msgs []*Message, cmd *remote.RemotingCommand) *SendResult {
var status SendStatus
switch cmd.Code {
@@ -96,8 +117,7 @@ func processSendResponse(brokerName string, msgs []*Message, cmd *remote.Remotin
msgIDs := make([]string, 0)
for i := 0; i < len(msgs); i++ {
- msgIDs = append(msgIDs, msgs[i].Properties[UniqueClientMessageIdKeyindex])
-
+ msgIDs = append(msgIDs, msgs[i].Properties[UniqueClientMessageIdKeyIndex])
}
regionId := cmd.ExtFields[MsgRegion]
@@ -153,77 +173,18 @@ func UpdateConsumerOffset(consumerGroup, topic string, queue int, offset int64)
return nil
}
-//SendStatus message send result
-type SendStatus int
-
-const (
- SendOK SendStatus = iota
- SendFlushDiskTimeout
- SendFlushSlaveTimeout
- SendSlaveNotAvailable
-)
-
-// SendResult rocketmq send result
-type SendResult struct {
- Status SendStatus
- MsgIDs []string
- MessageQueue *MessageQueue
- QueueOffset int64
- TransactionID string
- OffsetMsgID string
- RegionID string
- TraceOn bool
-}
-
-// SendResult send message result to string(detail result)
-func (result *SendResult) String() string {
- return fmt.Sprintf("SendResult [sendStatus=%d, msgIds=%s, offsetMsgId=%s, queueOffset=%d, messageQueue=%s]",
- result.Status, result.MsgIDs, result.OffsetMsgID, result.QueueOffset, result.MessageQueue.String())
-}
-
-// PullResult the pull result
-type PullResult struct {
- NextBeginOffset int64
- MinOffset int64
- MaxOffset int64
- Status PullStatus
- Messages []*MessageExt
-}
-
-// PullStatus pull status
-type PullStatus int
+var (
+ // group -> InnerProducer
+ producerMap sync.Map
-// predefined pull status
-const (
- PullFound PullStatus = iota
- PullNoNewMsg
- PullNoMatchedMsg
- PullOffsetIllegal
- PullBrokerTimeout
+ // group -> InnerConsumer
+ consumerMap sync.Map
)
-// MessageQueue message queue
-type MessageQueue struct {
- Topic string `json:"topic"`
- BrokerName string `json:"brokerName"`
- QueueId int `json:"queueId"`
-}
-
-func (mq *MessageQueue) String() string {
- return fmt.Sprintf("MessageQueue [topic=%s, brokerName=%s, queueId=%d]", mq.Topic, mq.BrokerName, mq.QueueId)
-}
-
func CheckClientInBroker() {
}
-func SendHeartbeatToAllBrokerWithLock() {
-
-}
-
-func UpdateTopicRouteInfoFromNameServer(topic string) {
-}
-
func RegisterConsumer(group string, consumer InnerConsumer) {
}
@@ -248,16 +209,10 @@ func SelectConsumer(group string) InnerConsumer {
return nil
}
-func FindBrokerAddressInPublish(brokerName string) {
-
-}
-
-func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
+func encodeMessages(message []*Message) []byte {
return nil
}
-type FindBrokerResult struct {
- brokerAddr string
- slave bool
- brokerVersion int32
+func sendHeartbeatToAllBroker() {
+
}
diff --git a/common/message.go b/kernel/message.go
similarity index 94%
rename from common/message.go
rename to kernel/message.go
index 08ead01..d65c48f 100644
--- a/common/message.go
+++ b/kernel/message.go
@@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package common
+package kernel
import "fmt"
@@ -40,7 +40,7 @@ const (
ReconsumeTime = "RECONSUME_TIME"
MsgRegion = "MSG_REGION"
TraceSwitch = "TRACE_ON"
- UniqueClientMessageIdKeyindex = "UNIQ_KEY"
+ UniqueClientMessageIdKeyIndex = "UNIQ_KEY"
MaxReconsumeTimes = "MAX_RECONSUME_TIMES"
ConsumeStartTime = "CONSUME_START_TIME"
TranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET"
@@ -69,9 +69,10 @@ func (msg *Message) String() string {
msg.Topic, string(msg.Body), msg.Flag, msg.Properties, msg.TransactionId)
}
-func (msg *Message) SetTags(tags string) {
- msg.Properties[tags] = tags
-}
+//
+//func (msg *Message) SetTags(tags string) {
+// msg.Properties[tags] = tags
+//}
func (msg *Message) PutProperty(key, value string) {
msg.Properties[key] = value
@@ -104,6 +105,10 @@ type MessageExt struct {
PreparedTransactionOffset int64
}
+// GetTags returns the value stored under the Tags property key, or "" when it is absent.
+func (msgExt *MessageExt) GetTags() string {
+ return msgExt.Properties[Tags]
+}
+
func (msgExt *MessageExt) String() string {
return fmt.Sprint("[Message=%s, MsgId=%s, QueueId=%d, StoreSize=%d, QueueOffset=%d, SysFlag=%d, "+
"BornTimestamp=%d, BornHost=%s, StoreTimestamp=%d, StoreHost=%s, CommitLogOffset=%d, BodyCRC=%d, "+
diff --git a/kernel/model.go b/kernel/model.go
new file mode 100644
index 0000000..9ee30cb
--- /dev/null
+++ b/kernel/model.go
@@ -0,0 +1,241 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+import (
+ "fmt"
+ "github.com/apache/rocketmq-client-go/utils"
+)
+
+// SendStatus is the status of a message send.
+type SendStatus int
+
+// SendStatus values, numbered from 0 in declaration order.
+const (
+ SendOK SendStatus = iota
+ SendFlushDiskTimeout
+ SendFlushSlaveTimeout
+ SendSlaveNotAvailable
+)
+
+// SendResult is the result of sending one or more messages to RocketMQ.
+type SendResult struct {
+ Status SendStatus
+ MsgIDs []string
+ MessageQueue *MessageQueue
+ QueueOffset int64
+ TransactionID string
+ OffsetMsgID string
+ RegionID string
+ TraceOn bool
+}
+
+// String renders the status, message ids, offset message id, queue offset and
+// message queue of the result.
+func (result *SendResult) String() string {
+	// Pass the queue itself rather than calling MessageQueue.String(): the
+	// explicit call panics when MessageQueue is nil, whereas fmt prints
+	// "<nil>" for a nil receiver of a String method.
+	return fmt.Sprintf("SendResult [sendStatus=%d, msgIds=%s, offsetMsgId=%s, queueOffset=%d, messageQueue=%s]",
+		result.Status, result.MsgIDs, result.OffsetMsgID, result.QueueOffset, result.MessageQueue)
+}
+
+// PullStatus is the status of a pull request.
+type PullStatus int
+
+// Predefined pull statuses, numbered from 0 in declaration order.
+const (
+ PullFound PullStatus = iota
+ PullNoNewMsg
+ PullNoMatchedMsg
+ PullOffsetIllegal
+ PullBrokerTimeout
+)
+
+// PullResult is the result of a pull request.
+type PullResult struct {
+ NextBeginOffset int64
+ MinOffset int64
+ MaxOffset int64
+ Status PullStatus
+ SuggestWhichBrokerId int64
+ // messageBinary is reset to nil by SetMessageExts.
+ // NOTE(review): presumably the undecoded message payload — confirm.
+ messageBinary []byte
+ // messageExts is read via GetMessageExts and replaced via SetMessageExts.
+ messageExts []*MessageExt
+}
+
+// GetMessageExts returns the messages currently stored in the result, which
+// may be nil.
+// NOTE(review): messageBinary is never decoded here — confirm whether lazy
+// decoding is still to be added.
+func (result *PullResult) GetMessageExts() []*MessageExt {
+	return result.messageExts
+}
+
+// SetMessageExts replaces the stored messages with msgExts and drops
+// messageBinary.
+func (result *PullResult) SetMessageExts(msgExts []*MessageExt) {
+	result.messageExts, result.messageBinary = msgExts, nil
+}
+
+// GetMessages converts the stored messages with toMessages. When nothing is
+// stored it returns an empty, non-nil slice.
+func (result *PullResult) GetMessages() []*Message {
+	// len of a nil slice is 0, so this covers both nil and empty.
+	if len(result.messageExts) > 0 {
+		return toMessages(result.messageExts)
+	}
+	return make([]*Message, 0)
+}
+
+// toMessages is a placeholder: it ignores messageExts and always returns an
+// empty, non-nil slice, so GetMessages currently never yields any message.
+func toMessages(messageExts []*MessageExt) []*Message {
+ msgs := make([]*Message, 0)
+
+ return msgs
+}
+
+// MessageQueue identifies one queue of a topic on a named broker.
+type MessageQueue struct {
+ Topic string `json:"topic"`
+ BrokerName string `json:"brokerName"`
+ QueueId int `json:"queueId"`
+}
+
+// String renders the topic, broker name and queue id of the queue.
+func (mq *MessageQueue) String() string {
+ return fmt.Sprintf("MessageQueue [topic=%s, brokerName=%s, queueId=%d]", mq.Topic, mq.BrokerName, mq.QueueId)
+}
+
+// HashCode combines the broker name, queue id and topic, in that order, into
+// a single int using a base-31 polynomial seeded with 1. Different queues may
+// share a hash code.
+func (mq *MessageQueue) HashCode() int {
+	h := 31 + utils.HashString(mq.BrokerName) // 31*1 + hash(brokerName)
+	h = 31*h + mq.QueueId
+	return 31*h + utils.HashString(mq.Topic)
+}
+
+// FindBrokerResult is the result of FindBrokerAddressInSubscribe.
+type FindBrokerResult struct {
+ BrokerAddr string
+ // Slave reports whether the selected broker id differs from MasterId.
+ Slave bool
+ BrokerVersion int
+}
+
+type (
+ // groupName of producer
+ producerData string
+
+ // consumeType is the consume mode; its values are "PULL" and "PUSH".
+ consumeType string
+
+ // MessageModel selects between BroadCasting and Clustering.
+ MessageModel int
+ // ConsumeFromWhere selects where consumption starts from.
+ ConsumeFromWhere int
+ // ServiceState is the lifecycle state of a client service.
+ ServiceState int
+)
+
+// Consume modes.
+const (
+	ConsumeActively  = consumeType("PULL")
+	ConsumePassively = consumeType("PUSH")
+)
+
+// Message models.
+const (
+	BroadCasting = MessageModel(1)
+	Clustering   = MessageModel(2)
+)
+
+// Starting positions for consumption.
+//
+// These live in their own const block because iota counts constant
+// specifications from the start of the enclosing block: declared after the
+// four constants above in one shared block, ConsumeFromLastOffset was 4
+// rather than 0.
+const (
+	ConsumeFromLastOffset ConsumeFromWhere = iota
+	ConsumeFromFirstOffset
+	ConsumeFromTimestamp
+)
+
+// Service lifecycle states. Kept in a separate block for the same reason, so
+// that CreateJust is 0 and therefore the zero value of ServiceState.
+const (
+	CreateJust ServiceState = iota
+	Running
+	Shutdown
+)
+
+// String returns the name of the message model, or "Unknown" for any value
+// other than BroadCasting and Clustering.
+func (mode MessageModel) String() string {
+	if mode == BroadCasting {
+		return "BroadCasting"
+	}
+	if mode == Clustering {
+		return "Clustering"
+	}
+	return "Unknown"
+}
+
+// ExpressionType names the syntax of a subscription filter expression.
+type ExpressionType string
+
+const (
+ // SQL92 selects SQL92-style filter expressions.
+ //
+ // Keywords: AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL.
+ //
+ // Data types:
+ //   - Boolean, like: TRUE, FALSE
+ //   - String, like: 'abc'
+ //   - Decimal, like: 123
+ //   - Float number, like: 3.1415
+ //
+ // Grammar:
+ //   - AND, OR
+ //   - >, >=, <, <=, =
+ //   - BETWEEN A AND B, equals to >=A AND <=B
+ //   - NOT BETWEEN A AND B, equals to >B OR <A
+ //   - IN ('a', 'b'), equals to ='a' OR ='b'; this operation only supports the String type
+ //   - IS NULL, IS NOT NULL, check whether a parameter is null or not
+ //   - =TRUE, =FALSE, check whether a parameter is true or false
+ //
+ // Example:
+ //   (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
+ SQL92 = ExpressionType("SQL92")
+
+ // TAG selects tag filter expressions, which only support the or operation,
+ // such as "tag1 || tag2 || tag3". A null or "*" expression means subscribe to all.
+ TAG = ExpressionType("TAG")
+)
+
+// IsTagType reports whether exp denotes the TAG expression type. The empty
+// string counts as TAG.
+func IsTagType(exp string) bool {
+	return exp == "" || exp == "TAG"
+}
+
+// SubAll is the "*" expression.
+// NOTE(review): presumably the wildcard that subscribes to all tags — confirm.
+var SubAll = "*"
+
+// SubscriptionData describes a subscription to a topic and its filter.
+type SubscriptionData struct {
+ ClassFilterMode bool
+ Topic string
+ // SubString is sent as the SubExpression of pull requests.
+ SubString string
+ // Tags is used as a set: only key presence is checked when filtering pulled messages.
+ Tags map[string]bool
+ Codes map[int32]bool
+ SubVersion int64
+ ExpType ExpressionType
+}
+
+// consumerData describes one consumer group; heartbeatData carries a list of these.
+type consumerData struct {
+ groupName string
+ cType consumeType
+ messageModel MessageModel
+ where ConsumeFromWhere
+ subscriptionDatas []SubscriptionData
+ unitMode bool
+}
+
+// heartbeatData groups a client id with its producer and consumer descriptions.
+// NOTE(review): all fields are unexported, so encoding/json would skip them — confirm how this is serialized.
+type heartbeatData struct {
+ clientId string
+ producerDatas []producerData
+ consumerDatas []consumerData
+}
diff --git a/common/transaction.go b/kernel/mq_version.go
similarity index 67%
copy from common/transaction.go
copy to kernel/mq_version.go
index e526af5..e7d7f32 100644
--- a/common/transaction.go
+++ b/kernel/mq_version.go
@@ -2,20 +2,21 @@
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
+The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
-package common
+package kernel
-type TransactionListener interface {
-}
+const (
+ // V4_1_0 is the broker version that pull compares against before sending a filter expression.
+ // NOTE(review): defined as 0, so a "version < V4_1_0" check can never be true for
+ // non-negative versions — confirm the intended value.
+ V4_1_0 = 0
+)
diff --git a/common/perm.go b/kernel/perm.go
similarity index 54%
copy from common/perm.go
copy to kernel/perm.go
index 13f1e7e..e78a876 100644
--- a/common/perm.go
+++ b/kernel/perm.go
@@ -1,21 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
+ * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
-package common
+package kernel
const (
permPriority = 0x1 << 3
@@ -24,15 +24,15 @@ const (
permInherit = 0x1 << 0
)
-func isReadable(perm int) bool {
+func queueIsReadable(perm int) bool {
return (perm & permRead) == permRead
}
-func isWriteable(perm int) bool {
+func queueIsWriteable(perm int) bool {
return (perm & permWrite) == permWrite
}
-func isInherited(perm int) bool {
+func queueIsInherited(perm int) bool {
return (perm & permInherit) == permInherit
}
@@ -42,15 +42,15 @@ func perm2string(perm int) string {
bytes[i] = '-'
}
- if isReadable(perm) {
+ if queueIsReadable(perm) {
bytes[0] = 'R'
}
- if isWriteable(perm) {
+ if queueIsWriteable(perm) {
bytes[1] = 'W'
}
- if isInherited(perm) {
+ if queueIsInherited(perm) {
bytes[2] = 'X'
}
diff --git a/common/request.go b/kernel/request.go
similarity index 96%
rename from common/request.go
rename to kernel/request.go
index 83d74a0..7967cca 100644
--- a/common/request.go
+++ b/kernel/request.go
@@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package common
+package kernel
const (
GetRouteInfoByTopic = int16(105)
@@ -54,8 +54,9 @@ type PullMessageRequest struct {
SysFlag int32 `json:"sysFlag"`
CommitOffset int64 `json:"commitOffset"`
SuspendTimeoutMillis int64 `json:"suspendTimeoutMillis"`
- Subscription string `json:"subscription"`
+ SubExpression string `json:"subscription"`
SubVersion int64 `json:"subVersion"`
+ ExpressionType string `json:"expressionType"`
}
type GetMaxOffsetRequest struct {
diff --git a/common/response.go b/kernel/response.go
similarity index 98%
rename from common/response.go
rename to kernel/response.go
index 0fc0101..df1cecc 100644
--- a/common/response.go
+++ b/kernel/response.go
@@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package common
+package kernel
const (
Success = int16(0)
diff --git a/common/route.go b/kernel/route.go
similarity index 78%
rename from common/route.go
rename to kernel/route.go
index 4ad3218..38deb93 100644
--- a/common/route.go
+++ b/kernel/route.go
@@ -15,13 +15,12 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package common
+package kernel
import (
"encoding/json"
+ "errors"
"github.com/apache/rocketmq-client-go/remote"
- "github.com/pkg/errors"
- log "github.com/sirupsen/logrus"
"sort"
"strconv"
"strings"
@@ -34,7 +33,7 @@ const (
requestTimeout = 3000
defaultTopic = "TBW102"
defaultQueueNums = 4
- masterId = int64(0)
+ MasterId = int64(0)
)
var (
@@ -42,10 +41,15 @@ var (
)
var (
+ // brokerName -> *BrokerData
brokerAddressesMap sync.Map
- publishInfoMap sync.Map
- routeDataMap sync.Map
- lockNamesrv sync.Mutex
+
+ // brokerName -> map[string]int (broker address -> broker version)
+ brokerVersionMap sync.Map
+
+ publishInfoMap sync.Map
+ routeDataMap sync.Map
+ lockNamesrv sync.Mutex
)
// key is topic, value is TopicPublishInfo
@@ -70,37 +74,7 @@ func (info *TopicPublishInfo) fetchQueueIndex() int {
return int(qIndex) % length
}
-func tryToFindTopicPublishInfo(topic string) *TopicPublishInfo {
- value, exist := publishInfoMap.Load(topic)
-
- var info *TopicPublishInfo
- if exist {
- info = value.(*TopicPublishInfo)
- }
-
- if info == nil || !info.isOK() {
- updateTopicRouteInfo(topic)
- value, exist = publishInfoMap.Load(topic)
- if !exist {
- info = &TopicPublishInfo{HaveTopicRouterInfo: false}
- } else {
- info = value.(*TopicPublishInfo)
- }
- }
-
- if info.HaveTopicRouterInfo || info.isOK() {
- return info
- }
-
- value, exist = publishInfoMap.Load(topic)
- if exist {
- return value.(*TopicPublishInfo)
- }
-
- return nil
-}
-
-func updateTopicRouteInfo(topic string) {
+func UpdateTopicRouteInfo(topic string) {
// Todo process lock timeout
lockNamesrv.Lock()
defer lockNamesrv.Unlock()
@@ -151,13 +125,91 @@ func updateTopicRouteInfo(topic string) {
}
}
+// FindBrokerAddressInPublish returns the address registered under MasterId
+// for brokerName, or "" when the broker is unknown.
+func FindBrokerAddressInPublish(brokerName string) string {
+	if bd, ok := brokerAddressesMap.Load(brokerName); ok {
+		return bd.(*BrokerData).brokerAddresses[MasterId]
+	}
+	return ""
+}
+
+func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
+ var (
+ brokerAddr = ""
+ slave = false
+ found = false
+ )
+
+ bd, exist := brokerAddressesMap.Load(brokerName)
+
+ if exist {
+ for k, v := range bd.(*BrokerData).brokerAddresses {
+ if v != "" {
+ found = true
+ if k != MasterId {
+ slave = true
+ }
+ break
+ }
+ }
+ }
+
+ var result *FindBrokerResult
+ if found {
+ result = &FindBrokerResult{
+ BrokerAddr: brokerName,
+ Slave: slave,
+ BrokerVersion: findBrokerVersion(brokerName, brokerAddr),
+ }
+ }
+
+ return result
+}
+
+// FetchSubscribeMessageQueues queries the route info of topic with a
+// 3-second timeout and returns one MessageQueue per read queue of every
+// readable queue-data entry. The returned slice is empty, not nil, when
+// nothing is readable.
+func FetchSubscribeMessageQueues(topic string) ([]*MessageQueue, error) {
+	routeData, err := queryTopicRouteInfoFromServer(topic, 3*time.Second)
+	if err != nil {
+		return nil, err
+	}
+
+	queues := make([]*MessageQueue, 0)
+	for _, qd := range routeData.queueDataList {
+		if !queueIsReadable(qd.perm) {
+			continue
+		}
+		for id := 0; id < qd.readQueueNums; id++ {
+			queue := &MessageQueue{Topic: topic, BrokerName: qd.brokerName, QueueId: id}
+			queues = append(queues, queue)
+		}
+	}
+
+	return queues, nil
+}
+
+// findBrokerVersion returns the version recorded for brokerAddr under
+// brokerName, or 0 when either is unknown.
+func findBrokerVersion(brokerName, brokerAddr string) int {
+	versions, ok := brokerVersionMap.Load(brokerName)
+	if !ok {
+		return 0
+	}
+	// A missing key yields the zero value, which is the same 0 fallback.
+	return versions.(map[string]int)[brokerAddr]
+}
+
func queryTopicRouteInfoFromServer(topic string, timeout time.Duration) (*topicRouteData, error) {
request := &GetRouteInfoRequest{
Topic: topic,
}
rc := remote.NewRemotingCommand(GetRouteInfoByTopic, request, nil)
- response, err := client.InvokeSync(getNameServerAddress(), rc, timeout)
+ response, err := remote.InvokeSync(getNameServerAddress(), rc, timeout)
if err != nil {
return nil, err
@@ -242,7 +294,7 @@ func RouteData2PublishInfo(topic string, data *topicRouteData) *TopicPublishInfo
})
for _, qd := range qds {
- if !isWriteable(qd.perm) {
+ if !queueIsWriteable(qd.perm) {
continue
}
@@ -254,7 +306,7 @@ func RouteData2PublishInfo(topic string, data *topicRouteData) *TopicPublishInfo
}
}
- if bData == nil || bData.brokerAddresses[masterId] == "" {
+ if bData == nil || bData.brokerAddresses[MasterId] == "" {
continue
}
diff --git a/common/transaction.go b/kernel/transaction.go
similarity index 98%
copy from common/transaction.go
copy to kernel/transaction.go
index e526af5..4895875 100644
--- a/common/transaction.go
+++ b/kernel/transaction.go
@@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package common
+package kernel
type TransactionListener interface {
}
diff --git a/remote/client.go b/remote/client.go
index 7b1c528..94f5ed2 100644
--- a/remote/client.go
+++ b/remote/client.go
@@ -93,6 +93,18 @@ type RemotingClient interface {
InvokeOneWay(string, *RemotingCommand) error
}
+// InvokeSync is the package-level synchronous invoke entry point.
+// It is a placeholder that returns a nil response and a nil error without sending anything.
+func InvokeSync(addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) {
+ return nil, nil
+}
+
+// InvokeAsync is the package-level asynchronous invoke entry point.
+// It is a placeholder: f is never called and nil is returned.
+func InvokeAsync(addr string, request *RemotingCommand, timeout time.Duration, f func(*RemotingCommand)) error {
+ return nil
+}
+
+// InvokeOneWay is the package-level one-way invoke entry point.
+// It is a placeholder that returns nil without sending anything.
+func InvokeOneWay(addr string, request*RemotingCommand) error {
+ return nil
+}
+
//defaultRemotingClient for default RemotingClient implementation
type defaultRemotingClient struct {
responseTable map[int32]*ResponseFuture
diff --git a/common/transaction.go b/utils/helper.go
similarity index 77%
rename from common/transaction.go
rename to utils/helper.go
index e526af5..fe3dd62 100644
--- a/common/transaction.go
+++ b/utils/helper.go
@@ -15,7 +15,18 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package common
+package utils
-type TransactionListener interface {
+import "hash/crc32"
+
+// LocalIP is a placeholder that always returns the empty string.
+func LocalIP() string {
+ return ""
+}
+
+// HashString returns the CRC-32 (IEEE) checksum of s as an int. The empty
+// string hashes to 0. Distinct strings may share a hash code.
+func HashString(s string) int {
+	// The checksum of empty input is already 0, so no special case is needed.
+	sum := crc32.ChecksumIEEE([]byte(s))
+	return int(sum)
}
diff --git a/common/perm.go b/utils/log.go
similarity index 56%
rename from common/perm.go
rename to utils/log.go
index 13f1e7e..d684efd 100644
--- a/common/perm.go
+++ b/utils/log.go
@@ -15,44 +15,28 @@
* limitations under the License.
*/
-package common
-
-const (
- permPriority = 0x1 << 3
- permRead = 0x1 << 2
- permWrite = 0x1 << 1
- permInherit = 0x1 << 0
-)
-
-func isReadable(perm int) bool {
- return (perm & permRead) == permRead
-}
-
-func isWriteable(perm int) bool {
- return (perm & permWrite) == permWrite
-}
-
-func isInherited(perm int) bool {
- return (perm & permInherit) == permInherit
-}
-
-func perm2string(perm int) string {
- bytes := make([]byte, 3)
- for i := 0; i < 3; i++ {
- bytes[i] = '-'
- }
-
- if isReadable(perm) {
- bytes[0] = 'R'
- }
-
- if isWriteable(perm) {
- bytes[1] = 'W'
- }
-
- if isInherited(perm) {
- bytes[2] = 'X'
- }
-
- return string(bytes)
+package utils
+
+import "io"
+
+// RLog is the package-level Logger. It is declared without an initializer, so it is nil
+// unless assigned elsewhere.
+var RLog Logger
+
+// Logger is the logging interface used by this module: output and prefix
+// configuration plus plain and formatted methods for each level.
+type Logger interface {
+ Output() io.Writer
+ SetOutput(w io.Writer)
+ Prefix() string
+ SetPrefix(p string)
+ SetHeader(h string)
+ Print(i ...interface{})
+ Printf(format string, args ...interface{})
+ Debug(i ...interface{})
+ Debugf(format string, args ...interface{})
+ Info(i ...interface{})
+ Infof(format string, args ...interface{})
+ Warning(i ...interface{})
+ Warningf(format string, args ...interface{})
+ Error(i ...interface{})
+ Errorf(format string, args ...interface{})
+ Fatal(i ...interface{})
+ Fatalf(format string, args ...interface{})
}