You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2022/05/24 06:32:09 UTC
[pulsar-client-go] branch master updated: Support ack response for Go SDK (#776)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new c41616b Support ack response for Go SDK (#776)
c41616b is described below
commit c41616b2f5125603fe252a90ca004bc0bd3d76d8
Author: xiaolong ran <xi...@tencent.com>
AuthorDate: Tue May 24 14:32:05 2022 +0800
Support ack response for Go SDK (#776)
* Support ack response for Go SDK
Signed-off-by: xiaolongran <xi...@tencent.com>
* add test case for this change
Signed-off-by: xiaolongran <xi...@tencent.com>
---
pulsar/consumer.go | 7 +++++++
pulsar/consumer_impl.go | 1 +
pulsar/consumer_partition.go | 11 +++++++++++
pulsar/consumer_test.go | 35 +++++++++++++++++++++++++++++++++++
pulsar/internal/connection.go | 23 +++++++++++++++++++++++
5 files changed, 77 insertions(+)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index dfe27c5..2df1637 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -182,6 +182,13 @@ type ConsumerOptions struct {
// > Notice: the NackBackoffPolicy will not work with `consumer.NackID(MessageID)`
// > because we are not able to get the redeliveryCount from the message ID.
NackBackoffPolicy NackBackoffPolicy
+
+ // AckWithResponse controls whether the consumer waits for the broker's response to each Ack command, so
+ // that the caller can confirm the ack was processed correctly on the broker side. When set to true, the
+ // error returned by the Ack method includes any error the broker reported for the Ack command; when set
+ // to false, the Ack method only returns errors that occur in the Go SDK's own processing.
+ // Default: false
+ AckWithResponse bool
}
// Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index e36f040..e887538 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -361,6 +361,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
keySharedPolicy: c.options.KeySharedPolicy,
schema: c.options.Schema,
decryption: c.options.Decryption,
+ ackWithResponse: c.options.AckWithResponse,
}
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
ch <- ConsumerError{
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 30ffcb2..06ccfd5 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -104,6 +104,7 @@ type partitionConsumerOpts struct {
keySharedPolicy *KeySharedPolicy
schema Schema
decryption *MessageDecryptionInfo
+ ackWithResponse bool
}
type partitionConsumer struct {
@@ -525,12 +526,22 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) {
EntryId: proto.Uint64(uint64(msgID.entryID)),
}
+ reqID := pc.client.rpcClient.NewRequestID()
cmdAck := &pb.CommandAck{
ConsumerId: proto.Uint64(pc.consumerID),
MessageId: messageIDs,
AckType: pb.CommandAck_Individual.Enum(),
}
+ if pc.options.ackWithResponse {
+ _, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), reqID, pb.BaseCommand_ACK, cmdAck)
+ if err != nil {
+ pc.log.WithError(err).Error("Ack with response error")
+ req.err = err
+ }
+ return
+ }
+
err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck)
if err != nil {
pc.log.Error("Connection was closed when request ack cmd")
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index cadd8e4..0366884 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1321,6 +1321,41 @@ func TestRLQ(t *testing.T) {
assert.Nil(t, checkMsg)
}
+func TestAckWithResponse(t *testing.T) {
+ now := time.Now().Unix()
+ topic01 := fmt.Sprintf("persistent://public/default/topic-%d-01", now)
+ ctx := context.Background()
+
+ client, err := NewClient(ClientOptions{URL: lookupURL})
+ assert.Nil(t, err)
+ defer client.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic01,
+ SubscriptionName: "my-sub",
+ Type: Shared,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
+ AckWithResponse: true,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ producer01, err := client.CreateProducer(ProducerOptions{Topic: topic01})
+ assert.Nil(t, err)
+ defer producer01.Close()
+ for i := 0; i < 10; i++ {
+ _, err = producer01.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_01_%d", i))})
+ assert.Nil(t, err)
+ }
+
+ for i := 0; i < 10; i++ {
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ err = consumer.Ack(msg)
+ assert.Nil(t, err)
+ }
+}
+
func TestRLQMultiTopics(t *testing.T) {
now := time.Now().Unix()
topic01 := fmt.Sprintf("persistent://public/default/topic-%d-1", now)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index da9a901..fa8d055 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -564,6 +564,9 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl
case pb.BaseCommand_MESSAGE:
c.handleMessage(cmd.GetMessage(), headersAndPayload)
+ case pb.BaseCommand_ACK_RESPONSE:
+ c.handleAckResponse(cmd.GetAckResponse())
+
case pb.BaseCommand_PING:
c.handlePing()
case pb.BaseCommand_PONG:
@@ -676,6 +679,26 @@ func (c *connection) handleResponseError(serverError *pb.CommandError) {
request.callback(nil, errors.New(errMsg))
}
+func (c *connection) handleAckResponse(ackResponse *pb.CommandAckResponse) {
+ requestID := ackResponse.GetRequestId()
+ consumerID := ackResponse.GetConsumerId()
+
+ request, ok := c.deletePendingRequest(requestID)
+ if !ok {
+ c.log.Warnf("AckResponse has complete when receive response! requestId : %d, consumerId : %d",
+ requestID, consumerID)
+ return
+ }
+
+ if ackResponse.GetMessage() == "" {
+ request.callback(nil, nil)
+ return
+ }
+
+ errMsg := fmt.Sprintf("ack response error: %s: %s", ackResponse.GetError(), ackResponse.GetMessage())
+ request.callback(nil, errors.New(errMsg))
+}
+
func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
producerID := response.GetProducerId()