You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2022/05/24 06:32:09 UTC

[pulsar-client-go] branch master updated: Support ack response for Go SDK (#776)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c41616b  Support ack response for Go SDK (#776)
c41616b is described below

commit c41616b2f5125603fe252a90ca004bc0bd3d76d8
Author: xiaolong ran <xi...@tencent.com>
AuthorDate: Tue May 24 14:32:05 2022 +0800

    Support ack response for Go SDK (#776)
    
    * Support ack response for Go SDK
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
    
    * add test case for this change
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
---
 pulsar/consumer.go            |  7 +++++++
 pulsar/consumer_impl.go       |  1 +
 pulsar/consumer_partition.go  | 11 +++++++++++
 pulsar/consumer_test.go       | 35 +++++++++++++++++++++++++++++++++++
 pulsar/internal/connection.go | 23 +++++++++++++++++++++++
 5 files changed, 77 insertions(+)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index dfe27c5..2df1637 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -182,6 +182,13 @@ type ConsumerOptions struct {
 	// > Notice: the NackBackoffPolicy will not work with `consumer.NackID(MessageID)`
 	// > because we are not able to get the redeliveryCount from the message ID.
 	NackBackoffPolicy NackBackoffPolicy
+
+	// AckWithResponse is a return value added to Ack Command, and its purpose is to confirm whether Ack Command
+	// is executed correctly on the Broker side. When set to true, the error information returned by the Ack
+	// method contains the return value of the Ack Command processed by the Broker side; when set to false, the
+	// error information of the Ack method only contains errors that may occur in the Go SDK's own processing.
+	// Default: false
+	AckWithResponse bool
 }
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index e36f040..e887538 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -361,6 +361,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 				keySharedPolicy:            c.options.KeySharedPolicy,
 				schema:                     c.options.Schema,
 				decryption:                 c.options.Decryption,
+				ackWithResponse:            c.options.AckWithResponse,
 			}
 			cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
 			ch <- ConsumerError{
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 30ffcb2..06ccfd5 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -104,6 +104,7 @@ type partitionConsumerOpts struct {
 	keySharedPolicy            *KeySharedPolicy
 	schema                     Schema
 	decryption                 *MessageDecryptionInfo
+	ackWithResponse            bool
 }
 
 type partitionConsumer struct {
@@ -525,12 +526,22 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) {
 		EntryId:  proto.Uint64(uint64(msgID.entryID)),
 	}
 
+	reqID := pc.client.rpcClient.NewRequestID()
 	cmdAck := &pb.CommandAck{
 		ConsumerId: proto.Uint64(pc.consumerID),
 		MessageId:  messageIDs,
 		AckType:    pb.CommandAck_Individual.Enum(),
 	}
 
+	if pc.options.ackWithResponse {
+		_, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), reqID, pb.BaseCommand_ACK, cmdAck)
+		if err != nil {
+			pc.log.WithError(err).Error("Ack with response error")
+			req.err = err
+		}
+		return
+	}
+
 	err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck)
 	if err != nil {
 		pc.log.Error("Connection was closed when request ack cmd")
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index cadd8e4..0366884 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1321,6 +1321,41 @@ func TestRLQ(t *testing.T) {
 	assert.Nil(t, checkMsg)
 }
 
+func TestAckWithResponse(t *testing.T) {
+	now := time.Now().Unix()
+	topic01 := fmt.Sprintf("persistent://public/default/topic-%d-01", now)
+	ctx := context.Background()
+
+	client, err := NewClient(ClientOptions{URL: lookupURL})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                       topic01,
+		SubscriptionName:            "my-sub",
+		Type:                        Shared,
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+		AckWithResponse:             true,
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	producer01, err := client.CreateProducer(ProducerOptions{Topic: topic01})
+	assert.Nil(t, err)
+	defer producer01.Close()
+	for i := 0; i < 10; i++ {
+		_, err = producer01.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_01_%d", i))})
+		assert.Nil(t, err)
+	}
+
+	for i := 0; i < 10; i++ {
+		msg, err := consumer.Receive(ctx)
+		assert.Nil(t, err)
+		err = consumer.Ack(msg)
+		assert.Nil(t, err)
+	}
+}
+
 func TestRLQMultiTopics(t *testing.T) {
 	now := time.Now().Unix()
 	topic01 := fmt.Sprintf("persistent://public/default/topic-%d-1", now)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index da9a901..fa8d055 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -564,6 +564,9 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl
 	case pb.BaseCommand_MESSAGE:
 		c.handleMessage(cmd.GetMessage(), headersAndPayload)
 
+	case pb.BaseCommand_ACK_RESPONSE:
+		c.handleAckResponse(cmd.GetAckResponse())
+
 	case pb.BaseCommand_PING:
 		c.handlePing()
 	case pb.BaseCommand_PONG:
@@ -676,6 +679,26 @@ func (c *connection) handleResponseError(serverError *pb.CommandError) {
 	request.callback(nil, errors.New(errMsg))
 }
 
+func (c *connection) handleAckResponse(ackResponse *pb.CommandAckResponse) {
+	requestID := ackResponse.GetRequestId()
+	consumerID := ackResponse.GetConsumerId()
+
+	request, ok := c.deletePendingRequest(requestID)
+	if !ok {
+		c.log.Warnf("AckResponse has complete when receive response! requestId : %d, consumerId : %d",
+			requestID, consumerID)
+		return
+	}
+
+	if ackResponse.GetMessage() == "" {
+		request.callback(nil, nil)
+		return
+	}
+
+	errMsg := fmt.Sprintf("ack response error: %s: %s", ackResponse.GetError(), ackResponse.GetMessage())
+	request.callback(nil, errors.New(errMsg))
+}
+
 func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
 	producerID := response.GetProducerId()