You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by cc...@apache.org on 2021/08/11 19:48:40 UTC
[pulsar-client-go] branch master updated: add epoch to handle
create producer timeout (#582)
This is an automated email from the ASF dual-hosted git repository.
cckellogg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 887853e add epoch to handle create producer timeout (#582)
887853e is described below
commit 887853e5c8f734bbfefce77f4444bed8877c86d0
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Thu Aug 12 03:48:33 2021 +0800
add epoch to handle create producer timeout (#582)
* add epoch to producer
* fix CI
* address comments
* address comments
* update style
* better logging
---
pulsar/producer_partition.go | 24 +++++++++++++++++-------
pulsar/producer_test.go | 36 ++++++++++++++++++++++++++++++++++++
2 files changed, 53 insertions(+), 7 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index abec4fc..ca6850d 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -79,6 +79,8 @@ type partitionProducer struct {
schemaInfo *SchemaInfo
partitionIdx int32
metrics *internal.TopicMetrics
+
+ epoch uint64
}
func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int,
@@ -114,6 +116,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
lastSequenceID: -1,
partitionIdx: int32(partitionIdx),
metrics: metrics,
+ epoch: 0,
}
p.setProducerState(producerInit)
@@ -176,12 +179,16 @@ func (p *partitionProducer) grabCnx() error {
p.log.Debug("The partition consumer schema is nil")
}
+ userProvidedProducerName := p.producerName != ""
+
cmdProducer := &pb.CommandProducer{
- RequestId: proto.Uint64(id),
- Topic: proto.String(p.topic),
- Encrypted: nil,
- ProducerId: proto.Uint64(p.producerID),
- Schema: pbSchema,
+ RequestId: proto.Uint64(id),
+ Topic: proto.String(p.topic),
+ Encrypted: nil,
+ ProducerId: proto.Uint64(p.producerID),
+ Schema: pbSchema,
+ Epoch: proto.Uint64(atomic.LoadUint64(&p.epoch)),
+ UserProvidedProducerName: proto.Bool(userProvidedProducerName),
}
if p.producerName != "" {
@@ -230,7 +237,10 @@ func (p *partitionProducer) grabCnx() error {
}
p.cnx = res.Cnx
p.cnx.RegisterListener(p.producerID, p)
- p.log.WithField("cnx", res.Cnx.ID()).Debug("Connected producer")
+ p.log.WithFields(log.Fields{
+ "cnx": res.Cnx.ID(),
+ "epoch": atomic.LoadUint64(&p.epoch),
+ }).Debug("Connected producer")
pendingItems := p.pendingQueue.ReadableSlice()
viewSize := len(pendingItems)
@@ -298,7 +308,7 @@ func (p *partitionProducer) reconnectToBroker() {
d := backoff.Next()
p.log.Info("Reconnecting to broker in ", d)
time.Sleep(d)
-
+ atomic.AddUint64(&p.epoch, 1)
err := p.grabCnx()
if err == nil {
// Successfully reconnected
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index bbe8028..dc7a5ef 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1127,3 +1127,39 @@ func TestProducerSendAfterClose(t *testing.T) {
assert.Nil(t, ID)
assert.Error(t, err)
}
+
+func TestExactlyOnceWithProducerNameSpecified(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ topicName := newTopicName()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ Name: "p-name-1",
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+ defer producer.Close()
+
+ producer2, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ Name: "p-name-2",
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, producer2)
+ defer producer2.Close()
+
+ producer3, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ Name: "p-name-2",
+ })
+
+ assert.NotNil(t, err)
+ assert.Nil(t, producer3)
+}