You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/08/20 08:38:25 UTC
[rocketmq-client-go] branch native updated: [ISSUE #147] Support Namespace and domain of namesrv (#161)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 6303ecd [ISSUE #147] Support Namespace and domain of namesrv (#161)
6303ecd is described below
commit 6303ecd0878adb88b6ffb78587584aa6dd51b480
Author: wenfeng <sx...@gmail.com>
AuthorDate: Tue Aug 20 16:38:21 2019 +0800
[ISSUE #147] Support Namespace and domain of namesrv (#161)
* support domain of namesrv
* add namespace support
* add getConsumerInfo func
---
.gitignore | 3 +-
consumer/consumer.go | 11 +++----
consumer/option.go | 7 ++++
consumer/pull_consumer.go | 4 +--
consumer/push_consumer.go | 14 ++++++--
examples/consumer/acl/main.go | 9 +++--
examples/consumer/{acl => namespace}/main.go | 10 ++++--
examples/producer/acl/main.go | 10 ++++--
examples/producer/{acl => namespace}/main.go | 11 +++++--
internal/client.go | 14 +++++---
internal/model.go | 2 +-
internal/namesrv.go | 17 +++++++++-
internal/remote/codec.go | 49 +++++++++++++++++++++++-----
internal/remote/remote_client.go | 2 ++
internal/response.go | 1 +
internal/route.go | 34 +++++++++++--------
producer/option.go | 7 ++++
producer/producer.go | 21 ++++++++++--
18 files changed, 175 insertions(+), 51 deletions(-)
diff --git a/.gitignore b/.gitignore
index cb35ce2..a0c292d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,4 +2,5 @@
go.mod
go.sum
vendor/
-coverage.txt
\ No newline at end of file
+coverage.txt
+examples/test
\ No newline at end of file
diff --git a/consumer/consumer.go b/consumer/consumer.go
index b5bf22f..2c88e5b 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -227,7 +227,6 @@ func (pr *PullRequest) String() string {
pr.consumerGroup, pr.mq.Topic, pr.mq.QueueId)
}
-// TODO hook
type defaultConsumer struct {
/**
* Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
@@ -312,8 +311,8 @@ func (dc *defaultConsumer) persistConsumerOffset() error {
return nil
}
-func (c *defaultConsumer) updateOffset(queue *primitive.MessageQueue, offset int64) error {
- c.storage.update(queue, offset, false)
+func (dc *defaultConsumer) updateOffset(queue *primitive.MessageQueue, offset int64) error {
+ dc.storage.update(queue, offset, false)
return nil
}
@@ -615,14 +614,14 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitiv
//delete(mqSet, mq)
dc.processQueueTable.Delete(key)
changed = true
- rlog.Infof("do defaultConsumer, Group:%s, remove unnecessary mq: %s", dc.consumerGroup, mq.String())
+ rlog.Debugf("do defaultConsumer, Group:%s, remove unnecessary mq: %s", dc.consumerGroup, mq.String())
}
} else if pq.isPullExpired() && dc.cType == _PushConsume {
pq.dropped = true
if dc.removeUnnecessaryMessageQueue(&mq, pq) {
delete(mqSet, mq)
changed = true
- rlog.Infof("do defaultConsumer, Group:%s, remove unnecessary mq: %s, "+
+ rlog.Debugf("do defaultConsumer, Group:%s, remove unnecessary mq: %s, "+
"because pull was paused, so try to fixed it", dc.consumerGroup, mq)
}
}
@@ -650,7 +649,7 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitiv
if exist {
rlog.Debugf("do defaultConsumer, Group: %s, mq already exist, %s", dc.consumerGroup, mq.String())
} else {
- rlog.Infof("do defaultConsumer, Group: %s, add a new mq, %s", dc.consumerGroup, mq.String())
+ rlog.Debugf("do defaultConsumer, Group: %s, add a new mq, %s", dc.consumerGroup, mq.String())
pq := newProcessQueue(dc.consumeOrderly)
dc.processQueueTable.Store(mq, pq)
pr := PullRequest{
diff --git a/consumer/option.go b/consumer/option.go
index 558fd90..f92c118 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -175,6 +175,13 @@ func WithNameServer(nameServers []string) Option {
}
}
+// WithNamespace set the namespace of consumer
+func WithNamespace(namespace string) Option {
+ return func(opts *consumerOptions) {
+ opts.Namespace = namespace
+ }
+}
+
func WithVIPChannel(enable bool) Option {
return func(opts *consumerOptions) {
opts.VIPChannelEnabled = enable
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 742cf16..dad27a8 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -160,8 +160,8 @@ func (c *defaultPullConsumer) ACK(msg *primitive.Message, result ConsumeResult)
}
-func (c *defaultConsumer) checkPull(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) error {
- err := c.makeSureStateOK()
+func (dc *defaultConsumer) checkPull(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) error {
+ err := dc.makeSureStateOK()
if err != nil {
return err
}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 3b9747c..6303ded 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -64,8 +64,13 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
if err != nil {
return nil, errors.Wrap(err, "new Namesrv failed.")
}
+ if !defaultOpts.Credentials.IsEmpty() {
+ srvs.SetCredentials(defaultOpts.Credentials)
+ }
internal.RegisterNamsrv(srvs)
-
+ if defaultOpts.Namespace != "" {
+ defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
+ }
dc := &defaultConsumer{
client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
consumerGroup: defaultOpts.GroupName,
@@ -96,7 +101,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
return p, nil
}
-// TODO: add shutdown on pushConsumr.
+// TODO: add shutdown on pushConsumer.
func (pc *pushConsumer) Start() error {
var err error
pc.once.Do(func() {
@@ -163,6 +168,9 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
if pc.state != internal.StateCreateJust {
return errors.New("subscribe topic only started before")
}
+ if pc.option.Namespace != "" {
+ topic = pc.option.Namespace + "%" + topic
+ }
data := buildSubscriptionData(topic, selector)
pc.subscriptionDataTable.Store(topic, data)
pc.subscribedTopic[topic] = ""
@@ -285,7 +293,7 @@ func (pc *pushConsumer) validate() {
}
func (pc *pushConsumer) pullMessage(request *PullRequest) {
- rlog.Infof("start a nwe Pull Message task %s for [%s]", request.String(), pc.consumerGroup)
+ rlog.Debugf("start a new Pull Message task %s for [%s]", request.String(), pc.consumerGroup)
var sleepTime time.Duration
pq := request.pq
go func() {
diff --git a/examples/consumer/acl/main.go b/examples/consumer/acl/main.go
index 4582cde..8f944e7 100644
--- a/examples/consumer/acl/main.go
+++ b/examples/consumer/acl/main.go
@@ -29,7 +29,7 @@ import (
)
func main() {
- c, _ := rocketmq.NewPushConsumer(
+ c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithCredentials(primitive.Credentials{
@@ -37,7 +37,12 @@ func main() {
SecretKey: "12345678",
}),
)
- err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
+ if err != nil {
+ fmt.Println("init consumer error: " + err.Error())
+ os.Exit(0)
+ }
+
+ err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
fmt.Printf("subscribe callback: %v \n", msgs)
return consumer.ConsumeSuccess, nil
diff --git a/examples/consumer/acl/main.go b/examples/consumer/namespace/main.go
similarity index 86%
copy from examples/consumer/acl/main.go
copy to examples/consumer/namespace/main.go
index 4582cde..a0e8708 100644
--- a/examples/consumer/acl/main.go
+++ b/examples/consumer/namespace/main.go
@@ -29,15 +29,21 @@ import (
)
func main() {
- c, _ := rocketmq.NewPushConsumer(
+ c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
SecretKey: "12345678",
}),
+ consumer.WithNamespace("namespace"),
)
- err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
+ if err != nil {
+ fmt.Println("init consumer error: " + err.Error())
+ os.Exit(0)
+ }
+
+ err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
fmt.Printf("subscribe callback: %v \n", msgs)
return consumer.ConsumeSuccess, nil
diff --git a/examples/producer/acl/main.go b/examples/producer/acl/main.go
index cee23db..a98500c 100644
--- a/examples/producer/acl/main.go
+++ b/examples/producer/acl/main.go
@@ -30,7 +30,7 @@ import (
)
func main() {
- p, _ := rocketmq.NewProducer(
+ p, err := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
@@ -38,7 +38,13 @@ func main() {
SecretKey: "12345678",
}),
)
- err := p.Start()
+
+ if err != nil {
+ fmt.Println("init producer error: " + err.Error())
+ os.Exit(0)
+ }
+
+ err = p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
diff --git a/examples/producer/acl/main.go b/examples/producer/namespace/main.go
similarity index 91%
copy from examples/producer/acl/main.go
copy to examples/producer/namespace/main.go
index cee23db..002868a 100644
--- a/examples/producer/acl/main.go
+++ b/examples/producer/namespace/main.go
@@ -30,15 +30,22 @@ import (
)
func main() {
- p, _ := rocketmq.NewProducer(
+ p, err := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
SecretKey: "12345678",
}),
+ producer.WithNamespace("namespace"),
)
- err := p.Start()
+
+ if err != nil {
+ fmt.Println("init producer error: " + err.Error())
+ os.Exit(0)
+ }
+
+ err = p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
diff --git a/internal/client.go b/internal/client.go
index c6b3246..11eca1e 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -102,10 +102,10 @@ type ClientOptions struct {
UnitMode bool
UnitName string
VIPChannelEnabled bool
- ACLEnabled bool
RetryTimes int
Interceptors []primitive.Interceptor
Credentials primitive.Credentials
+ Namespace string
}
func (opt *ClientOptions) ChangeInstanceNameToPID() {
@@ -116,8 +116,8 @@ func (opt *ClientOptions) ChangeInstanceNameToPID() {
func (opt *ClientOptions) String() string {
return fmt.Sprintf("ClientOption [ClientIP=%s, InstanceName=%s, "+
- "UnitMode=%v, UnitName=%s, VIPChannelEnabled=%v, ACLEnabled=%v]", opt.ClientIP,
- opt.InstanceName, opt.UnitMode, opt.UnitName, opt.VIPChannelEnabled, opt.ACLEnabled)
+ "UnitMode=%v, UnitName=%s, VIPChannelEnabled=%v]", opt.ClientIP,
+ opt.InstanceName, opt.UnitMode, opt.UnitName, opt.VIPChannelEnabled)
}
//go:generate mockgen -source client.go -destination mock_client.go -self_package github.com/apache/rocketmq-client-go/internal --package internal RMQClient
@@ -210,6 +210,12 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) *
return nil
})
+ client.remoteClient.RegisterRequestFunc(ReqGetConsumerRunningInfo, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
+ rlog.Info("receive get consumer running info request...")
+ res := remote.NewRemotingCommand(ResError, nil, nil)
+ res.Remark = "the go client has not supported consumer running info"
+ return res
+ })
}
return actual.(*rmqClient)
}
@@ -363,7 +369,7 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
brokerVersionMap.Store(brokerName, m)
}
m[brokerName] = int32(response.Version)
- rlog.Infof("send heart beat to broker[%s %d %s] success", brokerName, id, addr)
+ rlog.Debugf("send heart beat to broker[%s %d %s] success", brokerName, id, addr)
}
}
return true
diff --git a/internal/model.go b/internal/model.go
index f534234..86b286e 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -95,6 +95,6 @@ func (data *heartbeatData) encode() []byte {
rlog.Errorf("marshal heartbeatData error: %s", err.Error())
return nil
}
- rlog.Info("heartbeat: " + string(d))
+ rlog.Debugf("heartbeat: " + string(d))
return d
}
diff --git a/internal/namesrv.go b/internal/namesrv.go
index 66b3e22..d8e1a40 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -22,6 +22,8 @@ import (
"regexp"
"strings"
"sync"
+
+ "github.com/apache/rocketmq-client-go/primitive"
)
var (
@@ -42,6 +44,9 @@ type Namesrvs struct {
// index indicate the next position for getNamesrv
index int
+
+ // credentials for query topic route
+ credentials primitive.Credentials
}
// NewNamesrv init Namesrv from namesrv addr string.
@@ -80,7 +85,7 @@ func (s *Namesrvs) GetNamesrv() string {
}
index %= len(s.srvs)
s.index = index
- return addr
+ return strings.TrimLeft(addr, "http(s)://")
}
func (s *Namesrvs) Size() int {
@@ -90,8 +95,18 @@ func (s *Namesrvs) Size() int {
func (s *Namesrvs) String() string {
return strings.Join(s.srvs, ";")
}
+func (s *Namesrvs) SetCredentials(credentials primitive.Credentials) {
+ s.credentials = credentials
+}
+
+var (
+ httpPrefixRegex, _ = regexp.Compile("^(http|https)://")
+)
func verifyIP(ip string) error {
+ if httpPrefixRegex.MatchString(ip) {
+ return nil
+ }
if strings.Contains(ip, ";") {
return ErrMultiIP
}
diff --git a/internal/remote/codec.go b/internal/remote/codec.go
index 3093514..4c354cb 100644
--- a/internal/remote/codec.go
+++ b/internal/remote/codec.go
@@ -29,21 +29,52 @@ var opaque int32
const (
// 0, REQUEST_COMMAND
RPCType = 0
-
// 1, RPC
RPCOneWay = 1
-
//ResponseType for response
ResponseType = 1
+ _Flag = 0
+ _Version = 317
+)
+
+type LanguageCode byte
- _Flag = 0
- _LanguageCode = byte(9)
- _Version = 137
+const (
+ _Java = LanguageCode(0)
+ _Go = LanguageCode(9)
+ _Unknown = LanguageCode(127)
)
+func (lc LanguageCode) MarshalJSON() ([]byte, error) {
+ return []byte(`"GO"`), nil
+}
+
+func (lc *LanguageCode) UnmarshalJSON(b []byte) error {
+ switch string(b) {
+ case "JAVA":
+ *lc = _Java
+ case "GO":
+ *lc = _Go
+ default:
+ *lc = _Unknown
+ }
+ return nil
+}
+
+func (lc LanguageCode) String() string {
+ switch lc {
+ case _Java:
+ return "JAVA"
+ case _Go:
+ return "GO"
+ default:
+ return "unknown"
+ }
+}
+
type RemotingCommand struct {
Code int16 `json:"code"`
- Language byte `json:"-"`
+ Language LanguageCode `json:"language,string"`
Version int16 `json:"version"`
Opaque int32 `json:"opaque"`
Flag int32 `json:"flag"`
@@ -62,7 +93,7 @@ func NewRemotingCommand(code int16, header CustomHeader, body []byte) *RemotingC
Version: _Version,
Opaque: atomic.AddInt32(&opaque, 1),
Body: body,
- Language: _LanguageCode,
+ Language: _Go,
ExtFields: make(map[string]string),
}
@@ -264,7 +295,7 @@ func (c *rmqCodec) encodeHeader(command *RemotingCommand) ([]byte, error) {
}
// language flag, length is 1 byte
- err = binary.Write(buf, binary.BigEndian, _LanguageCode)
+ err = binary.Write(buf, binary.BigEndian, _Go)
if err != nil {
return nil, err
}
@@ -364,7 +395,7 @@ func (c *rmqCodec) decodeHeader(data []byte) (*RemotingCommand, error) {
if err != nil {
return nil, err
}
- command.Language = languageCode
+ command.Language = LanguageCode(languageCode)
var version int16
err = binary.Read(buf, binary.BigEndian, &version)
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index e8f10c2..f9b77d5 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -150,6 +150,8 @@ func (c *RemotingClient) receiveResponse(r net.Conn) {
go func() { // 单个goroutine会造成死锁
res := f(cmd, r.RemoteAddr())
if res != nil {
+ res.Opaque = cmd.Opaque
+ res.Flag |= 1 << 0
err := c.sendRequest(r, res)
if err != nil {
rlog.Warnf("send response to broker error: %s, type is: %d", err, res.Code)
diff --git a/internal/response.go b/internal/response.go
index 0937db7..ae75b9c 100644
--- a/internal/response.go
+++ b/internal/response.go
@@ -19,6 +19,7 @@ package internal
const (
ResSuccess = int16(0)
+ ResError = int16(1)
ResFlushDiskTimeout = int16(10)
ResSlaveNotAvailable = int16(11)
ResFlushSlaveTimeout = int16(12)
diff --git a/internal/route.go b/internal/route.go
index 9c6caa9..4f70eb0 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -52,6 +52,9 @@ var (
)
func RegisterNamsrv(s *Namesrvs) {
+ if !s.credentials.IsEmpty() {
+ nameSrvClient.RegisterInterceptor(remote.ACLInterceptor(s.credentials))
+ }
nameSrvs = s
}
@@ -202,8 +205,8 @@ func FindBrokerAddrByName(brokerName string) string {
func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
var (
brokerAddr = ""
- slave = false
- found = false
+ //slave = false
+ //found = false
)
v, exist := brokerAddressesMap.Load(brokerName)
@@ -212,22 +215,27 @@ func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBro
return nil
}
data := v.(*BrokerData)
- for k, v := range data.BrokerAddresses {
- if v != "" {
- found = true
- if k != MasterId {
- slave = true
- }
- brokerAddr = v
- break
- }
+ if len(data.BrokerAddresses) == 0 {
+ return nil
}
+ brokerAddr = data.BrokerAddresses[brokerId]
+ //for k, v := range data.BrokerAddresses {
+ // if v != "" {
+ // found = true
+ // if k != MasterId {
+ // slave = true
+ // }
+ // brokerAddr = v
+ // break
+ // }
+ //}
+
var result *FindBrokerResult
- if found {
+ if brokerAddr != "" {
result = &FindBrokerResult{
BrokerAddr: brokerAddr,
- Slave: slave,
+ Slave: brokerId != 0,
BrokerVersion: findBrokerVersion(brokerName, brokerAddr),
}
}
diff --git a/producer/option.go b/producer/option.go
index 5ec003c..ac88120 100644
--- a/producer/option.go
+++ b/producer/option.go
@@ -61,6 +61,13 @@ func WithNameServer(nameServers []string) Option {
}
}
+// WithNamespace set the namespace of producer
+func WithNamespace(namespace string) Option {
+ return func(opts *producerOptions) {
+ opts.Namespace = namespace
+ }
+}
+
func WithSendMsgTimeout(duration time.Duration) Option {
return func(opts *producerOptions) {
opts.SendMsgTimeout = duration
diff --git a/producer/producer.go b/producer/producer.go
index 1d082e9..12ac962 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -59,6 +59,9 @@ func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
if err != nil {
return nil, errors.Wrap(err, "new Namesrv failed.")
}
+ if !defaultOpts.Credentials.IsEmpty() {
+ srvs.SetCredentials(defaultOpts.Credentials)
+ }
internal.RegisterNamsrv(srvs)
producer := &defaultProducer{
@@ -140,6 +143,10 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
err error
)
+ if p.options.Namespace != "" {
+ msg.Topic = p.options.Namespace + "%" + msg.Topic
+ }
+
var producerCtx *primitive.ProducerCtx
for retryCount := 0; retryCount < retryTime; retryCount++ {
mq := p.selectMessageQueue(msg)
@@ -185,6 +192,9 @@ func (p *defaultProducer) SendAsync(ctx context.Context, f func(context.Context,
}
func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context, *primitive.SendResult, error)) error {
+ if p.options.Namespace != "" {
+ msg.Topic = p.options.Namespace + "%" + msg.Topic
+ }
mq := p.selectMessageQueue(msg)
if mq == nil {
return errors.Errorf("the topic=%s route info not found", msg.Topic)
@@ -224,9 +234,11 @@ func (p *defaultProducer) SendOneWay(ctx context.Context, msg *primitive.Message
func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message) error {
retryTime := 1 + p.options.RetryTimes
- var (
- err error
- )
+ if p.options.Namespace != "" {
+ msg.Topic = p.options.Namespace + "%" + msg.Topic
+ }
+
+ var err error
for retryCount := 0; retryCount < retryTime; retryCount++ {
mq := p.selectMessageQueue(msg)
if mq == nil {
@@ -251,6 +263,9 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message
func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue,
msg *primitive.Message) *remote.RemotingCommand {
+ if msg.Properties == nil {
+ msg.Properties = make(map[string]string, 0)
+ }
if !msg.Batch && msg.Properties[primitive.PropertyUniqueClientMessageIdKeyIndex] == "" {
msg.Properties[primitive.PropertyUniqueClientMessageIdKeyIndex] = primitive.CreateUniqID()
}