You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2019/07/02 15:40:19 UTC
[rocketmq-client-go] branch native updated: Refactor directories &
API (#82)
This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 3011efa Refactor directories & API (#82)
3011efa is described below
commit 3011efa4508e453f14e0ef83647aa0e429e1cb35
Author: wenfeng <sx...@gmail.com>
AuthorDate: Tue Jul 2 23:40:15 2019 +0800
Refactor directories & API (#82)
* refactor directorie & apiss
* fix ci failed
---
api.go | 54 ++++
consumer/strategy.go | 130 ---------
docs/Introduction.md | 12 +-
examples/consumer/main.go | 16 +-
examples/producer/main.go | 8 +-
{consumer => internal/consumer}/consumer.go | 301 ++++-----------------
{consumer => internal/consumer}/consumer_test.go | 0
{consumer => internal/consumer}/offset_store.go | 33 +--
{consumer => internal/consumer}/process_queue.go | 24 +-
{consumer => internal/consumer}/pull_consumer.go | 50 ++--
.../consumer}/pull_consumer_test.go | 0
{consumer => internal/consumer}/push_consumer.go | 103 +++----
.../consumer}/push_consumer_test.go | 0
{consumer => internal/consumer}/statistics.go | 0
{kernel => internal/kernel}/client.go | 83 +++---
{kernel => internal/kernel}/client_test.go | 0
{kernel => internal/kernel}/constants.go | 0
internal/kernel/model.go | 80 ++++++
{kernel => internal/kernel}/mq_version.go | 0
{kernel => internal/kernel}/perm.go | 0
{kernel => internal/kernel}/request.go | 0
{kernel => internal/kernel}/response.go | 0
{kernel => internal/kernel}/route.go | 15 +-
{kernel => internal/kernel}/route_test.go | 0
{kernel => internal/kernel}/transaction.go | 0
{kernel => internal/kernel}/validators.go | 0
{producer => internal/producer}/producer.go | 29 +-
{remote => internal/remote}/codec.go | 0
{remote => internal/remote}/codec_test.go | 0
{remote => internal/remote}/remote_client.go | 0
{remote => internal/remote}/remote_client_test.go | 0
{remote => internal/remote}/rpchook.go | 0
primitive/consume.go | 128 +++++++++
{kernel => primitive}/message.go | 32 ++-
primitive/options.go | 132 +++++++++
kernel/model.go => primitive/result.go | 86 +-----
primitive/strategy.go | 117 ++++++++
37 files changed, 760 insertions(+), 673 deletions(-)
diff --git a/api.go b/api.go
new file mode 100644
index 0000000..5043f95
--- /dev/null
+++ b/api.go
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package rocketmq
+
+import (
+ "context"
+ "github.com/apache/rocketmq-client-go/primitive"
+)
+
+type Producer interface {
+ Start() error
+ Shutdown() error
+ SendSync(context.Context, ...*primitive.Message) (primitive.SendResult, error)
+ SendAsync(context.Context, func(primitive.SendResult), ...*primitive.Message) error
+ SendOneWay(context.Context, ...*primitive.Message) error
+}
+
+func NewProducer(opt primitive.ProducerOptions) (Producer, error) {
+ return nil, nil
+}
+
+type PushConsumer interface {
+ Start() error
+ Shutdown() error
+ Subscribe(topic string, selector primitive.MessageSelector,
+ f func(context.Context, ...*primitive.MessageExt) (primitive.ConsumeResult, error)) error
+ Unsubscribe(string) error
+}
+
+type PullConsumer interface {
+ Start() error
+ Shutdown() error
+ Pull(context.Context, string, primitive.MessageSelector, int) (primitive.PullResult, error)
+ PullFrom(context.Context, primitive.MessageQueue, int64, int) (primitive.PullResult, error)
+ // only update in memory
+ UpdateOffset(primitive.MessageQueue, int64) error
+ PersistOffset(context.Context) error
+ CurrentOffset(primitive.MessageQueue) int64
+}
diff --git a/consumer/strategy.go b/consumer/strategy.go
deleted file mode 100644
index 1cb6b2b..0000000
--- a/consumer/strategy.go
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package consumer
-
-import (
- "github.com/apache/rocketmq-client-go/kernel"
- "github.com/apache/rocketmq-client-go/rlog"
- "github.com/apache/rocketmq-client-go/utils"
-)
-
-// Strategy Algorithm for message allocating between consumers
-type AllocateStrategy string
-
-const (
- // An allocate strategy proxy for based on machine room nearside priority. An actual allocate strategy can be
- // specified.
- //
- // If any consumer is alive in a machine room, the message queue of the broker which is deployed in the same machine
- // should only be allocated to those. Otherwise, those message queues can be shared along all consumers since there are
- // no alive consumer to monopolize them.
- StrategyMachineNearby = AllocateStrategy("MachineNearby")
-
- // Average Hashing queue algorithm
- StrategyAveragely = AllocateStrategy("Averagely")
-
- // Cycle average Hashing queue algorithm
- StrategyAveragelyCircle = AllocateStrategy("AveragelyCircle")
-
- // Use Message Queue specified
- StrategyConfig = AllocateStrategy("Config")
-
- // Computer room Hashing queue algorithm, such as Alipay logic room
- StrategyMachineRoom = AllocateStrategy("MachineRoom")
-
- // Consistent Hashing queue algorithm
- StrategyConsistentHash = AllocateStrategy("ConsistentHash")
-)
-
-func allocateByAveragely(consumerGroup, currentCID string, mqAll []*kernel.MessageQueue,
- cidAll []string) []*kernel.MessageQueue {
- if currentCID == "" || utils.IsArrayEmpty(mqAll) || utils.IsArrayEmpty(cidAll) {
- return nil
- }
- var (
- find bool
- index int
- )
-
- for idx := range cidAll {
- if cidAll[idx] == currentCID {
- find = true
- index = idx
- break
- }
- }
- if !find {
- rlog.Infof("[BUG] ConsumerGroup=%s, ConsumerId=%s not in cidAll:%+v", consumerGroup, currentCID, cidAll)
- return nil
- }
-
- mqSize := len(mqAll)
- cidSize := len(cidAll)
- mod := mqSize % cidSize
-
- var averageSize int
- if mqSize <= cidSize {
- averageSize = 1
- } else {
- if mod > 0 && index < mod {
- averageSize = mqSize/cidSize + 1
- } else {
- averageSize = mqSize / cidSize
- }
- }
-
- var startIndex int
- if mod > 0 && index < mod {
- startIndex = index * averageSize
- } else {
- startIndex = index*averageSize + mod
- }
-
- num := utils.MinInt(averageSize, mqSize-startIndex)
- result := make([]*kernel.MessageQueue, num)
- for i := 0; i < num; i++ {
- result[i] = mqAll[(startIndex+i)%mqSize]
- }
- return result
-}
-
-// TODO
-func allocateByMachineNearby(consumerGroup, currentCID string, mqAll []*kernel.MessageQueue,
- cidAll []string) []*kernel.MessageQueue {
- return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
-}
-
-func allocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*kernel.MessageQueue,
- cidAll []string) []*kernel.MessageQueue {
- return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
-}
-
-func allocateByConfig(consumerGroup, currentCID string, mqAll []*kernel.MessageQueue,
- cidAll []string) []*kernel.MessageQueue {
- return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
-}
-
-func allocateByMachineRoom(consumerGroup, currentCID string, mqAll []*kernel.MessageQueue,
- cidAll []string) []*kernel.MessageQueue {
- return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
-}
-
-func allocateByConsistentHash(consumerGroup, currentCID string, mqAll []*kernel.MessageQueue,
- cidAll []string) []*kernel.MessageQueue {
- return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
-}
diff --git a/docs/Introduction.md b/docs/Introduction.md
index 9bc22c7..0db6e7b 100644
--- a/docs/Introduction.md
+++ b/docs/Introduction.md
@@ -20,8 +20,8 @@ rlog.SetLogger(Logger)
Producer interface {
Start() error
Shutdown() error
- SendSync(context.Context, *kernel.Message) (*kernel.SendResult, error)
- SendOneWay(context.Context, *kernel.Message) error
+ SendSync(context.Context, *primitive.Message) (*kernel.SendResult, error)
+ SendOneWay(context.Context, *primitive.Message) error
}
```
@@ -42,7 +42,7 @@ err := p.Start()
- send message with sync
```go
-result, err := p.SendSync(context.Background(), &kernel.Message{
+result, err := p.SendSync(context.Background(), &primitive.Message{
Topic: "test",
Body: []byte("Hello RocketMQ Go Client!"),
})
@@ -52,7 +52,7 @@ result, err := p.SendSync(context.Background(), &kernel.Message{
- or send message with oneway
```go
-err := p.SendOneWay(context.Background(), &kernel.Message{
+err := p.SendOneWay(context.Background(), &primitive.Message{
Topic: "test",
Body: []byte("Hello RocketMQ Go Client!"),
})
@@ -68,7 +68,7 @@ PushConsumer interface {
Start() error
Shutdown()
Subscribe(topic string, selector MessageSelector,
- f func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)) error
+ f func(*ConsumeMessageContext, []*primitive.MessageExt) (ConsumeResult, error)) error
}
```
@@ -85,7 +85,7 @@ c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
- Subscribe a topic(only support one topic now), and define your consuming function
```go
err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
- msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
+ msgs []*primitive.MessageExt) (consumer.ConsumeResult, error) {
fmt.Println(msgs)
return consumer.ConsumeSuccess, nil
})
diff --git a/examples/consumer/main.go b/examples/consumer/main.go
index f660d43..65b0ba3 100644
--- a/examples/consumer/main.go
+++ b/examples/consumer/main.go
@@ -19,22 +19,22 @@ package main
import (
"fmt"
- "github.com/apache/rocketmq-client-go/consumer"
- "github.com/apache/rocketmq-client-go/kernel"
+ "github.com/apache/rocketmq-client-go/internal/consumer"
+ "github.com/apache/rocketmq-client-go/primitive"
"os"
"time"
)
func main() {
- c, _ := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
+ c, _ := consumer.NewPushConsumer("testGroup", primitive.ConsumerOption{
NameServerAddr: "127.0.0.1:9876",
- ConsumerModel: consumer.Clustering,
- FromWhere: consumer.ConsumeFromFirstOffset,
+ ConsumerModel: primitive.Clustering,
+ FromWhere: primitive.ConsumeFromFirstOffset,
})
- err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
- msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
+ err := c.Subscribe("test", primitive.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
+ msgs []*primitive.MessageExt) (primitive.ConsumeResult, error) {
fmt.Println(msgs)
- return consumer.ConsumeSuccess, nil
+ return primitive.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err.Error())
diff --git a/examples/producer/main.go b/examples/producer/main.go
index 9cc626b..c3e338d 100644
--- a/examples/producer/main.go
+++ b/examples/producer/main.go
@@ -20,13 +20,13 @@ package main
import (
"context"
"fmt"
- "github.com/apache/rocketmq-client-go/kernel"
- "github.com/apache/rocketmq-client-go/producer"
+ "github.com/apache/rocketmq-client-go/internal/producer"
+ "github.com/apache/rocketmq-client-go/primitive"
"os"
)
func main() {
- opt := producer.ProducerOptions{
+ opt := primitive.ProducerOptions{
NameServerAddr: "127.0.0.1:9876",
RetryTimesWhenSendFailed: 2,
}
@@ -37,7 +37,7 @@ func main() {
os.Exit(1)
}
for i := 0; i < 1000; i++ {
- res, err := p.SendSync(context.Background(), &kernel.Message{
+ res, err := p.SendSync(context.Background(), &primitive.Message{
Topic: "test",
Body: []byte("Hello RocketMQ Go Client!"),
})
diff --git a/consumer/consumer.go b/internal/consumer/consumer.go
similarity index 66%
rename from consumer/consumer.go
rename to internal/consumer/consumer.go
index 80e4fb5..08ae664 100644
--- a/consumer/consumer.go
+++ b/internal/consumer/consumer.go
@@ -20,8 +20,9 @@ package consumer
import (
"encoding/json"
"fmt"
- "github.com/apache/rocketmq-client-go/kernel"
- "github.com/apache/rocketmq-client-go/remote"
+ "github.com/apache/rocketmq-client-go/internal/kernel"
+ "github.com/apache/rocketmq-client-go/internal/remote"
+ "github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
"github.com/apache/rocketmq-client-go/utils"
"github.com/tidwall/gjson"
@@ -53,135 +54,18 @@ const (
_PersistConsumerOffsetInterval = 5 * time.Second
)
-// Message model defines the way how messages are delivered to each consumer clients.
-// </p>
-//
-// RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with
-// the same {@link #consumerGroup} would only consume shards of the messages subscribed, which achieves load
-// balances; Conversely, if the broadcasting is set, each consumer client will consume all subscribed messages
-// separately.
-// </p>
-//
-// This field defaults to clustering.
-type MessageModel int
-
-const (
- BroadCasting MessageModel = iota
- Clustering
-)
-
-func (mode MessageModel) String() string {
- switch mode {
- case BroadCasting:
- return "BroadCasting"
- case Clustering:
- return "Clustering"
- default:
- return "Unknown"
- }
-}
-
-// Consuming point on consumer booting.
-// </p>
-//
-// There are three consuming points:
-// <ul>
-// <li>
-// <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it stopped previously.
-// If it were a newly booting up consumer client, according aging of the consumer group, there are two
-// cases:
-// <ol>
-// <li>
-// if the consumer group is created so recently that the earliest message being subscribed has yet
-// expired, which means the consumer group represents a lately launched business, consuming will
-// start from the very beginning;
-// </li>
-// <li>
-// if the earliest message being subscribed has expired, consuming will start from the latest
-// messages, meaning messages born prior to the booting timestamp would be ignored.
-// </li>
-// </ol>
-// </li>
-// <li>
-// <code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from earliest messages available.
-// </li>
-// <li>
-// <code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from specified timestamp, which means
-// messages born prior to {@link #consumeTimestamp} will be ignored
-// </li>
-// </ul>
-type ConsumeFromWhere int
-
-const (
- ConsumeFromLastOffset ConsumeFromWhere = iota
- ConsumeFromFirstOffset
- ConsumeFromTimestamp
-)
-
type ConsumeType string
const (
_PullConsume = ConsumeType("pull")
_PushConsume = ConsumeType("push")
-)
-
-type ExpressionType string
-
-const (
- /**
- * <ul>
- * Keywords:
- * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li>
- * </ul>
- * <p/>
- * <ul>
- * Data type:
- * <li>Boolean, like: TRUE, FALSE</li>
- * <li>String, like: 'abc'</li>
- * <li>Decimal, like: 123</li>
- * <li>Float number, like: 3.1415</li>
- * </ul>
- * <p/>
- * <ul>
- * Grammar:
- * <li>{@code AND, OR}</li>
- * <li>{@code >, >=, <, <=, =}</li>
- * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li>
- * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li>
- * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this operation only support String type.</li>
- * <li>{@code IS NULL}, {@code IS NOT NULL}, check parameter whether is null, or not.</li>
- * <li>{@code =TRUE}, {@code =FALSE}, check parameter whether is true, or false.</li>
- * </ul>
- * <p/>
- * <p>
- * Example:
- * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
- * </p>
- */
- SQL92 = ExpressionType("SQL92")
-
- /**
- * Only support or operation such as
- * "tag1 || tag2 || tag3", <br>
- * If null or * expression, meaning subscribe all.
- */
- TAG = ExpressionType("TAG")
-)
-
-func IsTagType(exp string) bool {
- if exp == "" || exp == "TAG" {
- return true
- }
- return false
-}
-const (
_SubAll = "*"
)
type PullRequest struct {
consumerGroup string
- mq *kernel.MessageQueue
+ mq *primitive.MessageQueue
pq *processQueue
nextOffset int64
lockedFirst bool
@@ -192,83 +76,6 @@ func (pr *PullRequest) String() string {
pr.consumerGroup, pr.mq.Topic, pr.mq.QueueId)
}
-type ConsumerOption struct {
- kernel.ClientOption
- NameServerAddr string
-
- /**
- * Backtracking consumption time with second precision. Time format is
- * 20131223171201<br>
- * Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
- * Default backtracking consumption time Half an hour ago.
- */
- ConsumeTimestamp string
-
- // The socket timeout in milliseconds
- ConsumerPullTimeout time.Duration
-
- // Concurrently max span offset.it has no effect on sequential consumption
- ConsumeConcurrentlyMaxSpan int
-
- // Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
- // Consider the {PullBatchSize}, the instantaneous value may exceed the limit
- PullThresholdForQueue int64
-
- // Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
- // Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
- //
- // The size of a message only measured by message body, so it's not accurate
- PullThresholdSizeForQueue int
-
- // Flow control threshold on topic level, default value is -1(Unlimited)
- //
- // The value of {@code pullThresholdForQueue} will be overwrote and calculated based on
- // {@code pullThresholdForTopic} if it is't unlimited
- //
- // For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
- // then pullThresholdForQueue will be set to 100
- PullThresholdForTopic int
-
- // Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
- //
- // The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on
- // {@code pullThresholdSizeForTopic} if it is't unlimited
- //
- // For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are
- // assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB
- PullThresholdSizeForTopic int
-
- // Message pull Interval
- PullInterval time.Duration
-
- // Batch consumption size
- ConsumeMessageBatchMaxSize int
-
- // Batch pull size
- PullBatchSize int32
-
- // Whether update subscription relationship when every pull
- PostSubscriptionWhenPull bool
-
- // Max re-consume times. -1 means 16 times.
- //
- // If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion
- // queue waiting.
- MaxReconsumeTimes int
-
- // Suspending pulling time for cases requiring slow pulling like flow-control scenario.
- SuspendCurrentQueueTimeMillis time.Duration
-
- // Maximum amount of time a message may block the consuming thread.
- ConsumeTimeout time.Duration
-
- ConsumerModel MessageModel
- Strategy AllocateStrategy
- ConsumeOrderly bool
- FromWhere ConsumeFromWhere
- // TODO traceDispatcher
-}
-
// TODO hook
type defaultConsumer struct {
/**
@@ -279,25 +86,25 @@ type defaultConsumer struct {
* See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
*/
consumerGroup string
- model MessageModel
- allocate func(string, string, []*kernel.MessageQueue, []string) []*kernel.MessageQueue
+ model primitive.MessageModel
+ allocate func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue
unitMode bool
consumeOrderly bool
- fromWhere ConsumeFromWhere
+ fromWhere primitive.ConsumeFromWhere
cType ConsumeType
client *kernel.RMQClient
- mqChanged func(topic string, mqAll, mqDivided []*kernel.MessageQueue)
+ mqChanged func(topic string, mqAll, mqDivided []*primitive.MessageQueue)
state kernel.ServiceState
pause bool
once sync.Once
- option ConsumerOption
- // key: int, hash(*kernel.MessageQueue)
+ option primitive.ConsumerOption
+ // key: int, hash(*primitive.MessageQueue)
// value: *processQueue
processQueueTable sync.Map
// key: topic(string)
- // value: map[int]*kernel.MessageQueue
+ // value: map[int]*primitive.MessageQueue
topicSubscribeInfoTable sync.Map
// key: topic
@@ -314,19 +121,19 @@ func (dc *defaultConsumer) persistConsumerOffset() {
rlog.Errorf("consumer state error: %s", err.Error())
return
}
- mqs := make([]*kernel.MessageQueue, 0)
+ mqs := make([]*primitive.MessageQueue, 0)
dc.processQueueTable.Range(func(key, value interface{}) bool {
- mqs = append(mqs, key.(*kernel.MessageQueue))
+ mqs = append(mqs, key.(*primitive.MessageQueue))
return true
})
dc.storage.persist(mqs)
}
-func (dc *defaultConsumer) updateTopicSubscribeInfo(topic string, mqs []*kernel.MessageQueue) {
+func (dc *defaultConsumer) updateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue) {
_, exist := dc.subscriptionDataTable.Load(topic)
// does subscribe, if true, replace it
if exist {
- mqSet := make(map[int]*kernel.MessageQueue, 0)
+ mqSet := make(map[int]*primitive.MessageQueue, 0)
for idx := range mqs {
mq := mqs[idx]
mqSet[mq.HashCode()] = mq
@@ -355,23 +162,23 @@ func (dc *defaultConsumer) doBalance() {
rlog.Warnf("do balance of group: %s, but topic: %s does not exist.", dc.consumerGroup, topic)
return true
}
- mqs := v.([]*kernel.MessageQueue)
+ mqs := v.([]*primitive.MessageQueue)
switch dc.model {
- case BroadCasting:
+ case primitive.BroadCasting:
changed := dc.updateProcessQueueTable(topic, mqs)
if changed {
dc.mqChanged(topic, mqs, mqs)
rlog.Infof("messageQueueChanged, Group: %s, Topic: %s, MessageQueues: %v",
dc.consumerGroup, topic, mqs)
}
- case Clustering:
+ case primitive.Clustering:
cidAll := dc.findConsumerList(topic)
if cidAll == nil {
rlog.Warnf("do balance for Group: %s, Topic: %s get consumer id list failed",
dc.consumerGroup, topic)
return true
}
- mqAll := make([]*kernel.MessageQueue, len(mqs))
+ mqAll := make([]*primitive.MessageQueue, len(mqs))
copy(mqAll, mqs)
sort.Strings(cidAll)
sort.SliceStable(mqAll, func(i, j int) bool {
@@ -390,9 +197,9 @@ func (dc *defaultConsumer) doBalance() {
changed := dc.updateProcessQueueTable(topic, allocateResult)
if changed {
dc.mqChanged(topic, mqAll, allocateResult)
- rlog.Infof("do balance result changed, allocateMessageQueueStrategyName=%s, group=%s, "+
+ rlog.Infof("do balance result changed, group=%s, "+
"topic=%s, clientId=%s, mqAllSize=%d, cidAllSize=%d, rebalanceResultSize=%d, "+
- "rebalanceResultSet=%v", string(dc.option.Strategy), dc.consumerGroup, topic, dc.client.ClientID(), len(mqAll),
+ "rebalanceResultSet=%v", dc.consumerGroup, topic, dc.client.ClientID(), len(mqAll),
len(cidAll), len(allocateResult), allocateResult)
}
@@ -418,12 +225,12 @@ func (dc *defaultConsumer) makeSureStateOK() error {
}
type lockBatchRequestBody struct {
- ConsumerGroup string `json:"consumerGroup"`
- ClientId string `json:"clientId"`
- MQs []*kernel.MessageQueue `json:"mqSet"`
+ ConsumerGroup string `json:"consumerGroup"`
+ ClientId string `json:"clientId"`
+ MQs []*primitive.MessageQueue `json:"mqSet"`
}
-func (dc *defaultConsumer) lock(mq *kernel.MessageQueue) bool {
+func (dc *defaultConsumer) lock(mq *primitive.MessageQueue) bool {
brokerResult := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, kernel.MasterId, true)
if brokerResult == nil {
@@ -433,7 +240,7 @@ func (dc *defaultConsumer) lock(mq *kernel.MessageQueue) bool {
body := &lockBatchRequestBody{
ConsumerGroup: dc.consumerGroup,
ClientId: dc.client.ClientID(),
- MQs: []*kernel.MessageQueue{mq},
+ MQs: []*primitive.MessageQueue{mq},
}
lockedMQ := dc.doLock(brokerResult.BrokerAddr, body)
var lockOK bool
@@ -453,7 +260,7 @@ func (dc *defaultConsumer) lock(mq *kernel.MessageQueue) bool {
return lockOK
}
-func (dc *defaultConsumer) unlock(mq *kernel.MessageQueue, oneway bool) {
+func (dc *defaultConsumer) unlock(mq *primitive.MessageQueue, oneway bool) {
brokerResult := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, kernel.MasterId, true)
if brokerResult == nil {
@@ -463,14 +270,14 @@ func (dc *defaultConsumer) unlock(mq *kernel.MessageQueue, oneway bool) {
body := &lockBatchRequestBody{
ConsumerGroup: dc.consumerGroup,
ClientId: dc.client.ClientID(),
- MQs: []*kernel.MessageQueue{mq},
+ MQs: []*primitive.MessageQueue{mq},
}
dc.doUnlock(brokerResult.BrokerAddr, body, oneway)
rlog.Warnf("unlock messageQueue. group:%s, clientId:%s, mq:%s",
dc.consumerGroup, dc.client.ClientID(), mq.String())
}
-func (dc *defaultConsumer) lockAll(mq kernel.MessageQueue) {
+func (dc *defaultConsumer) lockAll(mq primitive.MessageQueue) {
mqMapSet := dc.buildProcessQueueTableByBrokerName()
for broker, mqs := range mqMapSet {
if len(mqs) == 0 {
@@ -539,7 +346,7 @@ func (dc *defaultConsumer) unlockAll(oneway bool) {
}
}
-func (dc *defaultConsumer) doLock(addr string, body *lockBatchRequestBody) []kernel.MessageQueue {
+func (dc *defaultConsumer) doLock(addr string, body *lockBatchRequestBody) []primitive.MessageQueue {
data, _ := json.Marshal(body)
request := remote.NewRemotingCommand(kernel.ReqLockBatchMQ, nil, data)
response, err := dc.client.InvokeSync(addr, request, 1*time.Second)
@@ -548,7 +355,7 @@ func (dc *defaultConsumer) doLock(addr string, body *lockBatchRequestBody) []ker
return nil
}
lockOKMQSet := struct {
- MQs []kernel.MessageQueue `json:"lockOKMQSet"`
+ MQs []primitive.MessageQueue `json:"lockOKMQSet"`
}{}
err = json.Unmarshal(response.Body, &lockOKMQSet)
if err != nil {
@@ -577,14 +384,14 @@ func (dc *defaultConsumer) doUnlock(addr string, body *lockBatchRequestBody, one
}
}
-func (dc *defaultConsumer) buildProcessQueueTableByBrokerName() map[string][]*kernel.MessageQueue {
- result := make(map[string][]*kernel.MessageQueue, 0)
+func (dc *defaultConsumer) buildProcessQueueTableByBrokerName() map[string][]*primitive.MessageQueue {
+ result := make(map[string][]*primitive.MessageQueue, 0)
dc.processQueueTable.Range(func(key, value interface{}) bool {
- mq := key.(*kernel.MessageQueue)
+ mq := key.(*primitive.MessageQueue)
mqs, exist := result[mq.BrokerName]
if !exist {
- mqs = make([]*kernel.MessageQueue, 0)
+ mqs = make([]*primitive.MessageQueue, 0)
}
mqs = append(mqs, mq)
result[mq.BrokerName] = mqs
@@ -595,15 +402,15 @@ func (dc *defaultConsumer) buildProcessQueueTableByBrokerName() map[string][]*ke
}
// TODO 问题不少 需要再好好对一下
-func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*kernel.MessageQueue) bool {
+func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitive.MessageQueue) bool {
var changed bool
- mqSet := make(map[*kernel.MessageQueue]bool)
+ mqSet := make(map[*primitive.MessageQueue]bool)
for idx := range mqs {
mqSet[mqs[idx]] = true
}
// TODO
dc.processQueueTable.Range(func(key, value interface{}) bool {
- mq := key.(*kernel.MessageQueue)
+ mq := key.(*primitive.MessageQueue)
pq := value.(*processQueue)
if mq.Topic == topic {
if !mqSet[mq] {
@@ -666,13 +473,13 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*kernel.M
return changed
}
-func (dc *defaultConsumer) removeUnnecessaryMessageQueue(mq *kernel.MessageQueue, pq *processQueue) bool {
- dc.storage.persist([]*kernel.MessageQueue{mq})
+func (dc *defaultConsumer) removeUnnecessaryMessageQueue(mq *primitive.MessageQueue, pq *processQueue) bool {
+ dc.storage.persist([]*primitive.MessageQueue{mq})
dc.storage.remove(mq)
return true
}
-func (dc *defaultConsumer) computePullFromWhere(mq *kernel.MessageQueue) int64 {
+func (dc *defaultConsumer) computePullFromWhere(mq *primitive.MessageQueue) int64 {
if dc.cType == _PullConsume {
return 0
}
@@ -682,7 +489,7 @@ func (dc *defaultConsumer) computePullFromWhere(mq *kernel.MessageQueue) int64 {
result = lastOffset
} else {
switch dc.fromWhere {
- case ConsumeFromLastOffset:
+ case primitive.ConsumeFromLastOffset:
if lastOffset == -1 {
if strings.HasPrefix(mq.Topic, kernel.RetryGroupTopicPrefix) {
lastOffset = 0
@@ -697,11 +504,11 @@ func (dc *defaultConsumer) computePullFromWhere(mq *kernel.MessageQueue) int64 {
} else {
result = -1
}
- case ConsumeFromFirstOffset:
+ case primitive.ConsumeFromFirstOffset:
if lastOffset == -1 {
result = 0
}
- case ConsumeFromTimestamp:
+ case primitive.ConsumeFromTimestamp:
if lastOffset == -1 {
if strings.HasPrefix(mq.Topic, kernel.RetryGroupTopicPrefix) {
lastOffset, err := dc.queryMaxOffset(mq)
@@ -760,12 +567,12 @@ func (dc *defaultConsumer) findConsumerList(topic string) []string {
return nil
}
-func (dc *defaultConsumer) sendBack(msg *kernel.MessageExt, level int) error {
+func (dc *defaultConsumer) sendBack(msg *primitive.MessageExt, level int) error {
return nil
}
// QueryMaxOffset with specific queueId and topic
-func (dc *defaultConsumer) queryMaxOffset(mq *kernel.MessageQueue) (int64, error) {
+func (dc *defaultConsumer) queryMaxOffset(mq *primitive.MessageQueue) (int64, error) {
brokerAddr := kernel.FindBrokerAddrByName(mq.BrokerName)
if brokerAddr == "" {
kernel.UpdateTopicRouteInfo(mq.Topic)
@@ -790,7 +597,7 @@ func (dc *defaultConsumer) queryMaxOffset(mq *kernel.MessageQueue) (int64, error
}
// SearchOffsetByTimestamp with specific queueId and topic
-func (dc *defaultConsumer) searchOffsetByTimestamp(mq *kernel.MessageQueue, timestamp int64) (int64, error) {
+func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue, timestamp int64) (int64, error) {
brokerAddr := kernel.FindBrokerAddrByName(mq.BrokerName)
if brokerAddr == "" {
kernel.UpdateTopicRouteInfo(mq.Topic)
@@ -815,19 +622,19 @@ func (dc *defaultConsumer) searchOffsetByTimestamp(mq *kernel.MessageQueue, time
return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
}
-func buildSubscriptionData(topic string, selector MessageSelector) *kernel.SubscriptionData {
+func buildSubscriptionData(topic string, selector primitive.MessageSelector) *kernel.SubscriptionData {
subData := &kernel.SubscriptionData{
Topic: topic,
SubString: selector.Expression,
ExpType: string(selector.Type),
}
- if selector.Type != "" && selector.Type != TAG {
+ if selector.Type != "" && selector.Type != primitive.TAG {
return subData
}
if selector.Expression == "" || selector.Expression == _SubAll {
- subData.ExpType = string(TAG)
+ subData.ExpType = string(primitive.TAG)
subData.SubString = _SubAll
} else {
tags := strings.Split(selector.Expression, "\\|\\|")
@@ -847,7 +654,7 @@ func buildSubscriptionData(topic string, selector MessageSelector) *kernel.Subsc
return subData
}
-func getNextQueueOf(topic string) *kernel.MessageQueue {
+func getNextQueueOf(topic string) *primitive.MessageQueue {
queues, err := kernel.FetchSubscribeMessageQueues(topic)
if err != nil && len(queues) > 0 {
rlog.Error(err.Error())
@@ -890,7 +697,7 @@ func clearCommitOffsetFlag(sysFlag int32) int32 {
return sysFlag & (^0x1 << 0)
}
-func tryFindBroker(mq *kernel.MessageQueue) *kernel.FindBrokerResult {
+func tryFindBroker(mq *primitive.MessageQueue) *kernel.FindBrokerResult {
result := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
if result == nil {
@@ -903,11 +710,11 @@ var (
pullFromWhichNodeTable sync.Map
)
-func updatePullFromWhichNode(mq *kernel.MessageQueue, brokerId int64) {
+func updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {
pullFromWhichNodeTable.Store(mq.HashCode(), brokerId)
}
-func recalculatePullFromWhichNode(mq *kernel.MessageQueue) int64 {
+func recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
v, exist := pullFromWhichNodeTable.Load(mq.HashCode())
if exist {
return v.(int64)
diff --git a/consumer/consumer_test.go b/internal/consumer/consumer_test.go
similarity index 100%
rename from consumer/consumer_test.go
rename to internal/consumer/consumer_test.go
diff --git a/consumer/offset_store.go b/internal/consumer/offset_store.go
similarity index 86%
rename from consumer/offset_store.go
rename to internal/consumer/offset_store.go
index 0edf828..702ba0d 100644
--- a/consumer/offset_store.go
+++ b/internal/consumer/offset_store.go
@@ -20,8 +20,9 @@ package consumer
import (
"encoding/json"
"fmt"
- "github.com/apache/rocketmq-client-go/kernel"
- "github.com/apache/rocketmq-client-go/remote"
+ "github.com/apache/rocketmq-client-go/internal/kernel"
+ "github.com/apache/rocketmq-client-go/internal/remote"
+ "github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
"github.com/apache/rocketmq-client-go/utils"
"os"
@@ -50,10 +51,10 @@ func init() {
}
type OffsetStore interface {
- persist(mqs []*kernel.MessageQueue)
- remove(mq *kernel.MessageQueue)
- read(mq *kernel.MessageQueue, t readType) int64
- update(mq *kernel.MessageQueue, offset int64, increaseOnly bool)
+ persist(mqs []*primitive.MessageQueue)
+ remove(mq *primitive.MessageQueue)
+ read(mq *primitive.MessageQueue, t readType) int64
+ update(mq *primitive.MessageQueue, offset int64, increaseOnly bool)
}
type localFileOffsetStore struct {
@@ -108,7 +109,7 @@ func (local *localFileOffsetStore) load() {
}
}
-func (local *localFileOffsetStore) read(mq *kernel.MessageQueue, t readType) int64 {
+func (local *localFileOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
if t == _ReadFromMemory || t == _ReadMemoryThenStore {
off := readFromMemory(local.OffsetTable, mq)
if off >= 0 || (off == -1 && t == _ReadFromMemory) {
@@ -119,7 +120,7 @@ func (local *localFileOffsetStore) read(mq *kernel.MessageQueue, t readType) int
return readFromMemory(local.OffsetTable, mq)
}
-func (local *localFileOffsetStore) update(mq *kernel.MessageQueue, offset int64, increaseOnly bool) {
+func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
rlog.Debugf("update offset: %s to %d", mq, offset)
localOffset, exist := local.OffsetTable[mq.Topic]
if !exist {
@@ -143,7 +144,7 @@ func (local *localFileOffsetStore) update(mq *kernel.MessageQueue, offset int64,
}
}
-func (local *localFileOffsetStore) persist(mqs []*kernel.MessageQueue) {
+func (local *localFileOffsetStore) persist(mqs []*primitive.MessageQueue) {
if len(mqs) == 0 {
return
}
@@ -171,7 +172,7 @@ func (local *localFileOffsetStore) persist(mqs []*kernel.MessageQueue) {
utils.CheckError(fmt.Sprintf("persist offset to %s", local.path), utils.WriteToFile(local.path, data))
}
-func (local *localFileOffsetStore) remove(mq *kernel.MessageQueue) {
+func (local *localFileOffsetStore) remove(mq *primitive.MessageQueue) {
// nothing to do
}
@@ -190,7 +191,7 @@ func NewRemoteOffsetStore(group string, client *kernel.RMQClient) OffsetStore {
}
}
-func (r *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue) {
+func (r *remoteBrokerOffsetStore) persist(mqs []*primitive.MessageQueue) {
r.mutex.Lock()
defer r.mutex.Unlock()
if len(mqs) == 0 {
@@ -217,7 +218,7 @@ func (r *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue) {
}
}
-func (r *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue) {
+func (r *remoteBrokerOffsetStore) remove(mq *primitive.MessageQueue) {
r.mutex.Lock()
defer r.mutex.Unlock()
if mq == nil {
@@ -231,7 +232,7 @@ func (r *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue) {
delete(offset, mq.QueueId)
}
-func (r *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t readType) int64 {
+func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
r.mutex.RLock()
if t == _ReadFromMemory || t == _ReadMemoryThenStore {
off := readFromMemory(r.OffsetTable, mq)
@@ -251,7 +252,7 @@ func (r *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t readType) int6
return off
}
-func (r *remoteBrokerOffsetStore) update(mq *kernel.MessageQueue, offset int64, increaseOnly bool) {
+func (r *remoteBrokerOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
rlog.Debugf("update offset: %s to %d", mq, offset)
r.mutex.Lock()
defer r.mutex.Unlock()
@@ -278,7 +279,7 @@ func (r *remoteBrokerOffsetStore) update(mq *kernel.MessageQueue, offset int64,
}
}
-func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *kernel.MessageQueue) (int64, error) {
+func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *primitive.MessageQueue) (int64, error) {
broker := kernel.FindBrokerAddrByName(mq.BrokerName)
if broker == "" {
kernel.UpdateTopicRouteInfo(mq.Topic)
@@ -330,7 +331,7 @@ func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group, topic strin
return r.client.InvokeOneWay(broker, cmd, 5*time.Second)
}
-func readFromMemory(table map[string]map[int]*queueOffset, mq *kernel.MessageQueue) int64 {
+func readFromMemory(table map[string]map[int]*queueOffset, mq *primitive.MessageQueue) int64 {
localOffset, exist := table[mq.Topic]
if !exist {
return -1
diff --git a/consumer/process_queue.go b/internal/consumer/process_queue.go
similarity index 87%
rename from consumer/process_queue.go
rename to internal/consumer/process_queue.go
index 77e84fa..2ec865c 100644
--- a/consumer/process_queue.go
+++ b/internal/consumer/process_queue.go
@@ -18,7 +18,7 @@ limitations under the License.
package consumer
import (
- "github.com/apache/rocketmq-client-go/kernel"
+ "github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
"github.com/emirpasic/gods/maps/treemap"
"github.com/emirpasic/gods/utils"
@@ -51,7 +51,7 @@ type processQueue struct {
consuming bool
msgAccCnt int64
lockConsume sync.Mutex
- msgCh chan []*kernel.MessageExt
+ msgCh chan []*primitive.MessageExt
}
func newProcessQueue() *processQueue {
@@ -60,12 +60,12 @@ func newProcessQueue() *processQueue {
lastPullTime: time.Now(),
lastConsumeTime: time.Now(),
lastLockTime: time.Now(),
- msgCh: make(chan []*kernel.MessageExt, 32),
+ msgCh: make(chan []*primitive.MessageExt, 32),
}
return pq
}
-func (pq *processQueue) putMessage(messages ...*kernel.MessageExt) {
+func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
if messages == nil || len(messages) == 0 {
return
}
@@ -92,7 +92,7 @@ func (pq *processQueue) putMessage(messages ...*kernel.MessageExt) {
}
msg := messages[len(messages)-1]
- maxOffset, err := strconv.ParseInt(msg.Properties[kernel.PropertyMaxOffset], 10, 64)
+ maxOffset, err := strconv.ParseInt(msg.Properties[primitive.PropertyMaxOffset], 10, 64)
if err != nil {
acc := maxOffset - msg.QueueOffset
if acc > 0 {
@@ -101,7 +101,7 @@ func (pq *processQueue) putMessage(messages ...*kernel.MessageExt) {
}
}
-func (pq *processQueue) removeMessage(messages ...*kernel.MessageExt) int64 {
+func (pq *processQueue) removeMessage(messages ...*primitive.MessageExt) int64 {
result := int64(-1)
pq.mutex.Lock()
pq.lastConsumeTime = time.Now()
@@ -152,8 +152,8 @@ func (pq *processQueue) cleanExpiredMsg(consumer defaultConsumer) {
return
}
_, firstValue := pq.msgCache.Min()
- msg := firstValue.(*kernel.MessageExt)
- startTime := msg.Properties[kernel.PropertyConsumeStartTime]
+ msg := firstValue.(*primitive.MessageExt)
+ startTime := msg.Properties[primitive.PropertyConsumeStartTime]
if startTime != "" {
st, err := strconv.ParseInt(startTime, 10, 64)
if err != nil {
@@ -187,15 +187,15 @@ func (pq *processQueue) getMaxSpan() int {
return int(lastKey.(int64) - firstKey.(int64))
}
-func (pq *processQueue) getMessages() []*kernel.MessageExt {
+func (pq *processQueue) getMessages() []*primitive.MessageExt {
return <-pq.msgCh
}
-func (pq *processQueue) takeMessages(number int) []*kernel.MessageExt {
+func (pq *processQueue) takeMessages(number int) []*primitive.MessageExt {
for pq.msgCache.Empty() {
time.Sleep(10 * time.Millisecond)
}
- result := make([]*kernel.MessageExt, number)
+ result := make([]*primitive.MessageExt, number)
i := 0
pq.mutex.Lock()
for ; i < number; i++ {
@@ -203,7 +203,7 @@ func (pq *processQueue) takeMessages(number int) []*kernel.MessageExt {
if v == nil {
break
}
- result[i] = v.(*kernel.MessageExt)
+ result[i] = v.(*primitive.MessageExt)
pq.msgCache.Remove(k)
}
pq.mutex.Unlock()
diff --git a/consumer/pull_consumer.go b/internal/consumer/pull_consumer.go
similarity index 70%
rename from consumer/pull_consumer.go
rename to internal/consumer/pull_consumer.go
index 1b4f950..9250771 100644
--- a/consumer/pull_consumer.go
+++ b/internal/consumer/pull_consumer.go
@@ -21,27 +21,23 @@ import (
"context"
"errors"
"fmt"
- "github.com/apache/rocketmq-client-go/kernel"
+ "github.com/apache/rocketmq-client-go/internal/kernel"
+ "github.com/apache/rocketmq-client-go/primitive"
"strconv"
"sync"
)
-type MessageSelector struct {
- Type ExpressionType
- Expression string
-}
-
type PullConsumer interface {
Start()
Shutdown()
- Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*kernel.PullResult, error)
+ Pull(ctx context.Context, topic string, selector primitive.MessageSelector, numbers int) (*primitive.PullResult, error)
}
var (
queueCounterTable sync.Map
)
-func NewConsumer(config ConsumerOption) *defaultPullConsumer {
+func NewConsumer(config primitive.ConsumerOption) *defaultPullConsumer {
return &defaultPullConsumer{
option: config,
}
@@ -49,10 +45,10 @@ func NewConsumer(config ConsumerOption) *defaultPullConsumer {
type defaultPullConsumer struct {
state kernel.ServiceState
- option ConsumerOption
+ option primitive.ConsumerOption
client *kernel.RMQClient
GroupName string
- Model MessageModel
+ Model primitive.MessageModel
UnitMode bool
}
@@ -60,7 +56,7 @@ func (c *defaultPullConsumer) Start() {
c.state = kernel.StateRunning
}
-func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*kernel.PullResult, error) {
+func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector primitive.MessageSelector, numbers int) (*primitive.PullResult, error) {
mq := getNextQueueOf(topic)
if mq == nil {
return nil, fmt.Errorf("prepard to pull topic: %s, but no queue is founded", topic)
@@ -78,22 +74,22 @@ func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector M
}
// SubscribeWithChan ack manually
-func (c *defaultPullConsumer) SubscribeWithChan(topic, selector MessageSelector) (chan *kernel.Message, error) {
+func (c *defaultPullConsumer) SubscribeWithChan(topic, selector primitive.MessageSelector) (chan *primitive.Message, error) {
return nil, nil
}
// SubscribeWithFunc ack automatic
-func (c *defaultPullConsumer) SubscribeWithFunc(topic, selector MessageSelector,
- f func(msg *kernel.Message) ConsumeResult) error {
+func (c *defaultPullConsumer) SubscribeWithFunc(topic, selector primitive.MessageSelector,
+ f func(msg *primitive.Message) primitive.ConsumeResult) error {
return nil
}
-func (c *defaultPullConsumer) ACK(msg *kernel.Message, result ConsumeResult) {
+func (c *defaultPullConsumer) ACK(msg *primitive.Message, result primitive.ConsumeResult) {
}
-func (c *defaultPullConsumer) pull(ctx context.Context, mq *kernel.MessageQueue, data *kernel.SubscriptionData,
- offset int64, numbers int) (*kernel.PullResult, error) {
+func (c *defaultPullConsumer) pull(ctx context.Context, mq *primitive.MessageQueue, data *kernel.SubscriptionData,
+ offset int64, numbers int) (*primitive.PullResult, error) {
err := c.makeSureStateOK()
if err != nil {
return nil, err
@@ -117,7 +113,7 @@ func (c *defaultPullConsumer) pull(ctx context.Context, mq *kernel.MessageQueue,
return nil, fmt.Errorf("the broker %s does not exist", mq.BrokerName)
}
- if (data.ExpType == string(TAG)) && brokerResult.BrokerVersion < kernel.V4_1_0 {
+ if (data.ExpType == string(primitive.TAG)) && brokerResult.BrokerVersion < kernel.V4_1_0 {
return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to support for filter message by %v",
mq.BrokerName, brokerResult.BrokerVersion, data.ExpType)
}
@@ -140,7 +136,7 @@ func (c *defaultPullConsumer) pull(ctx context.Context, mq *kernel.MessageQueue,
ExpressionType: string(data.ExpType),
}
- if data.ExpType == string(TAG) {
+ if data.ExpType == string(primitive.TAG) {
pullRequest.SubVersion = 0
} else {
pullRequest.SubVersion = data.SubVersion
@@ -161,18 +157,18 @@ func (c *defaultPullConsumer) subscriptionAutomatically(topic string) {
// TODO
}
-func (c *defaultPullConsumer) nextOffsetOf(queue *kernel.MessageQueue) int64 {
+func (c *defaultPullConsumer) nextOffsetOf(queue *primitive.MessageQueue) int64 {
return 0
}
-func processPullResult(mq *kernel.MessageQueue, result *kernel.PullResult, data *kernel.SubscriptionData) {
+func processPullResult(mq *primitive.MessageQueue, result *primitive.PullResult, data *kernel.SubscriptionData) {
updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
switch result.Status {
- case kernel.PullFound:
+ case primitive.PullFound:
msgs := result.GetMessageExts()
msgListFilterAgain := msgs
if len(data.Tags) > 0 && data.ClassFilterMode {
- msgListFilterAgain = make([]*kernel.MessageExt, len(msgs))
+ msgListFilterAgain = make([]*primitive.MessageExt, len(msgs))
for _, msg := range msgs {
_, exist := data.Tags[msg.GetTags()]
if exist {
@@ -184,13 +180,13 @@ func processPullResult(mq *kernel.MessageQueue, result *kernel.PullResult, data
// TODO hook
for _, msg := range msgListFilterAgain {
- traFlag, _ := strconv.ParseBool(msg.Properties[kernel.PropertyTransactionPrepared])
+ traFlag, _ := strconv.ParseBool(msg.Properties[primitive.PropertyTransactionPrepared])
if traFlag {
- msg.TransactionId = msg.Properties[kernel.PropertyUniqueClientMessageIdKeyIndex]
+ msg.TransactionId = msg.Properties[primitive.PropertyUniqueClientMessageIdKeyIndex]
}
- msg.Properties[kernel.PropertyMinOffset] = strconv.FormatInt(result.MinOffset, 10)
- msg.Properties[kernel.PropertyMaxOffset] = strconv.FormatInt(result.MaxOffset, 10)
+ msg.Properties[primitive.PropertyMinOffset] = strconv.FormatInt(result.MinOffset, 10)
+ msg.Properties[primitive.PropertyMaxOffset] = strconv.FormatInt(result.MaxOffset, 10)
}
result.SetMessageExts(msgListFilterAgain)
diff --git a/consumer/pull_consumer_test.go b/internal/consumer/pull_consumer_test.go
similarity index 100%
rename from consumer/pull_consumer_test.go
rename to internal/consumer/pull_consumer_test.go
diff --git a/consumer/push_consumer.go b/internal/consumer/push_consumer.go
similarity index 88%
rename from consumer/push_consumer.go
rename to internal/consumer/push_consumer.go
index 1cb5efd..b3d1e5d 100644
--- a/consumer/push_consumer.go
+++ b/internal/consumer/push_consumer.go
@@ -21,7 +21,8 @@ import (
"context"
"errors"
"fmt"
- "github.com/apache/rocketmq-client-go/kernel"
+ "github.com/apache/rocketmq-client-go/internal/kernel"
+ "github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
"github.com/apache/rocketmq-client-go/utils"
"math"
@@ -38,31 +39,28 @@ import (
// See quick start/Consumer in the example module for a typical usage.
//
// <strong>Thread Safety:</strong> After initialization, the instance can be regarded as thread-safe.
-type ConsumeResult int
const (
- Mb = 1024 * 1024
- ConsumeSuccess ConsumeResult = iota
- ConsumeRetryLater
+ Mb = 1024 * 1024
)
type PushConsumer interface {
Start() error
Shutdown()
- Subscribe(topic string, selector MessageSelector,
- f func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)) error
+ Subscribe(topic string, selector primitive.MessageSelector,
+ f func(*ConsumeMessageContext, []*primitive.MessageExt) (primitive.ConsumeResult, error)) error
}
type pushConsumer struct {
*defaultConsumer
queueFlowControlTimes int
queueMaxSpanFlowControlTimes int
- consume func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)
- submitToConsume func(*processQueue, *kernel.MessageQueue)
+ consume func(*ConsumeMessageContext, []*primitive.MessageExt) (primitive.ConsumeResult, error)
+ submitToConsume func(*processQueue, *primitive.MessageQueue)
subscribedTopic map[string]string
}
-func NewPushConsumer(consumerGroup string, opt ConsumerOption) (PushConsumer, error) {
+func NewPushConsumer(consumerGroup string, opt primitive.ConsumerOption) (PushConsumer, error) {
if err := utils.VerifyIP(opt.NameServerAddr); err != nil {
return nil, err
}
@@ -86,23 +84,10 @@ func NewPushConsumer(consumerGroup string, opt ConsumerOption) (PushConsumer, er
option: opt,
}
- switch opt.Strategy {
- case StrategyAveragely:
- dc.allocate = allocateByAveragely
- case StrategyAveragelyCircle:
- dc.allocate = allocateByAveragelyCircle
- case StrategyConfig:
- dc.allocate = allocateByConfig
- case StrategyConsistentHash:
- dc.allocate = allocateByConsistentHash
- case StrategyMachineNearby:
- dc.allocate = allocateByMachineNearby
- case StrategyMachineRoom:
- dc.allocate = allocateByMachineRoom
- default:
- dc.allocate = allocateByAveragely
+ if opt.Strategy == nil {
+ opt.Strategy = primitive.AllocateByAveragely
}
-
+ dc.allocate = opt.Strategy
p := &pushConsumer{
defaultConsumer: dc,
subscribedTopic: make(map[string]string, 0),
@@ -124,15 +109,15 @@ func (pc *pushConsumer) Start() error {
pc.state = kernel.StateStartFailed
pc.validate()
- if pc.model == Clustering {
+ if pc.model == primitive.Clustering {
// set retry topic
retryTopic := kernel.GetRetryTopic(pc.consumerGroup)
pc.subscriptionDataTable.Store(retryTopic, buildSubscriptionData(retryTopic,
- MessageSelector{TAG, _SubAll}))
+ primitive.MessageSelector{primitive.TAG, _SubAll}))
}
pc.client = kernel.GetOrNewRocketMQClient(pc.option.ClientOption)
- if pc.model == Clustering {
+ if pc.model == primitive.Clustering {
pc.option.ChangeInstanceNameToPID()
pc.storage = NewRemoteOffsetStore(pc.consumerGroup, pc.client)
} else {
@@ -177,8 +162,8 @@ func (pc *pushConsumer) Start() error {
func (pc *pushConsumer) Shutdown() {}
-func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
- f func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)) error {
+func (pc *pushConsumer) Subscribe(topic string, selector primitive.MessageSelector,
+ f func(*ConsumeMessageContext, []*primitive.MessageExt) (primitive.ConsumeResult, error)) error {
if pc.state != kernel.StateCreateJust {
return errors.New("subscribe topic only started before")
}
@@ -197,7 +182,7 @@ func (pc *pushConsumer) PersistConsumerOffset() {
pc.defaultConsumer.persistConsumerOffset()
}
-func (pc *pushConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*kernel.MessageQueue) {
+func (pc *pushConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue) {
pc.defaultConsumer.updateTopicSubscribeInfo(topic, mqs)
}
@@ -213,7 +198,7 @@ func (pc *pushConsumer) IsUnitMode() bool {
return pc.unitMode
}
-func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*kernel.MessageQueue) {
+func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*primitive.MessageQueue) {
// TODO
}
@@ -399,7 +384,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
subExpression string
)
- if pc.model == Clustering {
+ if pc.model == primitive.Clustering {
commitOffsetValue = pc.storage.read(request.mq, _ReadFromMemory)
if commitOffsetValue > 0 {
commitOffsetEnable = true
@@ -423,7 +408,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
SysFlag: sysFlag,
CommitOffset: commitOffsetValue,
SubExpression: _SubAll,
- ExpressionType: string(TAG), // TODO
+ ExpressionType: string(primitive.TAG), // TODO
}
//
//if data.ExpType == string(TAG) {
@@ -445,14 +430,14 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
goto NEXT
}
- if result.Status == kernel.PullBrokerTimeout {
+ if result.Status == primitive.PullBrokerTimeout {
rlog.Warnf("pull broker: %s timeout", brokerResult.BrokerAddr)
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
switch result.Status {
- case kernel.PullFound:
+ case primitive.PullFound:
rlog.Debugf("Topic: %s, QueueId: %d found messages: %d", request.mq.Topic, request.mq.QueueId,
len(result.GetMessageExts()))
prevRequestOffset := request.nextOffset
@@ -472,19 +457,19 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
rlog.Warnf("[BUG] pull message result maybe data wrong, [nextBeginOffset=%d, "+
"firstMsgOffset=%d, prevRequestOffset=%d]", result.NextBeginOffset, firstMsgOffset, prevRequestOffset)
}
- case kernel.PullNoNewMsg:
+ case primitive.PullNoNewMsg:
rlog.Debugf("Topic: %s, QueueId: %d no more msg, next offset: %d", request.mq.Topic, request.mq.QueueId, result.NextBeginOffset)
- case kernel.PullNoMsgMatched:
+ case primitive.PullNoMsgMatched:
request.nextOffset = result.NextBeginOffset
pc.correctTagsOffset(request)
- case kernel.PullOffsetIllegal:
+ case primitive.PullOffsetIllegal:
rlog.Warnf("the pull request offset illegal, {} {}", request.String(), result.String())
request.nextOffset = result.NextBeginOffset
pq.dropped = true
go func() {
time.Sleep(10 * time.Second)
pc.storage.update(request.mq, request.nextOffset, false)
- pc.storage.persist([]*kernel.MessageQueue{request.mq})
+ pc.storage.persist([]*primitive.MessageQueue{request.mq})
pc.storage.remove(request.mq)
rlog.Warnf("fix the pull request offset: %s", request.String())
}()
@@ -499,7 +484,7 @@ func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
// TODO
}
-func (pc *pushConsumer) sendMessageBack(ctx *ConsumeMessageContext, msg *kernel.MessageExt) bool {
+func (pc *pushConsumer) sendMessageBack(ctx *ConsumeMessageContext, msg *primitive.MessageExt) bool {
return true
}
@@ -514,7 +499,7 @@ func (pc *pushConsumer) resume() {
rlog.Infof("resume consumer: %s", pc.consumerGroup)
}
-func (pc *pushConsumer) resetOffset(topic string, table map[kernel.MessageQueue]int64) {
+func (pc *pushConsumer) resetOffset(topic string, table map[primitive.MessageQueue]int64) {
//topic := cmd.ExtFields["topic"]
//group := cmd.ExtFields["group"]
//if topic == "" || group == "" {
@@ -529,7 +514,7 @@ func (pc *pushConsumer) resetOffset(topic string, table map[kernel.MessageQueue]
//rlog.Infof("invoke reset offset operation from broker. brokerAddr=%s, topic=%s, group=%s, timestamp=%v",
// from, topic, group, t)
//
- //offsetTable := make(map[kernel.MessageQueue]int64, 0)
+ //offsetTable := make(map[primitive.MessageQueue]int64, 0)
//err = json.Unmarshal(cmd.Body, &offsetTable)
//if err != nil {
// rlog.Warnf("received reset offset command from: %s, but parse offset table: %s", err.Error())
@@ -541,7 +526,7 @@ func (pc *pushConsumer) resetOffset(topic string, table map[kernel.MessageQueue]
// return
//}
- set := make(map[int]*kernel.MessageQueue, 0)
+ set := make(map[int]*primitive.MessageQueue, 0)
for k := range table {
set[k.HashCode()] = &k
}
@@ -559,7 +544,7 @@ func (pc *pushConsumer) resetOffset(topic string, table map[kernel.MessageQueue]
if !exist {
return
}
- queuesOfTopic := v.(map[int]*kernel.MessageQueue)
+ queuesOfTopic := v.(map[int]*primitive.MessageQueue)
for k := range queuesOfTopic {
q := set[k]
if q != nil {
@@ -575,9 +560,9 @@ func (pc *pushConsumer) resetOffset(topic string, table map[kernel.MessageQueue]
}
}
-func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq *kernel.MessageQueue, pq *processQueue) bool {
+func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq *primitive.MessageQueue, pq *processQueue) bool {
pc.defaultConsumer.removeUnnecessaryMessageQueue(mq, pq)
- if !pc.consumeOrderly || Clustering != pc.model {
+ if !pc.consumeOrderly || primitive.Clustering != pc.model {
return true
}
// TODO orderly
@@ -586,21 +571,21 @@ func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq *kernel.MessageQueue, p
type ConsumeMessageContext struct {
consumerGroup string
- msgs []*kernel.MessageExt
- mq *kernel.MessageQueue
+ msgs []*primitive.MessageExt
+ mq *primitive.MessageQueue
success bool
status string
// mqTractContext
properties map[string]string
}
-func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *kernel.MessageQueue) {
+func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.MessageQueue) {
msgs := pq.getMessages()
if msgs == nil {
return
}
for count := 0; count < len(msgs); count++ {
- var subMsgs []*kernel.MessageExt
+ var subMsgs []*primitive.MessageExt
if count+pc.option.ConsumeMessageBatchMaxSize > len(msgs) {
subMsgs = msgs[count:]
count = len(msgs)
@@ -626,11 +611,11 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *kernel.Mes
for idx := range subMsgs {
msg := subMsgs[idx]
if msg.Properties != nil {
- retryTopic := msg.Properties[kernel.PropertyRetryTopic]
+ retryTopic := msg.Properties[primitive.PropertyRetryTopic]
if retryTopic == "" && groupTopic == msg.Topic {
msg.Topic = retryTopic
}
- subMsgs[idx].Properties[kernel.PropertyConsumeStartTime] = strconv.FormatInt(
+ subMsgs[idx].Properties[primitive.PropertyConsumeStartTime] = strconv.FormatInt(
beginTime.UnixNano()/int64(time.Millisecond), 10)
}
}
@@ -640,7 +625,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *kernel.Mes
ctx.properties["ConsumeContextType"] = "EXCEPTION"
} else if consumeRT >= pc.option.ConsumeTimeout {
ctx.properties["ConsumeContextType"] = "TIMEOUT"
- } else if result == ConsumeSuccess {
+ } else if result == primitive.ConsumeSuccess {
ctx.properties["ConsumeContextType"] = "SUCCESS"
} else {
ctx.properties["ConsumeContextType"] = "RECONSUME_LATER"
@@ -650,12 +635,12 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *kernel.Mes
increaseConsumeRT(pc.consumerGroup, mq.Topic, consumeRT)
if !pq.dropped {
- msgBackFailed := make([]*kernel.MessageExt, 0)
- if result == ConsumeSuccess {
+ msgBackFailed := make([]*primitive.MessageExt, 0)
+ if result == primitive.ConsumeSuccess {
increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
} else {
increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
- if pc.model == BroadCasting {
+ if pc.model == primitive.BroadCasting {
for i := 0; i < len(msgs); i++ {
rlog.Warnf("BROADCASTING, the message=%s consume failed, drop it, {}", subMsgs[i])
}
@@ -688,5 +673,5 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *kernel.Mes
}
}
-func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *kernel.MessageQueue) {
+func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.MessageQueue) {
}
diff --git a/consumer/push_consumer_test.go b/internal/consumer/push_consumer_test.go
similarity index 100%
rename from consumer/push_consumer_test.go
rename to internal/consumer/push_consumer_test.go
diff --git a/consumer/statistics.go b/internal/consumer/statistics.go
similarity index 100%
rename from consumer/statistics.go
rename to internal/consumer/statistics.go
diff --git a/kernel/client.go b/internal/kernel/client.go
similarity index 86%
rename from kernel/client.go
rename to internal/kernel/client.go
index 9dbecc6..78de681 100644
--- a/kernel/client.go
+++ b/internal/kernel/client.go
@@ -22,7 +22,8 @@ import (
"context"
"errors"
"fmt"
- "github.com/apache/rocketmq-client-go/remote"
+ "github.com/apache/rocketmq-client-go/internal/remote"
+ "github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
"os"
"strconv"
@@ -65,28 +66,6 @@ func init() {
}
}
-type ClientOption struct {
- NameServerAddr string
- ClientIP string
- InstanceName string
- UnitMode bool
- UnitName string
- VIPChannelEnabled bool
- UseTLS bool
-}
-
-func (opt *ClientOption) ChangeInstanceNameToPID() {
- if opt.InstanceName == "DEFAULT" {
- opt.InstanceName = strconv.Itoa(os.Getegid())
- }
-}
-
-func (opt *ClientOption) String() string {
- return fmt.Sprintf("ClientOption [ClientIP=%s, InstanceName=%s, "+
- "UnitMode=%v, UnitName=%s, VIPChannelEnabled=%v, UseTLS=%v]", opt.ClientIP,
- opt.InstanceName, opt.UnitMode, opt.UnitName, opt.VIPChannelEnabled, opt.UseTLS)
-}
-
type InnerProducer interface {
PublishTopicList() []string
UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
@@ -97,7 +76,7 @@ type InnerProducer interface {
type InnerConsumer interface {
PersistConsumerOffset()
- UpdateTopicSubscribeInfo(topic string, mqs []*MessageQueue)
+ UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue)
IsSubscribeTopicNeedUpdate(topic string) bool
SubscriptionDataList() []*SubscriptionData
Rebalance()
@@ -105,7 +84,7 @@ type InnerConsumer interface {
}
type RMQClient struct {
- option ClientOption
+ option primitive.ClientOption
// group -> InnerProducer
producerMap sync.Map
@@ -119,7 +98,7 @@ type RMQClient struct {
var clientMap sync.Map
-func GetOrNewRocketMQClient(option ClientOption) *RMQClient {
+func GetOrNewRocketMQClient(option primitive.ClientOption) *RMQClient {
client := &RMQClient{
option: option,
remoteClient: remote.NewRemotingClient(),
@@ -298,12 +277,12 @@ func (c *RMQClient) UpdateTopicRouteInfo() {
// SendMessageAsync send message with batch by async
func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
- msgs []*Message, f func(result *SendResult)) error {
+ msgs []*primitive.Message, f func(result *primitive.SendResult)) error {
return nil
}
func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
- msgs []*Message) (*SendResult, error) {
+ msgs []*primitive.Message) (*primitive.SendResult, error) {
cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs))
err := c.remoteClient.InvokeOneWay(brokerAddrs, cmd, 3*time.Second)
if err != nil {
@@ -312,28 +291,28 @@ func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, r
return nil, err
}
-func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, msgs ...*Message) *SendResult {
- var status SendStatus
+func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, msgs ...*primitive.Message) *primitive.SendResult {
+ var status primitive.SendStatus
switch cmd.Code {
case ResFlushDiskTimeout:
- status = SendFlushDiskTimeout
+ status = primitive.SendFlushDiskTimeout
case ResFlushSlaveTimeout:
- status = SendFlushSlaveTimeout
+ status = primitive.SendFlushSlaveTimeout
case ResSlaveNotAvailable:
- status = SendSlaveNotAvailable
+ status = primitive.SendSlaveNotAvailable
case ResSuccess:
- status = SendOK
+ status = primitive.SendOK
default:
// TODO process unknown code
}
msgIDs := make([]string, 0)
for i := 0; i < len(msgs); i++ {
- msgIDs = append(msgIDs, msgs[i].Properties[PropertyUniqueClientMessageIdKeyIndex])
+ msgIDs = append(msgIDs, msgs[i].Properties[primitive.PropertyUniqueClientMessageIdKeyIndex])
}
- regionId := cmd.ExtFields[PropertyMsgRegion]
- trace := cmd.ExtFields[PropertyTraceSwitch]
+ regionId := cmd.ExtFields[primitive.PropertyMsgRegion]
+ trace := cmd.ExtFields[primitive.PropertyTraceSwitch]
if regionId == "" {
regionId = defaultTraceRegionID
@@ -341,11 +320,11 @@ func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
qId, _ := strconv.Atoi(cmd.ExtFields["queueId"])
off, _ := strconv.ParseInt(cmd.ExtFields["queueOffset"], 10, 64)
- return &SendResult{
+ return &primitive.SendResult{
Status: status,
MsgID: cmd.ExtFields["msgId"],
OffsetMsgID: cmd.ExtFields["msgId"],
- MessageQueue: &MessageQueue{
+ MessageQueue: &primitive.MessageQueue{
Topic: msgs[0].Topic,
BrokerName: brokerName,
QueueId: qId,
@@ -358,7 +337,7 @@ func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
}
// PullMessage with sync
-func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*PullResult, error) {
+func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error) {
cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
res, err := c.remoteClient.InvokeSync(brokerAddrs, cmd, 3*time.Second)
if err != nil {
@@ -368,17 +347,17 @@ func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, request
return c.processPullResponse(res)
}
-func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) (*PullResult, error) {
- pullResult := &PullResult{}
+func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) (*primitive.PullResult, error) {
+ pullResult := &primitive.PullResult{}
switch response.Code {
case ResSuccess:
- pullResult.Status = PullFound
+ pullResult.Status = primitive.PullFound
case ResPullNotFound:
- pullResult.Status = PullNoNewMsg
+ pullResult.Status = primitive.PullNoNewMsg
case ResPullRetryImmediately:
- pullResult.Status = PullNoMsgMatched
+ pullResult.Status = primitive.PullNoMsgMatched
case ResPullOffsetMoved:
- pullResult.Status = PullOffsetIllegal
+ pullResult.Status = primitive.PullOffsetIllegal
default:
return nil, fmt.Errorf("unknown Response Code: %d, remark: %s", response.Code, response.Remark)
}
@@ -403,13 +382,13 @@ func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) (*Pull
pullResult.SuggestWhichBrokerId, _ = strconv.ParseInt(v, 10, 64)
}
- pullResult.messageExts = decodeMessage(response.Body)
+ //pullResult.messageExts = decodeMessage(response.Body) TODO parse in top
return pullResult, nil
}
// PullMessageAsync pull message async
-func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *PullResult)) error {
+func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *primitive.PullResult)) error {
return nil
}
@@ -501,13 +480,13 @@ func (c *RMQClient) isNeedUpdateSubscribeInfo(topic string) bool {
return result
}
-func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*MessageQueue {
- list := make([]*MessageQueue, 0)
+func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*primitive.MessageQueue {
+ list := make([]*primitive.MessageQueue, 0)
for idx := range data.QueueDataList {
qd := data.QueueDataList[idx]
if queueIsReadable(qd.Perm) {
for i := 0; i < qd.ReadQueueNums; i++ {
- list = append(list, &MessageQueue{
+ list = append(list, &primitive.MessageQueue{
Topic: topic,
BrokerName: qd.BrokerName,
QueueId: i,
@@ -518,7 +497,7 @@ func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*MessageQueue
return list
}
-func encodeMessages(message []*Message) []byte {
+func encodeMessages(message []*primitive.Message) []byte {
var buffer bytes.Buffer
index := 0
for index < len(message) {
diff --git a/kernel/client_test.go b/internal/kernel/client_test.go
similarity index 100%
rename from kernel/client_test.go
rename to internal/kernel/client_test.go
diff --git a/kernel/constants.go b/internal/kernel/constants.go
similarity index 100%
rename from kernel/constants.go
rename to internal/kernel/constants.go
diff --git a/internal/kernel/model.go b/internal/kernel/model.go
new file mode 100644
index 0000000..b3ef62e
--- /dev/null
+++ b/internal/kernel/model.go
@@ -0,0 +1,80 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+import (
+ "encoding/json"
+ "github.com/apache/rocketmq-client-go/rlog"
+)
+
+type FindBrokerResult struct {
+ BrokerAddr string
+ Slave bool
+ BrokerVersion int32
+}
+
+type (
+ // groupName of consumer
+ producerData string
+
+ consumeType string
+
+ ServiceState int
+)
+
+const (
+ StateCreateJust ServiceState = iota
+ StateStartFailed
+ StateRunning
+ StateShutdown
+)
+
+type SubscriptionData struct {
+ ClassFilterMode bool
+ Topic string
+ SubString string
+ Tags map[string]bool
+ Codes map[int32]bool
+ SubVersion int64
+ ExpType string
+}
+
+type consumerData struct {
+ GroupName string `json:"groupName"`
+ CType consumeType `json:"consumeType"`
+ MessageModel string `json:"messageModel"`
+ Where string `json:"consumeFromWhere"`
+ SubscriptionDatas []*SubscriptionData `json:"subscriptionDataSet"`
+ UnitMode bool `json:"unitMode"`
+}
+
+type heartbeatData struct {
+ ClientId string `json:"clientID"`
+ ProducerDatas []producerData `json:"producerDataSet"`
+ ConsumerDatas []consumerData `json:"consumerDataSet"`
+}
+
+func (data *heartbeatData) encode() []byte {
+ d, err := json.Marshal(data)
+ if err != nil {
+ rlog.Errorf("marshal heartbeatData error: %s", err.Error())
+ return nil
+ }
+ rlog.Info(string(d))
+ return d
+}
diff --git a/kernel/mq_version.go b/internal/kernel/mq_version.go
similarity index 100%
rename from kernel/mq_version.go
rename to internal/kernel/mq_version.go
diff --git a/kernel/perm.go b/internal/kernel/perm.go
similarity index 100%
rename from kernel/perm.go
rename to internal/kernel/perm.go
diff --git a/kernel/request.go b/internal/kernel/request.go
similarity index 100%
rename from kernel/request.go
rename to internal/kernel/request.go
diff --git a/kernel/response.go b/internal/kernel/response.go
similarity index 100%
rename from kernel/response.go
rename to internal/kernel/response.go
diff --git a/kernel/route.go b/internal/kernel/route.go
similarity index 96%
rename from kernel/route.go
rename to internal/kernel/route.go
index d0678e6..f216c0e 100644
--- a/kernel/route.go
+++ b/internal/kernel/route.go
@@ -20,7 +20,8 @@ package kernel
import (
"encoding/json"
"errors"
- "github.com/apache/rocketmq-client-go/remote"
+ "github.com/apache/rocketmq-client-go/internal/remote"
+ "github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
"github.com/apache/rocketmq-client-go/utils"
"github.com/tidwall/gjson"
@@ -98,7 +99,7 @@ func cleanOfflineBroker() {
type TopicPublishInfo struct {
OrderTopic bool
HaveTopicRouterInfo bool
- MqList []*MessageQueue
+ MqList []*primitive.MessageQueue
RouteData *TopicRouteData
TopicQueueIndex int32
}
@@ -221,19 +222,19 @@ func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBro
return result
}
-func FetchSubscribeMessageQueues(topic string) ([]*MessageQueue, error) {
+func FetchSubscribeMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
routeData, err := queryTopicRouteInfoFromServer(topic)
if err != nil {
return nil, err
}
- mqs := make([]*MessageQueue, 0)
+ mqs := make([]*primitive.MessageQueue, 0)
for _, qd := range routeData.QueueDataList {
if queueIsReadable(qd.Perm) {
for i := 0; i < qd.ReadQueueNums; i++ {
- mqs = append(mqs, &MessageQueue{Topic: topic, BrokerName: qd.BrokerName, QueueId: i})
+ mqs = append(mqs, &primitive.MessageQueue{Topic: topic, BrokerName: qd.BrokerName, QueueId: i})
}
}
}
@@ -321,7 +322,7 @@ func routeData2PublishInfo(topic string, data *TopicRouteData) *TopicPublishInfo
item := strings.Split(broker, ":")
nums, _ := strconv.Atoi(item[1])
for i := 0; i < nums; i++ {
- mq := &MessageQueue{
+ mq := &primitive.MessageQueue{
Topic: topic,
BrokerName: item[0],
QueueId: i,
@@ -357,7 +358,7 @@ func routeData2PublishInfo(topic string, data *TopicRouteData) *TopicPublishInfo
}
for i := 0; i < qd.WriteQueueNums; i++ {
- mq := &MessageQueue{
+ mq := &primitive.MessageQueue{
Topic: topic,
BrokerName: qd.BrokerName,
QueueId: i,
diff --git a/kernel/route_test.go b/internal/kernel/route_test.go
similarity index 100%
rename from kernel/route_test.go
rename to internal/kernel/route_test.go
diff --git a/kernel/transaction.go b/internal/kernel/transaction.go
similarity index 100%
rename from kernel/transaction.go
rename to internal/kernel/transaction.go
diff --git a/kernel/validators.go b/internal/kernel/validators.go
similarity index 100%
rename from kernel/validators.go
rename to internal/kernel/validators.go
diff --git a/producer/producer.go b/internal/producer/producer.go
similarity index 85%
rename from producer/producer.go
rename to internal/producer/producer.go
index 8bb0bf8..52657ee 100644
--- a/producer/producer.go
+++ b/internal/producer/producer.go
@@ -21,8 +21,9 @@ import (
"context"
"errors"
"fmt"
- "github.com/apache/rocketmq-client-go/kernel"
- "github.com/apache/rocketmq-client-go/remote"
+ "github.com/apache/rocketmq-client-go/internal/kernel"
+ "github.com/apache/rocketmq-client-go/internal/remote"
+ "github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
"github.com/apache/rocketmq-client-go/utils"
"os"
@@ -34,11 +35,11 @@ import (
type Producer interface {
Start() error
Shutdown() error
- SendSync(context.Context, *kernel.Message) (*kernel.SendResult, error)
- SendOneWay(context.Context, *kernel.Message) error
+ SendSync(context.Context, *primitive.Message) (*primitive.SendResult, error)
+ SendOneWay(context.Context, *primitive.Message) error
}
-func NewProducer(opt ProducerOptions) (Producer, error) {
+func NewProducer(opt primitive.ProducerOptions) (Producer, error) {
if err := utils.VerifyIP(opt.NameServerAddr); err != nil {
return nil, err
}
@@ -63,18 +64,10 @@ type defaultProducer struct {
group string
client *kernel.RMQClient
state kernel.ServiceState
- options ProducerOptions
+ options primitive.ProducerOptions
publishInfo sync.Map
}
-type ProducerOptions struct {
- kernel.ClientOption
- NameServerAddr string
- GroupName string
- RetryTimesWhenSendFailed int
- UnitMode bool
-}
-
func (p *defaultProducer) Start() error {
p.state = kernel.StateRunning
p.client.RegisterProducer(p.group, p)
@@ -86,7 +79,7 @@ func (p *defaultProducer) Shutdown() error {
return nil
}
-func (p *defaultProducer) SendSync(ctx context.Context, msg *kernel.Message) (*kernel.SendResult, error) {
+func (p *defaultProducer) SendSync(ctx context.Context, msg *primitive.Message) (*primitive.SendResult, error) {
if msg == nil {
return nil, errors.New("message is nil")
}
@@ -122,7 +115,7 @@ func (p *defaultProducer) SendSync(ctx context.Context, msg *kernel.Message) (*k
return nil, err
}
-func (p *defaultProducer) SendOneWay(ctx context.Context, msg *kernel.Message) error {
+func (p *defaultProducer) SendOneWay(ctx context.Context, msg *primitive.Message) error {
if msg == nil {
return errors.New("message is nil")
}
@@ -157,7 +150,7 @@ func (p *defaultProducer) SendOneWay(ctx context.Context, msg *kernel.Message) e
return err
}
-func (p *defaultProducer) buildSendRequest(mq *kernel.MessageQueue, msg *kernel.Message) *remote.RemotingCommand {
+func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue, msg *primitive.Message) *remote.RemotingCommand {
req := &kernel.SendMessageRequest{
ProducerGroup: p.group,
Topic: mq.Topic,
@@ -173,7 +166,7 @@ func (p *defaultProducer) buildSendRequest(mq *kernel.MessageQueue, msg *kernel.
return remote.NewRemotingCommand(kernel.ReqSendMessage, req, msg.Body)
}
-func (p *defaultProducer) selectMessageQueue(topic string) *kernel.MessageQueue {
+func (p *defaultProducer) selectMessageQueue(topic string) *primitive.MessageQueue {
v, exist := p.publishInfo.Load(topic)
if !exist {
diff --git a/remote/codec.go b/internal/remote/codec.go
similarity index 100%
rename from remote/codec.go
rename to internal/remote/codec.go
diff --git a/remote/codec_test.go b/internal/remote/codec_test.go
similarity index 100%
rename from remote/codec_test.go
rename to internal/remote/codec_test.go
diff --git a/remote/remote_client.go b/internal/remote/remote_client.go
similarity index 100%
rename from remote/remote_client.go
rename to internal/remote/remote_client.go
diff --git a/remote/remote_client_test.go b/internal/remote/remote_client_test.go
similarity index 100%
rename from remote/remote_client_test.go
rename to internal/remote/remote_client_test.go
diff --git a/remote/rpchook.go b/internal/remote/rpchook.go
similarity index 100%
rename from remote/rpchook.go
rename to internal/remote/rpchook.go
diff --git a/primitive/consume.go b/primitive/consume.go
new file mode 100644
index 0000000..40e89b9
--- /dev/null
+++ b/primitive/consume.go
@@ -0,0 +1,128 @@
+package primitive
+
+// Message model defines the way how messages are delivered to each consumer clients.
+// </p>
+//
+// RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with
+// the same {@link #consumerGroup} would only consume shards of the messages subscribed, which achieves load
+// balances; Conversely, if the broadcasting is set, each consumer client will consume all subscribed messages
+// separately.
+// </p>
+//
+// This field defaults to clustering.
+type MessageModel int
+
+const (
+ BroadCasting MessageModel = iota
+ Clustering
+)
+
+func (mode MessageModel) String() string {
+ switch mode {
+ case BroadCasting:
+ return "BroadCasting"
+ case Clustering:
+ return "Clustering"
+ default:
+ return "Unknown"
+ }
+}
+
+// Consuming point on consumer booting.
+// </p>
+//
+// There are three consuming points:
+// <ul>
+// <li>
+// <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it stopped previously.
+// If it were a newly booting up consumer client, according aging of the consumer group, there are two
+// cases:
+// <ol>
+// <li>
+// if the consumer group is created so recently that the earliest message being subscribed has yet
+// expired, which means the consumer group represents a lately launched business, consuming will
+// start from the very beginning;
+// </li>
+// <li>
+// if the earliest message being subscribed has expired, consuming will start from the latest
+// messages, meaning messages born prior to the booting timestamp would be ignored.
+// </li>
+// </ol>
+// </li>
+// <li>
+// <code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from earliest messages available.
+// </li>
+// <li>
+// <code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from specified timestamp, which means
+// messages born prior to {@link #consumeTimestamp} will be ignored
+// </li>
+// </ul>
+type ConsumeFromWhere int
+
+const (
+ ConsumeFromLastOffset ConsumeFromWhere = iota
+ ConsumeFromFirstOffset
+ ConsumeFromTimestamp
+)
+
+type ExpressionType string
+
+const (
+ /**
+ * <ul>
+ * Keywords:
+ * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li>
+ * </ul>
+ * <p/>
+ * <ul>
+ * Data type:
+ * <li>Boolean, like: TRUE, FALSE</li>
+ * <li>String, like: 'abc'</li>
+ * <li>Decimal, like: 123</li>
+ * <li>Float number, like: 3.1415</li>
+ * </ul>
+ * <p/>
+ * <ul>
+ * Grammar:
+ * <li>{@code AND, OR}</li>
+ * <li>{@code >, >=, <, <=, =}</li>
+ * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li>
+ * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li>
+ * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this operation only support String type.</li>
+ * <li>{@code IS NULL}, {@code IS NOT NULL}, check parameter whether is null, or not.</li>
+ * <li>{@code =TRUE}, {@code =FALSE}, check parameter whether is true, or false.</li>
+ * </ul>
+ * <p/>
+ * <p>
+ * Example:
+ * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
+ * </p>
+ */
+ SQL92 = ExpressionType("SQL92")
+
+ /**
+ * Only support or operation such as
+ * "tag1 || tag2 || tag3", <br>
+ * If null or * expression, meaning subscribe all.
+ */
+ TAG = ExpressionType("TAG")
+)
+
+func IsTagType(exp string) bool {
+ if exp == "" || exp == "TAG" {
+ return true
+ }
+ return false
+}
+
+type MessageSelector struct {
+ Type ExpressionType
+ Expression string
+}
+
+type ConsumeResult int
+
+const (
+ ConsumeSuccess ConsumeResult = iota
+ ConsumeRetryLater
+)
diff --git a/kernel/message.go b/primitive/message.go
similarity index 85%
rename from kernel/message.go
rename to primitive/message.go
index 0f65853..d15c8a3 100644
--- a/kernel/message.go
+++ b/primitive/message.go
@@ -15,9 +15,12 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package kernel
+package primitive
-import "fmt"
+import (
+ "fmt"
+ "github.com/apache/rocketmq-client-go/utils"
+)
const (
PropertyKeySeparator = " "
@@ -118,3 +121,28 @@ func (msgExt *MessageExt) String() string {
msgExt.StoreTimestamp, msgExt.StoreHost, msgExt.CommitLogOffset, msgExt.BodyCRC, msgExt.ReconsumeTimes,
msgExt.PreparedTransactionOffset)
}
+
+// MessageQueue message queue
+type MessageQueue struct {
+ Topic string `json:"topic"`
+ BrokerName string `json:"brokerName"`
+ QueueId int `json:"queueId"`
+}
+
+func (mq *MessageQueue) String() string {
+ return fmt.Sprintf("MessageQueue [topic=%s, brokerName=%s, queueId=%d]", mq.Topic, mq.BrokerName, mq.QueueId)
+}
+
+func (mq *MessageQueue) HashCode() int {
+ result := 1
+ result = 31*result + utils.HashString(mq.BrokerName)
+ result = 31*result + mq.QueueId
+ result = 31*result + utils.HashString(mq.Topic)
+
+ return result
+}
+
+func (mq *MessageQueue) Equals(queue *MessageQueue) bool {
+ // TODO
+ return true
+}
diff --git a/primitive/options.go b/primitive/options.go
new file mode 100644
index 0000000..b657ac0
--- /dev/null
+++ b/primitive/options.go
@@ -0,0 +1,132 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package primitive
+
+import (
+ "fmt"
+ "os"
+ "strconv"
+ "time"
+)
+
+type ProducerOptions struct {
+ ClientOption
+ NameServerAddr string
+ GroupName string
+ RetryTimesWhenSendFailed int
+ UnitMode bool
+}
+
+type ConsumerOption struct {
+ ClientOption
+ NameServerAddr string
+
+ /**
+ * Backtracking consumption time with second precision. Time format is
+ * 20131223171201<br>
+ * Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
+ * Default backtracking consumption time Half an hour ago.
+ */
+ ConsumeTimestamp string
+
+ // The socket timeout in milliseconds
+ ConsumerPullTimeout time.Duration
+
+ // Concurrently max span offset.it has no effect on sequential consumption
+ ConsumeConcurrentlyMaxSpan int
+
+ // Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
+ // Consider the {PullBatchSize}, the instantaneous value may exceed the limit
+ PullThresholdForQueue int64
+
+ // Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
+ // Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
+ //
+ // The size of a message only measured by message body, so it's not accurate
+ PullThresholdSizeForQueue int
+
+ // Flow control threshold on topic level, default value is -1(Unlimited)
+ //
+ // The value of {@code pullThresholdForQueue} will be overwrote and calculated based on
+ // {@code pullThresholdForTopic} if it is't unlimited
+ //
+ // For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
+ // then pullThresholdForQueue will be set to 100
+ PullThresholdForTopic int
+
+ // Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
+ //
+ // The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on
+ // {@code pullThresholdSizeForTopic} if it is't unlimited
+ //
+ // For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are
+ // assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB
+ PullThresholdSizeForTopic int
+
+ // Message pull Interval
+ PullInterval time.Duration
+
+ // Batch consumption size
+ ConsumeMessageBatchMaxSize int
+
+ // Batch pull size
+ PullBatchSize int32
+
+ // Whether update subscription relationship when every pull
+ PostSubscriptionWhenPull bool
+
+ // Max re-consume times. -1 means 16 times.
+ //
+ // If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion
+ // queue waiting.
+ MaxReconsumeTimes int
+
+ // Suspending pulling time for cases requiring slow pulling like flow-control scenario.
+ SuspendCurrentQueueTimeMillis time.Duration
+
+ // Maximum amount of time a message may block the consuming thread.
+ ConsumeTimeout time.Duration
+
+ ConsumerModel MessageModel
+ Strategy AllocateStrategy
+ ConsumeOrderly bool
+ FromWhere ConsumeFromWhere
+ // TODO traceDispatcher
+}
+
+func (opt *ClientOption) ChangeInstanceNameToPID() {
+ if opt.InstanceName == "DEFAULT" {
+ opt.InstanceName = strconv.Itoa(os.Getegid())
+ }
+}
+
+func (opt *ClientOption) String() string {
+ return fmt.Sprintf("ClientOption [ClientIP=%s, InstanceName=%s, "+
+ "UnitMode=%v, UnitName=%s, VIPChannelEnabled=%v, UseTLS=%v]", opt.ClientIP,
+ opt.InstanceName, opt.UnitMode, opt.UnitName, opt.VIPChannelEnabled, opt.UseTLS)
+}
+
+type ClientOption struct {
+ NameServerAddr string
+ ClientIP string
+ InstanceName string
+ UnitMode bool
+ UnitName string
+ VIPChannelEnabled bool
+ UseTLS bool
+}
diff --git a/kernel/model.go b/primitive/result.go
similarity index 71%
rename from kernel/model.go
rename to primitive/result.go
index 8ed0f97..0c1e738 100644
--- a/kernel/model.go
+++ b/primitive/result.go
@@ -15,14 +15,12 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package kernel
+package primitive
import (
"bytes"
"encoding/binary"
- "encoding/json"
"fmt"
- "github.com/apache/rocketmq-client-go/rlog"
"github.com/apache/rocketmq-client-go/utils"
)
@@ -209,85 +207,3 @@ func toMessages(messageExts []*MessageExt) []*Message {
return msgs
}
-
-// MessageQueue message queue
-type MessageQueue struct {
- Topic string `json:"topic"`
- BrokerName string `json:"brokerName"`
- QueueId int `json:"queueId"`
-}
-
-func (mq *MessageQueue) String() string {
- return fmt.Sprintf("MessageQueue [topic=%s, brokerName=%s, queueId=%d]", mq.Topic, mq.BrokerName, mq.QueueId)
-}
-
-func (mq *MessageQueue) HashCode() int {
- result := 1
- result = 31*result + utils.HashString(mq.BrokerName)
- result = 31*result + mq.QueueId
- result = 31*result + utils.HashString(mq.Topic)
-
- return result
-}
-
-func (mq *MessageQueue) Equals(queue *MessageQueue) bool {
- // TODO
- return true
-}
-
-type FindBrokerResult struct {
- BrokerAddr string
- Slave bool
- BrokerVersion int32
-}
-
-type (
- // groupName of consumer
- producerData string
-
- consumeType string
-
- ServiceState int
-)
-
-const (
- StateCreateJust ServiceState = iota
- StateStartFailed
- StateRunning
- StateShutdown
-)
-
-type SubscriptionData struct {
- ClassFilterMode bool
- Topic string
- SubString string
- Tags map[string]bool
- Codes map[int32]bool
- SubVersion int64
- ExpType string
-}
-
-type consumerData struct {
- GroupName string `json:"groupName"`
- CType consumeType `json:"consumeType"`
- MessageModel string `json:"messageModel"`
- Where string `json:"consumeFromWhere"`
- SubscriptionDatas []*SubscriptionData `json:"subscriptionDataSet"`
- UnitMode bool `json:"unitMode"`
-}
-
-type heartbeatData struct {
- ClientId string `json:"clientID"`
- ProducerDatas []producerData `json:"producerDataSet"`
- ConsumerDatas []consumerData `json:"consumerDataSet"`
-}
-
-func (data *heartbeatData) encode() []byte {
- d, err := json.Marshal(data)
- if err != nil {
- rlog.Errorf("marshal heartbeatData error: %s", err.Error())
- return nil
- }
- rlog.Info(string(d))
- return d
-}
diff --git a/primitive/strategy.go b/primitive/strategy.go
new file mode 100644
index 0000000..56dfef1
--- /dev/null
+++ b/primitive/strategy.go
@@ -0,0 +1,117 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package primitive
+
+import (
+ "github.com/apache/rocketmq-client-go/rlog"
+ "github.com/apache/rocketmq-client-go/utils"
+)
+
+// Strategy Algorithm for message allocating between consumers
+// An allocate strategy proxy for based on machine room nearside priority. An actual allocate strategy can be
+// specified.
+//
+// If any consumer is alive in a machine room, the message queue of the broker which is deployed in the same machine
+// should only be allocated to those. Otherwise, those message queues can be shared along all consumers since there are
+// no alive consumer to monopolize them.
+//
+// Average Hashing queue algorithm
+// Cycle average Hashing queue algorithm
+// Use Message Queue specified
+// Computer room Hashing queue algorithm, such as Alipay logic room
+// Consistent Hashing queue algorithm
+
+type AllocateStrategy func(string, string, []*MessageQueue, []string) []*MessageQueue
+
+func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*MessageQueue,
+ cidAll []string) []*MessageQueue {
+ if currentCID == "" || utils.IsArrayEmpty(mqAll) || utils.IsArrayEmpty(cidAll) {
+ return nil
+ }
+ var (
+ find bool
+ index int
+ )
+
+ for idx := range cidAll {
+ if cidAll[idx] == currentCID {
+ find = true
+ index = idx
+ break
+ }
+ }
+ if !find {
+ rlog.Infof("[BUG] ConsumerGroup=%s, ConsumerId=%s not in cidAll:%+v", consumerGroup, currentCID, cidAll)
+ return nil
+ }
+
+ mqSize := len(mqAll)
+ cidSize := len(cidAll)
+ mod := mqSize % cidSize
+
+ var averageSize int
+ if mqSize <= cidSize {
+ averageSize = 1
+ } else {
+ if mod > 0 && index < mod {
+ averageSize = mqSize/cidSize + 1
+ } else {
+ averageSize = mqSize / cidSize
+ }
+ }
+
+ var startIndex int
+ if mod > 0 && index < mod {
+ startIndex = index * averageSize
+ } else {
+ startIndex = index*averageSize + mod
+ }
+
+ num := utils.MinInt(averageSize, mqSize-startIndex)
+ result := make([]*MessageQueue, num)
+ for i := 0; i < num; i++ {
+ result[i] = mqAll[(startIndex+i)%mqSize]
+ }
+ return result
+}
+
+// TODO
+func AllocateByMachineNearby(consumerGroup, currentCID string, mqAll []*MessageQueue,
+ cidAll []string) []*MessageQueue {
+ return AllocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*MessageQueue,
+ cidAll []string) []*MessageQueue {
+ return AllocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func AllocateByConfig(consumerGroup, currentCID string, mqAll []*MessageQueue,
+ cidAll []string) []*MessageQueue {
+ return AllocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func AllocateByMachineRoom(consumerGroup, currentCID string, mqAll []*MessageQueue,
+ cidAll []string) []*MessageQueue {
+ return AllocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func AllocateByConsistentHash(consumerGroup, currentCID string, mqAll []*MessageQueue,
+ cidAll []string) []*MessageQueue {
+ return AllocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}