Posted to commits@pulsar.apache.org by rx...@apache.org on 2021/08/26 08:51:26 UTC

[pulsar-client-go] branch master updated: Fix panic when scale down partitions (#601)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new b684151  Fix panic when scale down partitions (#601)
b684151 is described below

commit b6841513379ea9ca503d1e350c5f93198fc2b03f
Author: xiaolong ran <rx...@apache.org>
AuthorDate: Thu Aug 26 16:51:20 2021 +0800

    Fix panic when scale down partitions (#601)
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
    
    
    
    ### Motivation
    
    While the client is running, if the application forcibly deletes some sub-partitions of a topic, old_partitions becomes greater than new_partitions, i.e. the topic looks as if its partitions were scaled down. The current code only handles the scale-up case, so after such a forced deletion the client logs the message below and then panics:
    
    ```
    level=info msg="[Changed number of partitions in topic]" new_partitions=1 old_partitions=20 topic="persistent://pulsar-xxxxxxx/xxxx/gxxxxxxxx"
    ```
    
    ```
    panic: runtime error: index out of range [1] with length 1
    
    goroutine 166288 [running]:
    github.com/apache/pulsar-client-go/pulsar.(*producer).internalCreatePartitionsProducers(0xc0070aa6e0, 0x0, 0x0)
            github.com/apache/pulsar-client-go/pulsar/producer_impl.go:194 +0x785
    github.com/apache/pulsar-client-go/pulsar.(*producer).runBackgroundPartitionDiscovery.func1(0xc004167cd0, 0xc00559f5c0, 0xc006af6dc0, 0xc0070aa6e0)
           github.com/apache/pulsar-client-go/pulsar/producer_impl.go:152 +0xce
    created by github.com/apache/pulsar-client-go/pulsar.(*producer).runBackgroundPartitionDiscovery
           github.com/apache/pulsar-client-go/pulsar/producer_impl.go:144 +0xcd
    ```
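    
    The panic comes from the loop that copies the old per-partition producers into the freshly allocated slice: the new slice is sized for new_partitions, but the loop still iterates over all old_partitions entries. A minimal, standalone Go sketch of that indexing problem (hypothetical names, not the actual client code):
    
    ```go
    package main
    
    import "fmt"
    
    func main() {
    	oldProducers := make([]string, 20) // 20 existing per-partition producers
    	newNumPartitions := 1              // topic now reports a single partition
    
    	producers := make([]string, newNumPartitions)
    
    	// The pre-fix logic copied every old entry, assuming partitions only grow:
    	for i := 0; i < len(oldProducers); i++ {
    		producers[i] = oldProducers[i] // panics at i == 1: index out of range [1] with length 1
    	}
    	fmt.Println(producers)
    }
    ```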
    
    ### Modifications
    
    Add handling for the partition scale-down case.
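    
    A standalone sketch of the added range computation (hypothetical helper name, distilled from the diff below): on scale down the client now recreates all per-partition producers/consumers starting from partition 0 instead of appending to the old set, and the partition metrics are set to the new count rather than incremented.
    
    ```go
    package main
    
    import "fmt"
    
    // rebuildRange mirrors the added logic: when the partition count shrinks,
    // start again from partition 0 and recreate every partition client; when it
    // grows, only the newly added partitions are created.
    func rebuildRange(oldNumPartitions, newNumPartitions int) (startPartition, partitionsToAdd int) {
    	startPartition = oldNumPartitions
    	partitionsToAdd = newNumPartitions - oldNumPartitions
    	if partitionsToAdd < 0 { // scale down
    		partitionsToAdd = newNumPartitions
    		startPartition = 0
    	}
    	return startPartition, partitionsToAdd
    }
    
    func main() {
    	fmt.Println(rebuildRange(20, 1)) // scale down: 0 1 -> recreate the single remaining partition
    	fmt.Println(rebuildRange(1, 4))  // scale up:   1 3 -> add partitions 1 through 3
    }
    ```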
---
 pulsar/consumer_impl.go | 30 ++++++++++++++++++------------
 pulsar/producer_impl.go | 27 +++++++++++++++------------
 2 files changed, 33 insertions(+), 24 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index ec7ad7d..78ae0d7 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -28,7 +28,6 @@ import (
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
 	"github.com/apache/pulsar-client-go/pulsar/log"
-	"github.com/pkg/errors"
 )
 
 const defaultNackRedeliveryDelay = 1 * time.Minute
@@ -258,22 +257,16 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 
 	c.Lock()
 	defer c.Unlock()
+
 	oldConsumers := c.consumers
+	oldNumPartitions = len(oldConsumers)
 
 	if oldConsumers != nil {
-		oldNumPartitions = len(oldConsumers)
 		if oldNumPartitions == newNumPartitions {
 			c.log.Debug("Number of partitions in topic has not changed")
 			return nil
 		}
 
-		if oldNumPartitions > newNumPartitions {
-			c.log.WithField("old_partitions", oldNumPartitions).
-				WithField("new_partitions", newNumPartitions).
-				Error("Does not support scaling down operations on topic partitions")
-			return errors.New("Does not support scaling down operations on topic partitions")
-		}
-
 		c.log.WithField("old_partitions", oldNumPartitions).
 			WithField("new_partitions", newNumPartitions).
 			Info("Changed number of partitions in topic")
@@ -281,7 +274,9 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 
 	c.consumers = make([]*partitionConsumer, newNumPartitions)
 
-	if oldConsumers != nil {
 +	// When, for some reason (e.g. forced deletion of sub-partitions), oldNumPartitions > newNumPartitions,
 +	// we need to rebuild the cache of consumers from scratch; copying the old ones into the smaller slice would index out of bounds.
+	if oldConsumers != nil && oldNumPartitions < newNumPartitions {
 		// Copy over the existing consumer instances
 		for i := 0; i < oldNumPartitions; i++ {
 			c.consumers[i] = oldConsumers[i]
@@ -297,12 +292,19 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 	receiverQueueSize := c.options.ReceiverQueueSize
 	metadata := c.options.Properties
 
+	startPartition := oldNumPartitions
 	partitionsToAdd := newNumPartitions - oldNumPartitions
+
+	if partitionsToAdd < 0 {
+		partitionsToAdd = newNumPartitions
+		startPartition = 0
+	}
+
 	var wg sync.WaitGroup
 	ch := make(chan ConsumerError, partitionsToAdd)
 	wg.Add(partitionsToAdd)
 
-	for partitionIdx := oldNumPartitions; partitionIdx < newNumPartitions; partitionIdx++ {
+	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
 		partitionTopic := partitions[partitionIdx]
 
 		go func(idx int, pt string) {
@@ -366,7 +368,11 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 		return err
 	}
 
-	c.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
+	if newNumPartitions < oldNumPartitions {
+		c.metrics.ConsumersPartitions.Set(float64(newNumPartitions))
+	} else {
+		c.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
+	}
 	return nil
 }
 
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index adf9b14..20e8d3d 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -26,7 +26,6 @@ import (
 
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 	"github.com/apache/pulsar-client-go/pulsar/log"
-	"github.com/pkg/errors"
 )
 
 const (
@@ -175,21 +174,14 @@ func (p *producer) internalCreatePartitionsProducers() error {
 	defer p.Unlock()
 
 	oldProducers := p.producers
+	oldNumPartitions = len(oldProducers)
 
 	if oldProducers != nil {
-		oldNumPartitions = len(oldProducers)
 		if oldNumPartitions == newNumPartitions {
 			p.log.Debug("Number of partitions in topic has not changed")
 			return nil
 		}
 
-		if oldNumPartitions > newNumPartitions {
-			p.log.WithField("old_partitions", oldNumPartitions).
-				WithField("new_partitions", newNumPartitions).
-				Error("Does not support scaling down operations on topic partitions")
-			return errors.New("Does not support scaling down operations on topic partitions")
-		}
-
 		p.log.WithField("old_partitions", oldNumPartitions).
 			WithField("new_partitions", newNumPartitions).
 			Info("Changed number of partitions in topic")
@@ -198,7 +190,9 @@ func (p *producer) internalCreatePartitionsProducers() error {
 
 	p.producers = make([]Producer, newNumPartitions)
 
-	if oldProducers != nil {
 +	// When, for some reason (e.g. forced deletion of sub-partitions), oldNumPartitions > newNumPartitions,
 +	// we need to rebuild the cache of producers from scratch; copying the old ones into the smaller slice would index out of bounds.
+	if oldProducers != nil && oldNumPartitions < newNumPartitions {
 		// Copy over the existing consumer instances
 		for i := 0; i < oldNumPartitions; i++ {
 			p.producers[i] = oldProducers[i]
@@ -211,10 +205,15 @@ func (p *producer) internalCreatePartitionsProducers() error {
 		err       error
 	}
 
+	startPartition := oldNumPartitions
 	partitionsToAdd := newNumPartitions - oldNumPartitions
+	if partitionsToAdd < 0 {
+		partitionsToAdd = newNumPartitions
+		startPartition = 0
+	}
 	c := make(chan ProducerError, partitionsToAdd)
 
-	for partitionIdx := oldNumPartitions; partitionIdx < newNumPartitions; partitionIdx++ {
+	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
 		partition := partitions[partitionIdx]
 
 		go func(partitionIdx int, partition string) {
@@ -248,7 +247,11 @@ func (p *producer) internalCreatePartitionsProducers() error {
 		return err
 	}
 
-	p.metrics.ProducersPartitions.Add(float64(partitionsToAdd))
+	if newNumPartitions < oldNumPartitions {
+		p.metrics.ProducersPartitions.Set(float64(newNumPartitions))
+	} else {
+		p.metrics.ProducersPartitions.Add(float64(partitionsToAdd))
+	}
 	atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&p.producers))
 	atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers)))
 	return nil