You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:58 UTC

[pulsar-client-go] 21/38: Producer last sequence id

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit b2c9d8ca7c069e68cdb931e08ce211fba881ed0d
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri May 3 16:07:44 2019 -0700

    Producer last sequence id
---
 pulsar/impl_partition_producer.go |  7 +++++--
 pulsar/producer_test.go           | 31 +++++++++++++++++++++++++++++++
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index b87384d..7b32fdc 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -8,6 +8,7 @@ import (
 	"pulsar-client-go-native/pulsar/impl/util"
 	pb "pulsar-client-go-native/pulsar/pulsar_proto"
 	"sync"
+	"sync/atomic"
 	"time"
 )
 
@@ -39,6 +40,7 @@ type partitionProducer struct {
 
 	publishSemaphore util.Semaphore
 	pendingQueue     util.BlockingQueue
+	lastSequenceID   int64
 }
 
 const defaultBatchingMaxPublishDelay = 10 * time.Millisecond
@@ -70,6 +72,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 		batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
 		publishSemaphore: make(util.Semaphore, maxPendingMessages),
 		pendingQueue:     util.NewBlockingQueue(maxPendingMessages),
+		lastSequenceID:   -1,
 	}
 
 	if options.Name != "" {
@@ -313,6 +316,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 	p.publishSemaphore.Release()
 	for _, i := range pi.sendRequests {
 		sr := i.(*sendRequest)
+		atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceId))
 		if sr.callback != nil {
 			sr.callback(nil, sr.msg, nil)
 		}
@@ -347,8 +351,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
 }
 
 func (p *partitionProducer) LastSequenceID() int64 {
-	// TODO: return real last sequence id
-	return -1
+	return atomic.LoadInt64(&p.lastSequenceID)
 }
 
 func (p *partitionProducer) Flush() error {
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index ab3d739..fa197b4 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -78,3 +78,34 @@ func TestProducerCompression(t *testing.T) {
 		})
 	}
 }
+
+func TestProducerLastSequenceID(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: serviceUrl,
+	})
+	assert.NoError(t, err)
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: newTopicName(),
+	})
+
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+
+	assert.Equal(t, int64(-1), producer.LastSequenceID())
+
+	for i := 0; i < 10; i++ {
+		err := producer.Send(context.Background(), &ProducerMessage{
+			Payload: []byte("hello"),
+		})
+
+		assert.NoError(t, err)
+		assert.Equal(t, int64(i), producer.LastSequenceID())
+	}
+
+	err = producer.Close()
+	assert.NoError(t, err)
+
+	err = client.Close()
+	assert.NoError(t, err)
+}
\ No newline at end of file