You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/04/14 01:20:07 UTC

[GitHub] [pulsar-client-go] merlimat opened a new pull request #221: Auto update the client to handle changes in number of partitions

merlimat opened a new pull request #221: Auto update the client to handle changes in number of partitions
URL: https://github.com/apache/pulsar-client-go/pull/221
 
 
   ### Motivation
   
   Client should automatically discover new partitions as they're added.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar-client-go] jerrypeng commented on a change in pull request #221: Auto update the client to handle changes in number of partitions

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #221: Auto update the client to handle changes in number of partitions
URL: https://github.com/apache/pulsar-client-go/pull/221#discussion_r408426896
 
 

 ##########
 File path: pulsar/consumer_impl.go
 ##########
 @@ -130,56 +136,108 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
 		log:       log.WithField("topic", topic),
 	}
 
-	partitions, err := client.TopicPartitions(topic)
+	if options.Name != "" {
+		consumer.consumerName = options.Name
+	} else {
+		consumer.consumerName = generateRandomName()
+	}
+
+	err := consumer.internalTopicSubscribeToPartitions()
 	if err != nil {
 		return nil, err
 	}
 
-	numPartitions := len(partitions)
-	consumer.consumers = make([]*partitionConsumer, numPartitions)
+	// set up timer to monitor for new partitions being added
+	duration := options.AutoDiscoveryPeriod
+	if duration <= 0 {
+		duration = defaultAutoDiscoveryDuration
+	}
+	consumer.ticker = time.NewTicker(duration)
+
+	go func() {
+		for range consumer.ticker.C {
+			consumer.log.Debug("Auto discovering new partitions")
+			consumer.internalTopicSubscribeToPartitions()
+		}
+	}()
+
+	return consumer, nil
+}
+
+func (c *consumer) internalTopicSubscribeToPartitions() error {
 
 Review comment:
   I think it makes sense that methods like "Unsubscribe", "Close" and "internalTopicSubscribeToPartitions" cannot run at the same time.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar-client-go] jerrypeng commented on a change in pull request #221: Auto update the client to handle changes in number of partitions

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #221: Auto update the client to handle changes in number of partitions
URL: https://github.com/apache/pulsar-client-go/pull/221#discussion_r408421517
 
 

 ##########
 File path: pulsar/consumer_impl.go
 ##########
 @@ -130,56 +136,108 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
 		log:       log.WithField("topic", topic),
 	}
 
-	partitions, err := client.TopicPartitions(topic)
+	if options.Name != "" {
+		consumer.consumerName = options.Name
+	} else {
+		consumer.consumerName = generateRandomName()
+	}
+
+	err := consumer.internalTopicSubscribeToPartitions()
 	if err != nil {
 		return nil, err
 	}
 
-	numPartitions := len(partitions)
-	consumer.consumers = make([]*partitionConsumer, numPartitions)
+	// set up timer to monitor for new partitions being added
+	duration := options.AutoDiscoveryPeriod
+	if duration <= 0 {
+		duration = defaultAutoDiscoveryDuration
+	}
+	consumer.ticker = time.NewTicker(duration)
+
+	go func() {
+		for range consumer.ticker.C {
+			consumer.log.Debug("Auto discovering new partitions")
+			consumer.internalTopicSubscribeToPartitions()
+		}
+	}()
+
+	return consumer, nil
+}
+
+func (c *consumer) internalTopicSubscribeToPartitions() error {
+	partitions, err := c.client.TopicPartitions(c.topic)
+	if err != nil {
+		return err
+	}
+
+	oldNumPartitions := 0
+	newNumPartitions := len(partitions)
+
+	c.Lock()
+	oldConsumers := c.consumers
+	c.Unlock()
+
+	if oldConsumers != nil {
+		oldNumPartitions = len(oldConsumers)
+		if oldNumPartitions == newNumPartitions {
+			c.log.Debug("Number of partitions in topic has not changed")
+			return nil
+		}
+
+		c.log.WithField("old_partitions", oldNumPartitions).
+			WithField("new_partitions", newNumPartitions).
+			Info("Changed number of partitions in topic")
+	}
+
+	consumers := make([]*partitionConsumer, newNumPartitions)
+
+	// Copy over the existing consumer instances
+	for i := 0; i < oldNumPartitions; i++ {
 
 Review comment:
   Why isn't there a mutex here when we are setting new consumers to old consumers?  Shouldn't this whole block be in a mutex to prevent race conditions?
   
   https://github.com/apache/pulsar-client-go/pull/221/files#diff-8b65201e8d71d0f36fdcbdfc7b5d5f0aR176-R197

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar-client-go] merlimat merged pull request #221: Auto update the client to handle changes in number of partitions

Posted by GitBox <gi...@apache.org>.
merlimat merged pull request #221: Auto update the client to handle changes in number of partitions
URL: https://github.com/apache/pulsar-client-go/pull/221
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #221: Auto update the client to handle changes in number of partitions

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #221: Auto update the client to handle changes in number of partitions
URL: https://github.com/apache/pulsar-client-go/pull/221#discussion_r407814385
 
 

 ##########
 File path: pulsar/producer_impl.go
 ##########
 @@ -73,25 +83,67 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
 		p.messageRouter = options.MessageRouter
 	}
 
-	partitions, err := client.TopicPartitions(options.Topic)
+	err := p.internalCreatePartitionsProducers()
 	if err != nil {
 		return nil, err
 	}
 
-	numPartitions := len(partitions)
-	p.producers = make([]Producer, numPartitions)
+	p.ticker = time.NewTicker(partitionsAutoDiscoveryInterval)
+
+	go func() {
 
 Review comment:
   Will this run for non-partitioned topics as well? Is that ok?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #221: Auto update the client to handle changes in number of partitions

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #221: Auto update the client to handle changes in number of partitions
URL: https://github.com/apache/pulsar-client-go/pull/221#discussion_r407813506
 
 

 ##########
 File path: pulsar/consumer_impl.go
 ##########
 @@ -130,56 +135,106 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
 		log:       log.WithField("topic", topic),
 	}
 
-	partitions, err := client.TopicPartitions(topic)
+	if options.Name != "" {
+		consumer.consumerName = options.Name
+	} else {
+		consumer.consumerName = generateRandomName()
+	}
+
+	err := consumer.internalTopicSubscribeToPartitions()
 	if err != nil {
 		return nil, err
 	}
 
-	numPartitions := len(partitions)
-	consumer.consumers = make([]*partitionConsumer, numPartitions)
+	// set up timer to monitor for new partitions being added
+	duration := options.AutoDiscoveryPeriod
+	if duration <= 0 {
+		duration = defaultAutoDiscoveryDuration
+	}
+	consumer.ticker = time.NewTicker(duration)
+
+	go func() {
 
 Review comment:
   Should we only start this if the topic is partitioned?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar-client-go] jerrypeng commented on a change in pull request #221: Auto update the client to handle changes in number of partitions

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #221: Auto update the client to handle changes in number of partitions
URL: https://github.com/apache/pulsar-client-go/pull/221#discussion_r408421517
 
 

 ##########
 File path: pulsar/consumer_impl.go
 ##########
 @@ -130,56 +136,108 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
 		log:       log.WithField("topic", topic),
 	}
 
-	partitions, err := client.TopicPartitions(topic)
+	if options.Name != "" {
+		consumer.consumerName = options.Name
+	} else {
+		consumer.consumerName = generateRandomName()
+	}
+
+	err := consumer.internalTopicSubscribeToPartitions()
 	if err != nil {
 		return nil, err
 	}
 
-	numPartitions := len(partitions)
-	consumer.consumers = make([]*partitionConsumer, numPartitions)
+	// set up timer to monitor for new partitions being added
+	duration := options.AutoDiscoveryPeriod
+	if duration <= 0 {
+		duration = defaultAutoDiscoveryDuration
+	}
+	consumer.ticker = time.NewTicker(duration)
+
+	go func() {
+		for range consumer.ticker.C {
+			consumer.log.Debug("Auto discovering new partitions")
+			consumer.internalTopicSubscribeToPartitions()
+		}
+	}()
+
+	return consumer, nil
+}
+
+func (c *consumer) internalTopicSubscribeToPartitions() error {
+	partitions, err := c.client.TopicPartitions(c.topic)
+	if err != nil {
+		return err
+	}
+
+	oldNumPartitions := 0
+	newNumPartitions := len(partitions)
+
+	c.Lock()
+	oldConsumers := c.consumers
+	c.Unlock()
+
+	if oldConsumers != nil {
+		oldNumPartitions = len(oldConsumers)
+		if oldNumPartitions == newNumPartitions {
+			c.log.Debug("Number of partitions in topic has not changed")
+			return nil
+		}
+
+		c.log.WithField("old_partitions", oldNumPartitions).
+			WithField("new_partitions", newNumPartitions).
+			Info("Changed number of partitions in topic")
+	}
+
+	consumers := make([]*partitionConsumer, newNumPartitions)
+
+	// Copy over the existing consumer instances
+	for i := 0; i < oldNumPartitions; i++ {
 
 Review comment:
   Why isn't there a mutex here when we are setting new consumers to old consumers?  Shouldn't this whole block be in a mutex to prevent race conditions?
   
   https://github.com/apache/pulsar-client-go/pull/221/files#diff-8b65201e8d71d0f36fdcbdfc7b5d5f0aR176-R197

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar-client-go] jerrypeng commented on issue #221: Auto update the client to handle changes in number of partitions

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on issue #221: Auto update the client to handle changes in number of partitions
URL: https://github.com/apache/pulsar-client-go/pull/221#issuecomment-613681074
 
 
   General question about how locks are handled in Go, when multiple threads are waiting on a lock. In what order are the locks released? FIFO?  Or just everyone attempts to acquire and first one wins?  I am asking because I am wondering if there is going to be any thread starvation for the thread that periodically checks for new partitions, especially when "sendAsync" can be called quite frequently.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #221: Auto update the client to handle changes in number of partitions

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #221: Auto update the client to handle changes in number of partitions
URL: https://github.com/apache/pulsar-client-go/pull/221#discussion_r407814437
 
 

 ##########
 File path: pulsar/producer_impl.go
 ##########
 @@ -73,25 +83,67 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
 		p.messageRouter = options.MessageRouter
 	}
 
-	partitions, err := client.TopicPartitions(options.Topic)
+	err := p.internalCreatePartitionsProducers()
 	if err != nil {
 		return nil, err
 	}
 
-	numPartitions := len(partitions)
-	p.producers = make([]Producer, numPartitions)
+	p.ticker = time.NewTicker(partitionsAutoDiscoveryInterval)
+
+	go func() {
+		for range p.ticker.C {
+			p.log.Debug("Auto discovering new partitions")
+			p.internalCreatePartitionsProducers()
+		}
+	}()
+
+	return p, nil
+}
+
+func (p *producer) internalCreatePartitionsProducers() error {
+	partitions, err := p.client.TopicPartitions(p.topic)
+	if err != nil {
+		return err
+	}
+
+	oldNumPartitions := 0
+	newNumPartitions := len(partitions)
+
+	if p.producers != nil {
 
 Review comment:
   Do we need a lock for the producers?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar-client-go] merlimat commented on issue #221: Auto update the client to handle changes in number of partitions

Posted by GitBox <gi...@apache.org>.
merlimat commented on issue #221: Auto update the client to handle changes in number of partitions
URL: https://github.com/apache/pulsar-client-go/pull/221#issuecomment-613689417
 
 
   Yes, I was trying to avoid getting the extra mutex on the write path, though I think it shouldn't be too bad. 
   
   > when multiple threads are waiting on a lock. In what order are the locks released? FIFO? 
   
   I haven't really seen it specified, though a mutex always should include some sort of queuing for fairness. I'm not worried about that aspect. Also, the thread that updates the list of producers is not "critical" in the sense that it only needs to get executed eventually, with no hard time bound.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar-client-go] merlimat commented on a change in pull request #221: Auto update the client to handle changes in number of partitions

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #221: Auto update the client to handle changes in number of partitions
URL: https://github.com/apache/pulsar-client-go/pull/221#discussion_r408291331
 
 

 ##########
 File path: pulsar/consumer_impl.go
 ##########
 @@ -130,56 +135,106 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
 		log:       log.WithField("topic", topic),
 	}
 
-	partitions, err := client.TopicPartitions(topic)
+	if options.Name != "" {
+		consumer.consumerName = options.Name
+	} else {
+		consumer.consumerName = generateRandomName()
+	}
+
+	err := consumer.internalTopicSubscribeToPartitions()
 	if err != nil {
 		return nil, err
 	}
 
-	numPartitions := len(partitions)
-	consumer.consumers = make([]*partitionConsumer, numPartitions)
+	// set up timer to monitor for new partitions being added
+	duration := options.AutoDiscoveryPeriod
+	if duration <= 0 {
+		duration = defaultAutoDiscoveryDuration
+	}
+	consumer.ticker = time.NewTicker(duration)
+
+	go func() {
 
 Review comment:
   Problem is that the `TopicPartitions()` will just return a list, so we don't necessarily know if it's 1 partition or no partitions. In any case, if the topic is not partitioned, it would not be possible to create a partitioned topic with the same name.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar-client-go] jerrypeng commented on a change in pull request #221: Auto update the client to handle changes in number of partitions

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #221: Auto update the client to handle changes in number of partitions
URL: https://github.com/apache/pulsar-client-go/pull/221#discussion_r408426183
 
 

 ##########
 File path: pulsar/consumer_impl.go
 ##########
 @@ -130,56 +136,108 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
 		log:       log.WithField("topic", topic),
 	}
 
-	partitions, err := client.TopicPartitions(topic)
+	if options.Name != "" {
+		consumer.consumerName = options.Name
+	} else {
+		consumer.consumerName = generateRandomName()
+	}
+
+	err := consumer.internalTopicSubscribeToPartitions()
 	if err != nil {
 		return nil, err
 	}
 
-	numPartitions := len(partitions)
-	consumer.consumers = make([]*partitionConsumer, numPartitions)
+	// set up timer to monitor for new partitions being added
+	duration := options.AutoDiscoveryPeriod
+	if duration <= 0 {
+		duration = defaultAutoDiscoveryDuration
+	}
+	consumer.ticker = time.NewTicker(duration)
+
+	go func() {
+		for range consumer.ticker.C {
+			consumer.log.Debug("Auto discovering new partitions")
+			consumer.internalTopicSubscribeToPartitions()
+		}
+	}()
+
+	return consumer, nil
+}
+
+func (c *consumer) internalTopicSubscribeToPartitions() error {
 
 Review comment:
   Shouldn't this whole method be under a mutex instead of only where c.consumers is being set?  There could be race conditions if "Unsubscribe", "Close", etc. are being called while this method is running.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar-client-go] merlimat commented on a change in pull request #221: Auto update the client to handle changes in number of partitions

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #221: Auto update the client to handle changes in number of partitions
URL: https://github.com/apache/pulsar-client-go/pull/221#discussion_r408358775
 
 

 ##########
 File path: pulsar/producer_impl.go
 ##########
 @@ -73,25 +83,67 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
 		p.messageRouter = options.MessageRouter
 	}
 
-	partitions, err := client.TopicPartitions(options.Topic)
+	err := p.internalCreatePartitionsProducers()
 	if err != nil {
 		return nil, err
 	}
 
-	numPartitions := len(partitions)
-	p.producers = make([]Producer, numPartitions)
+	p.ticker = time.NewTicker(partitionsAutoDiscoveryInterval)
+
+	go func() {
+		for range p.ticker.C {
+			p.log.Debug("Auto discovering new partitions")
+			p.internalCreatePartitionsProducers()
+		}
+	}()
+
+	return p, nil
+}
+
+func (p *producer) internalCreatePartitionsProducers() error {
+	partitions, err := p.client.TopicPartitions(p.topic)
+	if err != nil {
+		return err
+	}
+
+	oldNumPartitions := 0
+	newNumPartitions := len(partitions)
+
+	if p.producers != nil {
 
 Review comment:
   Added

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar-client-go] merlimat commented on a change in pull request #221: Auto update the client to handle changes in number of partitions

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #221: Auto update the client to handle changes in number of partitions
URL: https://github.com/apache/pulsar-client-go/pull/221#discussion_r408291912
 
 

 ##########
 File path: pulsar/consumer_impl.go
 ##########
 @@ -130,56 +135,106 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
 		log:       log.WithField("topic", topic),
 	}
 
-	partitions, err := client.TopicPartitions(topic)
+	if options.Name != "" {
+		consumer.consumerName = options.Name
+	} else {
+		consumer.consumerName = generateRandomName()
+	}
+
+	err := consumer.internalTopicSubscribeToPartitions()
 	if err != nil {
 		return nil, err
 	}
 
-	numPartitions := len(partitions)
-	consumer.consumers = make([]*partitionConsumer, numPartitions)
+	// set up timer to monitor for new partitions being added
+	duration := options.AutoDiscoveryPeriod
+	if duration <= 0 {
+		duration = defaultAutoDiscoveryDuration
+	}
+	consumer.ticker = time.NewTicker(duration)
+
+	go func() {
+		for range consumer.ticker.C {
+			consumer.log.Debug("Auto discovering new partitions")
+			consumer.internalTopicSubscribeToPartitions()
+		}
+	}()
+
+	return consumer, nil
+}
+
+func (c *consumer) internalTopicSubscribeToPartitions() error {
+	partitions, err := c.client.TopicPartitions(c.topic)
+	if err != nil {
+		return err
+	}
+
+	oldNumPartitions := 0
+	newNumPartitions := len(partitions)
+
+	if c.consumers != nil {
 
 Review comment:
   Good point, the changes to the `c.consumers` and `p.producers` should be protected by mutex to avoid races.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar-client-go] jerrypeng commented on a change in pull request #221: Auto update the client to handle changes in number of partitions

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #221: Auto update the client to handle changes in number of partitions
URL: https://github.com/apache/pulsar-client-go/pull/221#discussion_r408421517
 
 

 ##########
 File path: pulsar/consumer_impl.go
 ##########
 @@ -130,56 +136,108 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
 		log:       log.WithField("topic", topic),
 	}
 
-	partitions, err := client.TopicPartitions(topic)
+	if options.Name != "" {
+		consumer.consumerName = options.Name
+	} else {
+		consumer.consumerName = generateRandomName()
+	}
+
+	err := consumer.internalTopicSubscribeToPartitions()
 	if err != nil {
 		return nil, err
 	}
 
-	numPartitions := len(partitions)
-	consumer.consumers = make([]*partitionConsumer, numPartitions)
+	// set up timer to monitor for new partitions being added
+	duration := options.AutoDiscoveryPeriod
+	if duration <= 0 {
+		duration = defaultAutoDiscoveryDuration
+	}
+	consumer.ticker = time.NewTicker(duration)
+
+	go func() {
+		for range consumer.ticker.C {
+			consumer.log.Debug("Auto discovering new partitions")
+			consumer.internalTopicSubscribeToPartitions()
+		}
+	}()
+
+	return consumer, nil
+}
+
+func (c *consumer) internalTopicSubscribeToPartitions() error {
+	partitions, err := c.client.TopicPartitions(c.topic)
+	if err != nil {
+		return err
+	}
+
+	oldNumPartitions := 0
+	newNumPartitions := len(partitions)
+
+	c.Lock()
+	oldConsumers := c.consumers
+	c.Unlock()
+
+	if oldConsumers != nil {
+		oldNumPartitions = len(oldConsumers)
+		if oldNumPartitions == newNumPartitions {
+			c.log.Debug("Number of partitions in topic has not changed")
+			return nil
+		}
+
+		c.log.WithField("old_partitions", oldNumPartitions).
+			WithField("new_partitions", newNumPartitions).
+			Info("Changed number of partitions in topic")
+	}
+
+	consumers := make([]*partitionConsumer, newNumPartitions)
+
+	// Copy over the existing consumer instances
+	for i := 0; i < oldNumPartitions; i++ {
 
 Review comment:
   Why isn't there a mutex here when we are setting new consumers to old consumers?  Should this whole block be in a mutex?
   
   https://github.com/apache/pulsar-client-go/pull/221/files#diff-8b65201e8d71d0f36fdcbdfc7b5d5f0aR176-R197

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #221: Auto update the client to handle changes in number of partitions

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #221: Auto update the client to handle changes in number of partitions
URL: https://github.com/apache/pulsar-client-go/pull/221#discussion_r407814070
 
 

 ##########
 File path: pulsar/consumer_impl.go
 ##########
 @@ -130,56 +135,106 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
 		log:       log.WithField("topic", topic),
 	}
 
-	partitions, err := client.TopicPartitions(topic)
+	if options.Name != "" {
+		consumer.consumerName = options.Name
+	} else {
+		consumer.consumerName = generateRandomName()
+	}
+
+	err := consumer.internalTopicSubscribeToPartitions()
 	if err != nil {
 		return nil, err
 	}
 
-	numPartitions := len(partitions)
-	consumer.consumers = make([]*partitionConsumer, numPartitions)
+	// set up timer to monitor for new partitions being added
+	duration := options.AutoDiscoveryPeriod
+	if duration <= 0 {
+		duration = defaultAutoDiscoveryDuration
+	}
+	consumer.ticker = time.NewTicker(duration)
+
+	go func() {
+		for range consumer.ticker.C {
+			consumer.log.Debug("Auto discovering new partitions")
+			consumer.internalTopicSubscribeToPartitions()
+		}
+	}()
+
+	return consumer, nil
+}
+
+func (c *consumer) internalTopicSubscribeToPartitions() error {
+	partitions, err := c.client.TopicPartitions(c.topic)
+	if err != nil {
+		return err
+	}
+
+	oldNumPartitions := 0
+	newNumPartitions := len(partitions)
+
+	if c.consumers != nil {
 
 Review comment:
   Do we need a lock for c.consumers? We use this for acking/nacking messages, right?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar-client-go] merlimat commented on a change in pull request #221: Auto update the client to handle changes in number of partitions

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #221: Auto update the client to handle changes in number of partitions
URL: https://github.com/apache/pulsar-client-go/pull/221#discussion_r408430573
 
 

 ##########
 File path: pulsar/consumer_impl.go
 ##########
 @@ -130,56 +136,108 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
 		log:       log.WithField("topic", topic),
 	}
 
-	partitions, err := client.TopicPartitions(topic)
+	if options.Name != "" {
+		consumer.consumerName = options.Name
+	} else {
+		consumer.consumerName = generateRandomName()
+	}
+
+	err := consumer.internalTopicSubscribeToPartitions()
 	if err != nil {
 		return nil, err
 	}
 
-	numPartitions := len(partitions)
-	consumer.consumers = make([]*partitionConsumer, numPartitions)
+	// set up timer to monitor for new partitions being added
+	duration := options.AutoDiscoveryPeriod
+	if duration <= 0 {
+		duration = defaultAutoDiscoveryDuration
+	}
+	consumer.ticker = time.NewTicker(duration)
+
+	go func() {
+		for range consumer.ticker.C {
+			consumer.log.Debug("Auto discovering new partitions")
+			consumer.internalTopicSubscribeToPartitions()
+		}
+	}()
+
+	return consumer, nil
+}
+
+func (c *consumer) internalTopicSubscribeToPartitions() error {
 
 Review comment:
   Sure, that makes sense.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar-client-go] jerrypeng commented on a change in pull request #221: Auto update the client to handle changes in number of partitions

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #221: Auto update the client to handle changes in number of partitions
URL: https://github.com/apache/pulsar-client-go/pull/221#discussion_r408430663
 
 

 ##########
 File path: pulsar/producer_impl.go
 ##########
 @@ -73,25 +87,69 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
 		p.messageRouter = options.MessageRouter
 	}
 
-	partitions, err := client.TopicPartitions(options.Topic)
+	err := p.internalCreatePartitionsProducers()
 	if err != nil {
 		return nil, err
 	}
 
-	numPartitions := len(partitions)
-	p.producers = make([]Producer, numPartitions)
+	p.ticker = time.NewTicker(partitionsAutoDiscoveryInterval)
+
+	go func() {
+		for range p.ticker.C {
+			p.log.Debug("Auto discovering new partitions")
+			p.internalCreatePartitionsProducers()
+		}
+	}()
+
+	return p, nil
+}
+
+func (p *producer) internalCreatePartitionsProducers() error {
 
 Review comment:
   Same comment as consumers

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar-client-go] merlimat commented on a change in pull request #221: Auto update the client to handle changes in number of partitions

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #221: Auto update the client to handle changes in number of partitions
URL: https://github.com/apache/pulsar-client-go/pull/221#discussion_r408433389
 
 

 ##########
 File path: pulsar/consumer_impl.go
 ##########
 @@ -130,56 +136,108 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
 		log:       log.WithField("topic", topic),
 	}
 
-	partitions, err := client.TopicPartitions(topic)
+	if options.Name != "" {
+		consumer.consumerName = options.Name
+	} else {
+		consumer.consumerName = generateRandomName()
+	}
+
+	err := consumer.internalTopicSubscribeToPartitions()
 	if err != nil {
 		return nil, err
 	}
 
-	numPartitions := len(partitions)
-	consumer.consumers = make([]*partitionConsumer, numPartitions)
+	// set up timer to monitor for new partitions being added
+	duration := options.AutoDiscoveryPeriod
+	if duration <= 0 {
+		duration = defaultAutoDiscoveryDuration
+	}
+	consumer.ticker = time.NewTicker(duration)
+
+	go func() {
+		for range consumer.ticker.C {
+			consumer.log.Debug("Auto discovering new partitions")
+			consumer.internalTopicSubscribeToPartitions()
+		}
+	}()
+
+	return consumer, nil
+}
+
+func (c *consumer) internalTopicSubscribeToPartitions() error {
 
 Review comment:
   Updated

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services