You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/09/18 03:37:25 UTC

[incubator-inlong] branch INLONG-25 updated: [INLONG-1566]Fix the bug which will cause the user defined partition offset can not take effect. (#1567)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new 67ceecd  [INLONG-1566]Fix the bug which will cause the user defined partition offset can not take effect. (#1567)
67ceecd is described below

commit 67ceecd0e5e55a09ff5c3481e0073a7e9b215f63
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Sat Sep 18 11:35:55 2021 +0800

    [INLONG-1566]Fix the bug which will cause the user defined partition offset can not take effect. (#1567)
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/client/consumer_impl.go       |  6 ++--
 .../tubemq-client-go/client/heartbeat.go           |  4 ++-
 .../tubemq-client-go/remote/remote.go              |  3 +-
 tubemq-client-twins/tubemq-client-go/sub/info.go   | 36 +++++++++++++---------
 4 files changed, 29 insertions(+), 20 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 8183ce4..d263fea 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -182,8 +182,8 @@ func (c *consumer) sendRegRequest2Master() (*protocol.RegisterResponseM2C, error
 }
 
 func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
-	if rsp.GetNotAllocated() {
-		c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+	if !rsp.GetNotAllocated() {
+		c.subInfo.CASIsNotAllocated(1, 0)
 	}
 	if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 {
 		if rsp.GetDefFlowCheckId() != 0 {
@@ -471,7 +471,7 @@ func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
 			}
 		}
 	}
-	c.subInfo.FirstRegistered()
+	c.subInfo.SetNotFirstRegistered()
 	event.SetEventStatus(metadata.Done)
 	log.Tracef("[connect2Broker] connect event finished, client ID=%s", c.clientID)
 }
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index 7e9b814..a4918f5 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -151,7 +151,9 @@ func (h *heartbeatManager) sendHeartbeatC2M(m *metadata.Metadata) (*protocol.Hea
 
 func (h *heartbeatManager) processHBResponseM2C(rsp *protocol.HeartResponseM2C) {
 	h.consumer.masterHBRetry = 0
-	h.consumer.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+	if !rsp.GetNotAllocated() {
+		h.consumer.subInfo.CASIsNotAllocated(1, 0)
+	}
 	if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 {
 		if rsp.GetDefFlowCheckId() != 0 {
 			h.consumer.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index b7c65d5..44f70e4 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -359,8 +359,9 @@ func (r *RmtDataCache) IsFirstRegister(partitionKey string) bool {
 
 	if _, ok := r.partitionRegBooked[partitionKey]; !ok {
 		r.partitionRegBooked[partitionKey] = true
+		return true
 	}
-	return r.partitionRegBooked[partitionKey]
+	return false
 }
 
 // GetCurConsumeStatus returns the current consumption status.
diff --git a/tubemq-client-twins/tubemq-client-go/sub/info.go b/tubemq-client-twins/tubemq-client-go/sub/info.go
index 0d4f3c7..eb37d6a 100644
--- a/tubemq-client-twins/tubemq-client-go/sub/info.go
+++ b/tubemq-client-twins/tubemq-client-go/sub/info.go
@@ -20,6 +20,7 @@ package sub
 
 import (
 	"strconv"
+	"sync/atomic"
 	"time"
 
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
@@ -36,14 +37,14 @@ type SubInfo struct {
 	selectBig             bool
 	sourceCount           int32
 	sessionKey            string
-	notAllocated          bool
-	firstRegistered       bool
+	notAllocated          int32
+	firstRegistered       int32
 	subscribedTime        int64
 	boundPartitions       string
 	topics                []string
 	topicConds            []string
 	topicFilter           map[string]bool
-	assignedPartitions    map[string]uint64
+	assignedPartitions    map[string]int64
 	topicFilters          map[string][]string
 	authInfo              *protocol.AuthorizedInfo
 	masterCertificateInfo *protocol.MasterCertificateInfo
@@ -54,7 +55,8 @@ func NewSubInfo(config *config.Config) *SubInfo {
 	s := &SubInfo{
 		boundConsume:    config.Consumer.BoundConsume,
 		subscribedTime:  time.Now().UnixNano() / int64(time.Millisecond),
-		firstRegistered: false,
+		firstRegistered: 1,
+		notAllocated:    1,
 		topics:          config.Consumer.Topics,
 		topicFilters:    config.Consumer.TopicFilters,
 	}
@@ -74,9 +76,9 @@ func NewSubInfo(config *config.Config) *SubInfo {
 		s.sessionKey = config.Consumer.SessionKey
 		s.sourceCount = int32(config.Consumer.SourceCount)
 		s.selectBig = config.Consumer.SelectBig
-		assignedPartitions := config.Consumer.PartitionOffset
+		s.assignedPartitions = config.Consumer.PartitionOffset
 		count := 0
-		for partition, offset := range assignedPartitions {
+		for partition, offset := range s.assignedPartitions {
 			if count > 0 {
 				s.boundPartitions += ","
 			}
@@ -109,9 +111,9 @@ func (s *SubInfo) GetTopicFilters() map[string][]string {
 
 // GetAssignedPartOffset returns the assignedPartOffset of the given partitionKey.
 func (s *SubInfo) GetAssignedPartOffset(partitionKey string) int64 {
-	if !s.firstRegistered && s.boundConsume && s.notAllocated {
+	if s.isFirstRegistered() && s.boundConsume && s.IsNotAllocated() {
 		if offset, ok := s.assignedPartitions[partitionKey]; ok {
-			return int64(offset)
+			return offset
 		}
 	}
 	return InValidOffset
@@ -159,7 +161,7 @@ func (s *SubInfo) GetBoundPartInfo() string {
 
 // IsNotAllocated returns whether it is not allocated.
 func (s *SubInfo) IsNotAllocated() bool {
-	return s.notAllocated
+	return atomic.LoadInt32(&s.notAllocated) == 1
 }
 
 // GetAuthorizedInfo returns the authInfo.
@@ -172,9 +174,9 @@ func (s *SubInfo) GetMasterCertificateInfo() *protocol.MasterCertificateInfo {
 	return s.masterCertificateInfo
 }
 
-// FirstRegistered sets the firstRegistered to true.
-func (s *SubInfo) FirstRegistered() {
-	s.firstRegistered = true
+// SetNotFirstRegistered sets the firstRegistered to false.
+func (s *SubInfo) SetNotFirstRegistered() {
+	atomic.StoreInt32(&s.firstRegistered, 0)
 }
 
 // SetAuthorizedInfo sets the authorizedInfo.
@@ -187,12 +189,16 @@ func (s *SubInfo) SetMasterCertificateInfo(info *protocol.MasterCertificateInfo)
 	s.masterCertificateInfo = info
 }
 
-// SetIsNotAllocated sets the notAllocated.
-func (s *SubInfo) SetIsNotAllocated(isNotAllocated bool) {
-	s.notAllocated = isNotAllocated
+// CASIsNotAllocated sets the notAllocated.
+func (s *SubInfo) CASIsNotAllocated(expected int32, update int32) {
+	atomic.CompareAndSwapInt32(&s.notAllocated, expected, update)
 }
 
 // SetClientID sets the clientID.
 func (s *SubInfo) SetClientID(clientID string) {
 	s.clientID = clientID
 }
+
+func (s *SubInfo) isFirstRegistered() bool {
+	return atomic.LoadInt32(&s.firstRegistered) == 1
+}