You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/06/24 09:27:20 UTC
[pulsar-client-go] branch master updated: Fix data race while
accessing connection in partitionConsumer (#535)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 96ce2de Fix data race while accessing connection in partitionConsumer (#535)
96ce2de is described below
commit 96ce2de573ea8ad14173f41d6e48b6b48a8b0aa0
Author: dferstay <df...@users.noreply.github.com>
AuthorDate: Thu Jun 24 02:27:12 2021 -0700
Fix data race while accessing connection in partitionConsumer (#535)
The partitionConsumer maintains a few internal go-routines, two of which
access the underlying internal.Connection. The main runEventsLoop()
go-routine reads the connection field while a separate go-routine is used
to detect connection loss, initiate reconnection, and set the connection.
Previously, access to the conn field was not synchronized.
Now, the conn field is read and written atomically; resolving the data race.
Signed-off-by: Daniel Ferstay <df...@splunk.com>
Co-authored-by: Daniel Ferstay <df...@splunk.com>
---
pulsar/consumer_partition.go | 43 +++++++++++++++++++++++++++++--------------
1 file changed, 29 insertions(+), 14 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 94a914d..daaf759 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -107,7 +107,7 @@ type partitionConsumer struct {
state atomic.Int32
options *partitionConsumerOpts
- conn internal.Connection
+ conn atomic.Value
topic string
name string
@@ -238,7 +238,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
RequestId: proto.Uint64(requestID),
ConsumerId: proto.Uint64(pc.consumerID),
}
- _, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_UNSUBSCRIBE, cmdUnsubscribe)
+ _, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), requestID, pb.BaseCommand_UNSUBSCRIBE, cmdUnsubscribe)
if err != nil {
pc.log.WithError(err).Error("Failed to unsubscribe consumer")
unsub.err = err
@@ -248,7 +248,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
return
}
- pc.conn.DeleteConsumeHandler(pc.consumerID)
+ pc._getConn().DeleteConsumeHandler(pc.consumerID)
if pc.nackTracker != nil {
pc.nackTracker.Close()
}
@@ -276,7 +276,7 @@ func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error
RequestId: proto.Uint64(requestID),
ConsumerId: proto.Uint64(pc.consumerID),
}
- res, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID,
+ res, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), requestID,
pb.BaseCommand_GET_LAST_MESSAGE_ID, cmdGetLastMessageID)
if err != nil {
pc.log.WithError(err).Error("Failed to get last message id")
@@ -326,7 +326,7 @@ func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
}
}
- pc.client.rpcClient.RequestOnCnxNoWait(pc.conn,
+ pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(),
pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{
ConsumerId: proto.Uint64(pc.consumerID),
MessageIds: msgIDDataList,
@@ -399,7 +399,7 @@ func (pc *partitionConsumer) requestSeekWithoutClear(msgID messageID) error {
MessageId: id,
}
- _, err = pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_SEEK, cmdSeek)
+ _, err = pc.client.rpcClient.RequestOnCnx(pc._getConn(), requestID, pb.BaseCommand_SEEK, cmdSeek)
if err != nil {
pc.log.WithError(err).Error("Failed to reset to message id")
return err
@@ -435,7 +435,7 @@ func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
MessagePublishTime: proto.Uint64(uint64(seek.publishTime.UnixNano() / int64(time.Millisecond))),
}
- _, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_SEEK, cmdSeek)
+ _, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), requestID, pb.BaseCommand_SEEK, cmdSeek)
if err != nil {
pc.log.WithError(err).Error("Failed to reset to message publish time")
seek.err = err
@@ -465,7 +465,7 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) {
AckType: pb.CommandAck_Individual.Enum(),
}
- pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, pb.BaseCommand_ACK, cmdAck)
+ pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck)
}
func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
@@ -607,7 +607,7 @@ func (pc *partitionConsumer) internalFlow(permits uint32) error {
ConsumerId: proto.Uint64(pc.consumerID),
MessagePermits: proto.Uint32(permits),
}
- pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, pb.BaseCommand_FLOW, cmdFlow)
+ pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_FLOW, cmdFlow)
return nil
}
@@ -843,7 +843,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
ConsumerId: proto.Uint64(pc.consumerID),
RequestId: proto.Uint64(requestID),
}
- _, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
+ _, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), requestID, pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
if err != nil {
pc.log.WithError(err).Warn("Failed to close consumer")
} else {
@@ -855,7 +855,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
}
pc.setConsumerState(consumerClosed)
- pc.conn.DeleteConsumeHandler(pc.consumerID)
+ pc._getConn().DeleteConsumeHandler(pc.consumerID)
if pc.nackTracker != nil {
pc.nackTracker.Close()
}
@@ -971,9 +971,9 @@ func (pc *partitionConsumer) grabConn() error {
pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
}
- pc.conn = res.Cnx
+ pc._setConn(res.Cnx)
pc.log.Info("Connected consumer")
- pc.conn.AddConsumeHandler(pc.consumerID, pc)
+ pc._getConn().AddConsumeHandler(pc.consumerID, pc)
msgType := res.Response.GetType()
@@ -1104,7 +1104,7 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
"validationError": validationError,
}).Error("Discarding corrupted message")
- pc.client.rpcClient.RequestOnCnxNoWait(pc.conn,
+ pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(),
pb.BaseCommand_ACK, &pb.CommandAck{
ConsumerId: proto.Uint64(pc.consumerID),
MessageId: []*pb.MessageIdData{msgID},
@@ -1113,6 +1113,21 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
})
}
+// _setConn sets the internal connection field of this partition consumer atomically.
+// Note: should only be called by this partition consumer when a new connection is available.
+func (pc *partitionConsumer) _setConn(conn internal.Connection) {
+ pc.conn.Store(conn)
+}
+
+// _getConn returns internal connection field of this partition consumer atomically.
+// Note: should only be called by this partition consumer before attempting to use the connection
+func (pc *partitionConsumer) _getConn() internal.Connection {
+ // Invariant: The conn must be non-nil for the lifetime of the partitionConsumer.
+ // For this reason we leave this cast unchecked and panic() if the
+ // invariant is broken
+ return pc.conn.Load().(internal.Connection)
+}
+
func convertToMessageIDData(msgID trackingMessageID) *pb.MessageIdData {
if msgID.Undefined() {
return nil