You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/09/18 03:37:25 UTC
[incubator-inlong] branch INLONG-25 updated: [INLONG-1566]Fix the
bug which will cause the user defined partition offset can not take effect.
(#1567)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new 67ceecd [INLONG-1566]Fix the bug which will cause the user defined partition offset can not take effect. (#1567)
67ceecd is described below
commit 67ceecd0e5e55a09ff5c3481e0073a7e9b215f63
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Sat Sep 18 11:35:55 2021 +0800
[INLONG-1566]Fix the bug which will cause the user defined partition offset can not take effect. (#1567)
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/client/consumer_impl.go | 6 ++--
.../tubemq-client-go/client/heartbeat.go | 4 ++-
.../tubemq-client-go/remote/remote.go | 3 +-
tubemq-client-twins/tubemq-client-go/sub/info.go | 36 +++++++++++++---------
4 files changed, 29 insertions(+), 20 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 8183ce4..d263fea 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -182,8 +182,8 @@ func (c *consumer) sendRegRequest2Master() (*protocol.RegisterResponseM2C, error
}
func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
- if rsp.GetNotAllocated() {
- c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+ if !rsp.GetNotAllocated() {
+ c.subInfo.CASIsNotAllocated(1, 0)
}
if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 {
if rsp.GetDefFlowCheckId() != 0 {
@@ -471,7 +471,7 @@ func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
}
}
}
- c.subInfo.FirstRegistered()
+ c.subInfo.SetNotFirstRegistered()
event.SetEventStatus(metadata.Done)
log.Tracef("[connect2Broker] connect event finished, client ID=%s", c.clientID)
}
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index 7e9b814..a4918f5 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -151,7 +151,9 @@ func (h *heartbeatManager) sendHeartbeatC2M(m *metadata.Metadata) (*protocol.Hea
func (h *heartbeatManager) processHBResponseM2C(rsp *protocol.HeartResponseM2C) {
h.consumer.masterHBRetry = 0
- h.consumer.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+ if !rsp.GetNotAllocated() {
+ h.consumer.subInfo.CASIsNotAllocated(1, 0)
+ }
if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 {
if rsp.GetDefFlowCheckId() != 0 {
h.consumer.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index b7c65d5..44f70e4 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -359,8 +359,9 @@ func (r *RmtDataCache) IsFirstRegister(partitionKey string) bool {
if _, ok := r.partitionRegBooked[partitionKey]; !ok {
r.partitionRegBooked[partitionKey] = true
+ return true
}
- return r.partitionRegBooked[partitionKey]
+ return false
}
// GetCurConsumeStatus returns the current consumption status.
diff --git a/tubemq-client-twins/tubemq-client-go/sub/info.go b/tubemq-client-twins/tubemq-client-go/sub/info.go
index 0d4f3c7..eb37d6a 100644
--- a/tubemq-client-twins/tubemq-client-go/sub/info.go
+++ b/tubemq-client-twins/tubemq-client-go/sub/info.go
@@ -20,6 +20,7 @@ package sub
import (
"strconv"
+ "sync/atomic"
"time"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
@@ -36,14 +37,14 @@ type SubInfo struct {
selectBig bool
sourceCount int32
sessionKey string
- notAllocated bool
- firstRegistered bool
+ notAllocated int32
+ firstRegistered int32
subscribedTime int64
boundPartitions string
topics []string
topicConds []string
topicFilter map[string]bool
- assignedPartitions map[string]uint64
+ assignedPartitions map[string]int64
topicFilters map[string][]string
authInfo *protocol.AuthorizedInfo
masterCertificateInfo *protocol.MasterCertificateInfo
@@ -54,7 +55,8 @@ func NewSubInfo(config *config.Config) *SubInfo {
s := &SubInfo{
boundConsume: config.Consumer.BoundConsume,
subscribedTime: time.Now().UnixNano() / int64(time.Millisecond),
- firstRegistered: false,
+ firstRegistered: 1,
+ notAllocated: 1,
topics: config.Consumer.Topics,
topicFilters: config.Consumer.TopicFilters,
}
@@ -74,9 +76,9 @@ func NewSubInfo(config *config.Config) *SubInfo {
s.sessionKey = config.Consumer.SessionKey
s.sourceCount = int32(config.Consumer.SourceCount)
s.selectBig = config.Consumer.SelectBig
- assignedPartitions := config.Consumer.PartitionOffset
+ s.assignedPartitions = config.Consumer.PartitionOffset
count := 0
- for partition, offset := range assignedPartitions {
+ for partition, offset := range s.assignedPartitions {
if count > 0 {
s.boundPartitions += ","
}
@@ -109,9 +111,9 @@ func (s *SubInfo) GetTopicFilters() map[string][]string {
// GetAssignedPartOffset returns the assignedPartOffset of the given partitionKey.
func (s *SubInfo) GetAssignedPartOffset(partitionKey string) int64 {
- if !s.firstRegistered && s.boundConsume && s.notAllocated {
+ if s.isFirstRegistered() && s.boundConsume && s.IsNotAllocated() {
if offset, ok := s.assignedPartitions[partitionKey]; ok {
- return int64(offset)
+ return offset
}
}
return InValidOffset
@@ -159,7 +161,7 @@ func (s *SubInfo) GetBoundPartInfo() string {
// IsNotAllocated returns whether it is not allocated.
func (s *SubInfo) IsNotAllocated() bool {
- return s.notAllocated
+ return atomic.LoadInt32(&s.notAllocated) == 1
}
// GetAuthorizedInfo returns the authInfo.
@@ -172,9 +174,9 @@ func (s *SubInfo) GetMasterCertificateInfo() *protocol.MasterCertificateInfo {
return s.masterCertificateInfo
}
-// FirstRegistered sets the firstRegistered to true.
-func (s *SubInfo) FirstRegistered() {
- s.firstRegistered = true
+// SetNotFirstRegistered sets the firstRegistered to false.
+func (s *SubInfo) SetNotFirstRegistered() {
+ atomic.StoreInt32(&s.firstRegistered, 0)
}
// SetAuthorizedInfo sets the authorizedInfo.
@@ -187,12 +189,16 @@ func (s *SubInfo) SetMasterCertificateInfo(info *protocol.MasterCertificateInfo)
s.masterCertificateInfo = info
}
-// SetIsNotAllocated sets the notAllocated.
-func (s *SubInfo) SetIsNotAllocated(isNotAllocated bool) {
- s.notAllocated = isNotAllocated
+// CASIsNotAllocated sets the notAllocated.
+func (s *SubInfo) CASIsNotAllocated(expected int32, update int32) {
+ atomic.CompareAndSwapInt32(&s.notAllocated, expected, update)
}
// SetClientID sets the clientID.
func (s *SubInfo) SetClientID(clientID string) {
s.clientID = clientID
}
+
+func (s *SubInfo) isFirstRegistered() bool {
+ return atomic.LoadInt32(&s.firstRegistered) == 1
+}