You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2022/05/06 14:15:48 UTC
[pulsar-client-go] branch master updated: Add consumer state check when request commands (#772)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 1e37f2f Add consumer state check when request commands (#772)
1e37f2f is described below
commit 1e37f2fc93f37f39f1c1cd0fa2cba79375902f7c
Author: xiaolong ran <xi...@tencent.com>
AuthorDate: Fri May 6 22:15:44 2022 +0800
Add consumer state check when request commands (#772)
* Add consumer state check when request commands
Signed-off-by: xiaolongran <xi...@tencent.com>
* fix a little
Signed-off-by: xiaolongran <xi...@tencent.com>
---
pulsar/consumer_partition.go | 68 ++++++++++++++++++++++++++++++++++++++++----
1 file changed, 62 insertions(+), 6 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b06474d..db2994f 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -18,6 +18,7 @@
package pulsar
import (
+ "errors"
"fmt"
"math"
"strings"
@@ -278,6 +279,10 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
}
func (pc *partitionConsumer) getLastMessageID() (trackingMessageID, error) {
+ if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+ pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer")
+ return trackingMessageID{}, errors.New("failed to redeliver closing or closed consumer")
+ }
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req
@@ -292,6 +297,11 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest)
}
func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error) {
+ if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+ pc.log.WithField("state", state).Error("Failed to getLastMessageID closing or closed consumer")
+ return trackingMessageID{}, errors.New("failed to getLastMessageID closing or closed consumer")
+ }
+
requestID := pc.client.rpcClient.NewRequestID()
cmdGetLastMessageID := &pb.CommandGetLastMessageId{
RequestId: proto.Uint64(requestID),
@@ -308,6 +318,10 @@ func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error
}
func (pc *partitionConsumer) AckID(msgID trackingMessageID) {
+ if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+ pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
+ return
+ }
if !msgID.Undefined() && msgID.ack() {
pc.metrics.AcksCounter.Inc()
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9)
@@ -331,6 +345,10 @@ func (pc *partitionConsumer) NackMsg(msg Message) {
}
func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
+ if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+ pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer")
+ return
+ }
pc.eventsCh <- &redeliveryRequest{msgIds}
iMsgIds := make([]MessageID, len(msgIds))
@@ -341,6 +359,10 @@ func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
}
func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
+ if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+ pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer")
+ return
+ }
msgIds := req.msgIds
pc.log.Debug("Request redelivery after negative ack for messages", msgIds)
@@ -352,11 +374,14 @@ func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
}
}
- pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(),
+ err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(),
pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{
ConsumerId: proto.Uint64(pc.consumerID),
MessageIds: msgIDDataList,
})
+ if err != nil {
+ pc.log.Error("Connection was closed when request redeliver cmd")
+ }
}
func (pc *partitionConsumer) getConsumerState() consumerState {
@@ -381,6 +406,10 @@ func (pc *partitionConsumer) Close() {
}
func (pc *partitionConsumer) Seek(msgID trackingMessageID) error {
+ if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+ pc.log.WithField("state", state).Error("Failed to seek by closing or closed consumer")
+ return errors.New("failed to seek by closing or closed consumer")
+ }
req := &seekRequest{
doneCh: make(chan struct{}),
msgID: msgID,
@@ -407,7 +436,7 @@ func (pc *partitionConsumer) requestSeek(msgID messageID) error {
func (pc *partitionConsumer) requestSeekWithoutClear(msgID messageID) error {
state := pc.getConsumerState()
if state == consumerClosing || state == consumerClosed {
- pc.log.WithField("state", state).Error("Consumer is closing or has closed")
+ pc.log.WithField("state", state).Error("failed seek by consumer is closing or has closed")
return nil
}
@@ -434,6 +463,10 @@ func (pc *partitionConsumer) requestSeekWithoutClear(msgID messageID) error {
}
func (pc *partitionConsumer) SeekByTime(time time.Time) error {
+ if state := pc.getConsumerState(); state == consumerClosing || state == consumerClosed {
+ pc.log.WithField("state", pc.state).Error("Failed seekByTime by consumer is closing or has closed")
+ return errors.New("failed seekByTime by consumer is closing or has closed")
+ }
req := &seekByTimeRequest{
doneCh: make(chan struct{}),
publishTime: time,
@@ -450,7 +483,7 @@ func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
state := pc.getConsumerState()
if state == consumerClosing || state == consumerClosed {
- pc.log.WithField("state", pc.state).Error("Consumer is closing or has closed")
+ pc.log.WithField("state", pc.state).Error("Failed seekByTime by consumer is closing or has closed")
return
}
@@ -477,6 +510,10 @@ func (pc *partitionConsumer) clearMessageChannels() {
}
func (pc *partitionConsumer) internalAck(req *ackRequest) {
+ if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+ pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
+ return
+ }
msgID := req.msgID
messageIDs := make([]*pb.MessageIdData, 1)
@@ -491,7 +528,10 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) {
AckType: pb.CommandAck_Individual.Enum(),
}
- pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck)
+ err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck)
+ if err != nil {
+ pc.log.Error("Connection was closed when request ack cmd")
+ }
}
func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
@@ -728,6 +768,10 @@ func (pc *partitionConsumer) ConnectionClosed() {
// before the application is ready to consume them. After the consumer is ready,
// the client needs to give permission to the broker to push messages.
func (pc *partitionConsumer) internalFlow(permits uint32) error {
+ if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+ pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer")
+ return errors.New("consumer closing or closed")
+ }
if permits == 0 {
return fmt.Errorf("invalid number of permits requested: %d", permits)
}
@@ -736,7 +780,11 @@ func (pc *partitionConsumer) internalFlow(permits uint32) error {
ConsumerId: proto.Uint64(pc.consumerID),
MessagePermits: proto.Uint32(permits),
}
- pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_FLOW, cmdFlow)
+ err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_FLOW, cmdFlow)
+ if err != nil {
+ pc.log.Error("Connection was closed when request flow cmd")
+ return err
+ }
return nil
}
@@ -1259,18 +1307,26 @@ func (pc *partitionConsumer) initializeCompressionProvider(
func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
validationError pb.CommandAck_ValidationError) {
+ if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+ pc.log.WithField("state", state).Error("Failed to discardCorruptedMessage " +
+ "by closing or closed consumer")
+ return
+ }
pc.log.WithFields(log.Fields{
"msgID": msgID,
"validationError": validationError,
}).Error("Discarding corrupted message")
- pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(),
+ err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(),
pb.BaseCommand_ACK, &pb.CommandAck{
ConsumerId: proto.Uint64(pc.consumerID),
MessageId: []*pb.MessageIdData{msgID},
AckType: pb.CommandAck_Individual.Enum(),
ValidationError: validationError.Enum(),
})
+ if err != nil {
+ pc.log.Error("Connection was closed when request ack cmd")
+ }
}
// _setConn sets the internal connection field of this partition consumer atomically.