You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:58 UTC
[pulsar-client-go] 21/38: Producer last sequence id
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit b2c9d8ca7c069e68cdb931e08ce211fba881ed0d
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri May 3 16:07:44 2019 -0700
Producer last sequence id
---
pulsar/impl_partition_producer.go | 7 +++++--
pulsar/producer_test.go | 31 +++++++++++++++++++++++++++++++
2 files changed, 36 insertions(+), 2 deletions(-)
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index b87384d..7b32fdc 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -8,6 +8,7 @@ import (
"pulsar-client-go-native/pulsar/impl/util"
pb "pulsar-client-go-native/pulsar/pulsar_proto"
"sync"
+ "sync/atomic"
"time"
)
@@ -39,6 +40,7 @@ type partitionProducer struct {
publishSemaphore util.Semaphore
pendingQueue util.BlockingQueue
+ lastSequenceID int64
}
const defaultBatchingMaxPublishDelay = 10 * time.Millisecond
@@ -70,6 +72,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
publishSemaphore: make(util.Semaphore, maxPendingMessages),
pendingQueue: util.NewBlockingQueue(maxPendingMessages),
+ lastSequenceID: -1,
}
if options.Name != "" {
@@ -313,6 +316,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
p.publishSemaphore.Release()
for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
+ atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceId))
if sr.callback != nil {
sr.callback(nil, sr.msg, nil)
}
@@ -347,8 +351,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
}
func (p *partitionProducer) LastSequenceID() int64 {
- // TODO: return real last sequence id
- return -1
+ return atomic.LoadInt64(&p.lastSequenceID)
}
func (p *partitionProducer) Flush() error {
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index ab3d739..fa197b4 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -78,3 +78,34 @@ func TestProducerCompression(t *testing.T) {
})
}
}
+
+func TestProducerLastSequenceID(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceUrl,
+ })
+ assert.NoError(t, err)
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: newTopicName(),
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+
+ assert.Equal(t, int64(-1), producer.LastSequenceID())
+
+ for i := 0; i < 10; i++ {
+ err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte("hello"),
+ })
+
+ assert.NoError(t, err)
+ assert.Equal(t, int64(i), producer.LastSequenceID())
+ }
+
+ err = producer.Close()
+ assert.NoError(t, err)
+
+ err = client.Close()
+ assert.NoError(t, err)
+}
\ No newline at end of file