You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/07/24 02:14:18 UTC
[pulsar-client-go] branch master updated: fix producer block (#326)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new c75aa62 fix producer block (#326)
c75aa62 is described below
commit c75aa626183fb4086a9438215e9c55fed16565d9
Author: faker <57...@qq.com>
AuthorDate: Fri Jul 24 10:14:10 2020 +0800
fix producer block (#326)
Co-authored-by: 灰柯 <hu...@tuya.com>
---
pulsar/consumer_partition.go | 4 +++-
pulsar/internal/connection.go | 2 +-
pulsar/producer_partition.go | 3 ++-
3 files changed, 6 insertions(+), 3 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 0c723c8..b83cb88 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -180,7 +180,9 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
dlq: dlq,
log: log.WithField("topic", options.topic),
}
- pc.log = pc.log.WithField("name", pc.name).WithField("subscription", options.subscription)
+ pc.log = pc.log.WithField("name", pc.name).
+ WithField("subscription", options.subscription).
+ WithField("consumerID", pc.consumerID)
pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay)
err := pc.grabConn()
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 703c773..4be9ba2 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -636,7 +636,7 @@ func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer)
if consumer, ok := c.consumerHandler(consumerID); ok {
consumer.ConnectionClosed()
- delete(c.listeners, consumerID)
+ c.DeleteConsumeHandler(consumerID)
} else {
c.log.WithField("consumerID", consumerID).Warnf("Consumer with ID not found while closing consumer")
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 3832d69..18f22f6 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -150,7 +150,8 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
return nil, err
}
- p.log = p.log.WithField("producer_name", p.producerName)
+ p.log = p.log.WithField("producer_name", p.producerName).
+ WithField("producerID", p.producerID)
p.log.WithField("cnx", p.cnx.ID()).Info("Created producer")
atomic.StoreInt32(&p.state, producerReady)