You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/07/24 02:14:18 UTC

[pulsar-client-go] branch master updated: fix producer block (#326)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c75aa62  fix producer block (#326)
c75aa62 is described below

commit c75aa626183fb4086a9438215e9c55fed16565d9
Author: faker <57...@qq.com>
AuthorDate: Fri Jul 24 10:14:10 2020 +0800

    fix producer block (#326)
    
    Co-authored-by: 灰柯 <hu...@tuya.com>
---
 pulsar/consumer_partition.go  | 4 +++-
 pulsar/internal/connection.go | 2 +-
 pulsar/producer_partition.go  | 3 ++-
 3 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 0c723c8..b83cb88 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -180,7 +180,9 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
 		dlq:                  dlq,
 		log:                  log.WithField("topic", options.topic),
 	}
-	pc.log = pc.log.WithField("name", pc.name).WithField("subscription", options.subscription)
+	pc.log = pc.log.WithField("name", pc.name).
+		WithField("subscription", options.subscription).
+		WithField("consumerID", pc.consumerID)
 	pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay)
 
 	err := pc.grabConn()
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 703c773..4be9ba2 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -636,7 +636,7 @@ func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer)
 
 	if consumer, ok := c.consumerHandler(consumerID); ok {
 		consumer.ConnectionClosed()
-		delete(c.listeners, consumerID)
+		c.DeleteConsumeHandler(consumerID)
 	} else {
 		c.log.WithField("consumerID", consumerID).Warnf("Consumer with ID not found while closing consumer")
 	}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 3832d69..18f22f6 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -150,7 +150,8 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 		return nil, err
 	}
 
-	p.log = p.log.WithField("producer_name", p.producerName)
+	p.log = p.log.WithField("producer_name", p.producerName).
+		WithField("producerID", p.producerID)
 	p.log.WithField("cnx", p.cnx.ID()).Info("Created producer")
 	atomic.StoreInt32(&p.state, producerReady)