You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/09/27 01:16:35 UTC
[pulsar-client-go] branch master updated: Introduce doneCh for ack error (#777)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new e78dc3c Introduce doneCh for ack error (#777)
e78dc3c is described below
commit e78dc3c28372fde1e1ce40f727ff183902be59c7
Author: xiaolong ran <xi...@tencent.com>
AuthorDate: Tue Sep 27 09:16:29 2022 +0800
Introduce doneCh for ack error (#777)
* Introduce doneCh for ack error
Signed-off-by: xiaolongran <xi...@tencent.com>
* remove consumer partition test file
Signed-off-by: xiaolongran <xi...@tencent.com>
* Refactor ack response func
Signed-off-by: xiaolongran <xi...@tencent.com>
* fix ack error
Signed-off-by: xiaolongran <xi...@tencent.com>
Signed-off-by: xiaolongran <xi...@tencent.com>
---
pulsar/consumer_impl.go | 6 ++++++
pulsar/consumer_multitopic.go | 4 ++++
pulsar/consumer_partition.go | 31 +++++++++++++++++++++++++++++--
pulsar/consumer_regex.go | 4 ++++
pulsar/impl_message.go | 11 +++++++++++
5 files changed, 54 insertions(+), 2 deletions(-)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 2328ca8..e6135bf 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -36,7 +36,9 @@ import (
const defaultNackRedeliveryDelay = 1 * time.Minute
type acker interface {
+ // AckID does not handle errors returned by the broker, so there is no need to wait for doneCh to be closed.
AckID(id trackingMessageID) error
+ AckIDWithResponse(id trackingMessageID) error
NackID(id trackingMessageID)
NackMsg(msg Message)
}
@@ -462,6 +464,10 @@ func (c *consumer) AckID(msgID MessageID) error {
return mid.Ack()
}
+ if c.options.AckWithResponse {
+ return c.consumers[mid.partitionIdx].AckIDWithResponse(mid)
+ }
+
return c.consumers[mid.partitionIdx].AckID(mid)
}
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 1d75a24..380dd75 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -136,6 +136,10 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) error {
return errors.New("unable to ack message because consumer is nil")
}
+ if c.options.AckWithResponse {
+ return mid.AckWithResponse()
+ }
+
return mid.Ack()
}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 50fa3c4..cc9e710 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -366,6 +366,29 @@ func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error
return convertToMessageID(id), nil
}
+func (pc *partitionConsumer) AckIDWithResponse(msgID trackingMessageID) error {
+ if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+ pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
+ return errors.New("consumer state is closed")
+ }
+
+ ackReq := new(ackRequest)
+ ackReq.doneCh = make(chan struct{})
+ if !msgID.Undefined() && msgID.ack() {
+ pc.metrics.AcksCounter.Inc()
+ pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9)
+ ackReq.msgID = msgID
+ // send ack request to eventsCh
+ pc.eventsCh <- ackReq
+ // wait for the request to complete
+ <-ackReq.doneCh
+
+ pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
+ }
+
+ return ackReq.err
+}
+
func (pc *partitionConsumer) AckID(msgID trackingMessageID) error {
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
@@ -373,12 +396,14 @@ func (pc *partitionConsumer) AckID(msgID trackingMessageID) error {
}
ackReq := new(ackRequest)
+ ackReq.doneCh = make(chan struct{})
if !msgID.Undefined() && msgID.ack() {
pc.metrics.AcksCounter.Inc()
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9)
ackReq.msgID = msgID
// send ack request to eventsCh
pc.eventsCh <- ackReq
+ // No need to wait for ackReq.doneCh to be closed
pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
}
@@ -562,6 +587,7 @@ func (pc *partitionConsumer) clearMessageChannels() {
}
func (pc *partitionConsumer) internalAck(req *ackRequest) {
+ defer close(req.doneCh)
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
return
@@ -986,8 +1012,9 @@ func (pc *partitionConsumer) dispatcher() {
}
type ackRequest struct {
- msgID trackingMessageID
- err error
+ doneCh chan struct{}
+ msgID trackingMessageID
+ err error
}
type unsubscribeRequest struct {
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index e4d2077..c55a1c1 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -180,6 +180,10 @@ func (c *regexConsumer) AckID(msgID MessageID) error {
return errors.New("consumer is nil in consumer_regex")
}
+ if c.options.AckWithResponse {
+ return mid.AckWithResponse()
+ }
+
return mid.Ack()
}
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 067439f..d155ae7 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -75,6 +75,17 @@ func (id trackingMessageID) Ack() error {
return nil
}
+func (id trackingMessageID) AckWithResponse() error {
+ if id.consumer == nil {
+ return errors.New("consumer is nil in trackingMessageID")
+ }
+ if id.ack() {
+ return id.consumer.AckIDWithResponse(id)
+ }
+
+ return nil
+}
+
func (id trackingMessageID) Nack() {
if id.consumer == nil {
return