Posted to commits@pulsar.apache.org by rx...@apache.org on 2022/05/06 14:15:48 UTC

[pulsar-client-go] branch master updated: Add consumer state check when request commands (#772)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e37f2f  Add consumer state check when request commands (#772)
1e37f2f is described below

commit 1e37f2fc93f37f39f1c1cd0fa2cba79375902f7c
Author: xiaolong ran <xi...@tencent.com>
AuthorDate: Fri May 6 22:15:44 2022 +0800

    Add consumer state check when request commands (#772)
    
    * Add consumer state check when request commands
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
    
    * Fix minor issues
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
---
 pulsar/consumer_partition.go | 68 ++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 62 insertions(+), 6 deletions(-)
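
The change is mechanical: every command path in partitionConsumer now starts
with the same state guard, and each fire-and-forget RPC checks the error
returned by RequestOnCnxNoWait. Below is a minimal, self-contained sketch of
the guard pattern, assuming simplified stand-ins for the real types: the
consumerState values mirror names in consumer_partition.go, while checkState
is a hypothetical helper written for illustration, not code from this commit.

    package main

    import (
        "errors"
        "fmt"
    )

    type consumerState int

    const (
        consumerReady consumerState = iota
        consumerClosing
        consumerClosed
    )

    // checkState rejects a command up front when the consumer is already
    // shutting down, instead of enqueueing a request that can never complete.
    func checkState(state consumerState, cmd string) error {
        if state == consumerClosing || state == consumerClosed {
            return errors.New("failed to " + cmd + " on a closing or closed consumer")
        }
        return nil
    }

    func main() {
        if err := checkState(consumerClosed, "ack"); err != nil {
            fmt.Println(err) // failed to ack on a closing or closed consumer
        }
    }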

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b06474d..db2994f 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -18,6 +18,7 @@
 package pulsar
 
 import (
+	"errors"
 	"fmt"
 	"math"
 	"strings"
@@ -278,6 +279,10 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
 }
 
 func (pc *partitionConsumer) getLastMessageID() (trackingMessageID, error) {
+	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+		pc.log.WithField("state", state).Error("Failed to getLastMessageID on a closing or closed consumer")
+		return trackingMessageID{}, errors.New("failed to getLastMessageID on a closing or closed consumer")
+	}
 	req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
 	pc.eventsCh <- req
 
@@ -292,6 +297,11 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest)
 }
 
 func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error) {
+	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+		pc.log.WithField("state", state).Error("Failed to getLastMessageID on a closing or closed consumer")
+		return trackingMessageID{}, errors.New("failed to getLastMessageID on a closing or closed consumer")
+	}
+
 	requestID := pc.client.rpcClient.NewRequestID()
 	cmdGetLastMessageID := &pb.CommandGetLastMessageId{
 		RequestId:  proto.Uint64(requestID),
@@ -308,6 +318,10 @@ func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error
 }
 
 func (pc *partitionConsumer) AckID(msgID trackingMessageID) {
+	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+		pc.log.WithField("state", state).Error("Failed to ack on a closing or closed consumer")
+		return
+	}
 	if !msgID.Undefined() && msgID.ack() {
 		pc.metrics.AcksCounter.Inc()
 		pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9)
@@ -331,6 +345,10 @@ func (pc *partitionConsumer) NackMsg(msg Message) {
 }
 
 func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
+	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+		pc.log.WithField("state", state).Error("Failed to redeliver on a closing or closed consumer")
+		return
+	}
 	pc.eventsCh <- &redeliveryRequest{msgIds}
 
 	iMsgIds := make([]MessageID, len(msgIds))
@@ -341,6 +359,10 @@ func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
 }
 
 func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
+	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+		pc.log.WithField("state", state).Error("Failed to redeliver on a closing or closed consumer")
+		return
+	}
 	msgIds := req.msgIds
 	pc.log.Debug("Request redelivery after negative ack for messages", msgIds)
 
@@ -352,11 +374,14 @@ func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
 		}
 	}
 
-	pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(),
+	err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(),
 		pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{
 			ConsumerId: proto.Uint64(pc.consumerID),
 			MessageIds: msgIDDataList,
 		})
+	if err != nil {
+		pc.log.WithError(err).Error("Connection was closed when requesting the redeliver command")
+	}
 }
 
 func (pc *partitionConsumer) getConsumerState() consumerState {
@@ -381,6 +406,10 @@ func (pc *partitionConsumer) Close() {
 }
 
 func (pc *partitionConsumer) Seek(msgID trackingMessageID) error {
+	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+		pc.log.WithField("state", state).Error("Failed to seek on a closing or closed consumer")
+		return errors.New("failed to seek on a closing or closed consumer")
+	}
 	req := &seekRequest{
 		doneCh: make(chan struct{}),
 		msgID:  msgID,
@@ -407,7 +436,7 @@ func (pc *partitionConsumer) requestSeek(msgID messageID) error {
 func (pc *partitionConsumer) requestSeekWithoutClear(msgID messageID) error {
 	state := pc.getConsumerState()
 	if state == consumerClosing || state == consumerClosed {
-		pc.log.WithField("state", state).Error("Consumer is closing or has closed")
+		pc.log.WithField("state", state).Error("failed seek by consumer is closing or has closed")
 		return nil
 	}
 
@@ -434,6 +463,10 @@ func (pc *partitionConsumer) requestSeekWithoutClear(msgID messageID) error {
 }
 
 func (pc *partitionConsumer) SeekByTime(time time.Time) error {
+	if state := pc.getConsumerState(); state == consumerClosing || state == consumerClosed {
+		pc.log.WithField("state", state).Error("Failed to seekByTime on a closing or closed consumer")
+		return errors.New("failed to seekByTime on a closing or closed consumer")
+	}
 	req := &seekByTimeRequest{
 		doneCh:      make(chan struct{}),
 		publishTime: time,
@@ -450,7 +483,7 @@ func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
 
 	state := pc.getConsumerState()
 	if state == consumerClosing || state == consumerClosed {
-		pc.log.WithField("state", pc.state).Error("Consumer is closing or has closed")
+		pc.log.WithField("state", pc.state).Error("Failed seekByTime by consumer is closing or has closed")
 		return
 	}
 
@@ -477,6 +510,10 @@ func (pc *partitionConsumer) clearMessageChannels() {
 }
 
 func (pc *partitionConsumer) internalAck(req *ackRequest) {
+	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+		pc.log.WithField("state", state).Error("Failed to ack on a closing or closed consumer")
+		return
+	}
 	msgID := req.msgID
 
 	messageIDs := make([]*pb.MessageIdData, 1)
@@ -491,7 +528,10 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) {
 		AckType:    pb.CommandAck_Individual.Enum(),
 	}
 
-	pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck)
+	err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck)
+	if err != nil {
+		pc.log.WithError(err).Error("Connection was closed when requesting the ack command")
+	}
 }
 
 func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
@@ -728,6 +768,10 @@ func (pc *partitionConsumer) ConnectionClosed() {
 // before the application is ready to consume them. After the consumer is ready,
 // the client needs to give permission to the broker to push messages.
 func (pc *partitionConsumer) internalFlow(permits uint32) error {
+	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+		pc.log.WithField("state", state).Error("Failed to send flow permits on a closing or closed consumer")
+		return errors.New("consumer closing or closed")
+	}
 	if permits == 0 {
 		return fmt.Errorf("invalid number of permits requested: %d", permits)
 	}
@@ -736,7 +780,11 @@ func (pc *partitionConsumer) internalFlow(permits uint32) error {
 		ConsumerId:     proto.Uint64(pc.consumerID),
 		MessagePermits: proto.Uint32(permits),
 	}
-	pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_FLOW, cmdFlow)
+	err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_FLOW, cmdFlow)
+	if err != nil {
+		pc.log.WithError(err).Error("Connection was closed when requesting the flow command")
+		return err
+	}
 
 	return nil
 }
@@ -1259,18 +1307,26 @@ func (pc *partitionConsumer) initializeCompressionProvider(
 
 func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
 	validationError pb.CommandAck_ValidationError) {
+	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+		pc.log.WithField("state", state).Error("Failed to discardCorruptedMessage " +
+			"on a closing or closed consumer")
+		return
+	}
 	pc.log.WithFields(log.Fields{
 		"msgID":           msgID,
 		"validationError": validationError,
 	}).Error("Discarding corrupted message")
 
-	pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(),
+	err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(),
 		pb.BaseCommand_ACK, &pb.CommandAck{
 			ConsumerId:      proto.Uint64(pc.consumerID),
 			MessageId:       []*pb.MessageIdData{msgID},
 			AckType:         pb.CommandAck_Individual.Enum(),
 			ValidationError: validationError.Enum(),
 		})
+	if err != nil {
+		pc.log.WithError(err).Error("Connection was closed when requesting the ack command")
+	}
 }
 
 // _setConn sets the internal connection field of this partition consumer atomically.
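
For callers, the visible effect of these guards is that consumer methods now
fail fast once Close has been called, instead of enqueueing work that can no
longer complete. A hedged usage sketch against the public pulsar package
follows; the service URL, topic, and subscription name are placeholders, and
whether the error surfaces through the top-level consumer wrapper depends on
how the call routes to the partition consumer, so treat this as a sketch
rather than a guaranteed behavior.

    package main

    import (
        "log"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
            Topic:            "my-topic",
            SubscriptionName: "my-sub",
        })
        if err != nil {
            log.Fatal(err)
        }

        consumer.Close()

        // With this commit, Seek on a closed consumer should return a
        // "closing or closed consumer" error rather than hanging on eventsCh.
        if err := consumer.Seek(pulsar.EarliestMessageID()); err != nil {
            log.Println("seek rejected as expected:", err)
        }
    }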