You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/09/27 01:16:35 UTC

[pulsar-client-go] branch master updated: Introduce doneCh for ack error (#777)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new e78dc3c  Introduce doneCh for ack error (#777)
e78dc3c is described below

commit e78dc3c28372fde1e1ce40f727ff183902be59c7
Author: xiaolong ran <xi...@tencent.com>
AuthorDate: Tue Sep 27 09:16:29 2022 +0800

    Introduce doneCh for ack error (#777)
    
    * Introduce doneCh for ack error
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
    
    * remove consumer partition test file
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
    
    * Refactor ack response func
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
    
    * fix ack error
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
---
 pulsar/consumer_impl.go       |  6 ++++++
 pulsar/consumer_multitopic.go |  4 ++++
 pulsar/consumer_partition.go  | 31 +++++++++++++++++++++++++++++--
 pulsar/consumer_regex.go      |  4 ++++
 pulsar/impl_message.go        | 11 +++++++++++
 5 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 2328ca8..e6135bf 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -36,7 +36,9 @@ import (
 const defaultNackRedeliveryDelay = 1 * time.Minute
 
 type acker interface {
+	// AckID does not handle errors returned by the Broker side, so no need to wait for doneCh to finish.
 	AckID(id trackingMessageID) error
+	AckIDWithResponse(id trackingMessageID) error
 	NackID(id trackingMessageID)
 	NackMsg(msg Message)
 }
@@ -462,6 +464,10 @@ func (c *consumer) AckID(msgID MessageID) error {
 		return mid.Ack()
 	}
 
+	if c.options.AckWithResponse {
+		return c.consumers[mid.partitionIdx].AckIDWithResponse(mid)
+	}
+
 	return c.consumers[mid.partitionIdx].AckID(mid)
 }
 
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 1d75a24..380dd75 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -136,6 +136,10 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) error {
 		return errors.New("unable to ack message because consumer is nil")
 	}
 
+	if c.options.AckWithResponse {
+		return mid.AckWithResponse()
+	}
+
 	return mid.Ack()
 }
 
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 50fa3c4..cc9e710 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -366,6 +366,29 @@ func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error
 	return convertToMessageID(id), nil
 }
 
+func (pc *partitionConsumer) AckIDWithResponse(msgID trackingMessageID) error {
+	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+		pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
+		return errors.New("consumer state is closed")
+	}
+
+	ackReq := new(ackRequest)
+	ackReq.doneCh = make(chan struct{})
+	if !msgID.Undefined() && msgID.ack() {
+		pc.metrics.AcksCounter.Inc()
+		pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9)
+		ackReq.msgID = msgID
+		// send ack request to eventsCh
+		pc.eventsCh <- ackReq
+		// wait for the request to complete
+		<-ackReq.doneCh
+
+		pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
+	}
+
+	return ackReq.err
+}
+
 func (pc *partitionConsumer) AckID(msgID trackingMessageID) error {
 	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
 		pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
@@ -373,12 +396,14 @@ func (pc *partitionConsumer) AckID(msgID trackingMessageID) error {
 	}
 
 	ackReq := new(ackRequest)
+	ackReq.doneCh = make(chan struct{})
 	if !msgID.Undefined() && msgID.ack() {
 		pc.metrics.AcksCounter.Inc()
 		pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9)
 		ackReq.msgID = msgID
 		// send ack request to eventsCh
 		pc.eventsCh <- ackReq
+		// No need to wait for ackReq.doneCh to finish
 
 		pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
 	}
@@ -562,6 +587,7 @@ func (pc *partitionConsumer) clearMessageChannels() {
 }
 
 func (pc *partitionConsumer) internalAck(req *ackRequest) {
+	defer close(req.doneCh)
 	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
 		pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
 		return
@@ -986,8 +1012,9 @@ func (pc *partitionConsumer) dispatcher() {
 }
 
 type ackRequest struct {
-	msgID trackingMessageID
-	err   error
+	doneCh chan struct{}
+	msgID  trackingMessageID
+	err    error
 }
 
 type unsubscribeRequest struct {
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index e4d2077..c55a1c1 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -180,6 +180,10 @@ func (c *regexConsumer) AckID(msgID MessageID) error {
 		return errors.New("consumer is nil in consumer_regex")
 	}
 
+	if c.options.AckWithResponse {
+		return mid.AckWithResponse()
+	}
+
 	return mid.Ack()
 }
 
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 067439f..d155ae7 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -75,6 +75,17 @@ func (id trackingMessageID) Ack() error {
 	return nil
 }
 
+func (id trackingMessageID) AckWithResponse() error {
+	if id.consumer == nil {
+		return errors.New("consumer is nil in trackingMessageID")
+	}
+	if id.ack() {
+		return id.consumer.AckIDWithResponse(id)
+	}
+
+	return nil
+}
+
 func (id trackingMessageID) Nack() {
 	if id.consumer == nil {
 		return