Posted to commits@pulsar.apache.org by rx...@apache.org on 2021/08/26 08:51:26 UTC
[pulsar-client-go] branch master updated: Fix panic when scale down partitions (#601)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new b684151 Fix panic when scale down partitions (#601)
b684151 is described below
commit b6841513379ea9ca503d1e350c5f93198fc2b03f
Author: xiaolong ran <rx...@apache.org>
AuthorDate: Thu Aug 26 16:51:20 2021 +0800
Fix panic when scale down partitions (#601)
Signed-off-by: xiaolongran <xi...@tencent.com>
### Motivation
When the program is running, if a user forcibly deletes some of a topic's sub-partitions, `old_partitions` becomes greater than `new_partitions`. That is effectively a scale-down of partitions, but the current code only handles the scale-up case, so forcibly deleting sub-partitions produces the log line below, followed by the panic shown after it:
```
level=info msg="[Changed number of partitions in topic]" new_partitions=1 old_partitions=20 topic="persistent://pulsar-xxxxxxx/xxxx/gxxxxxxxx"
```
```
panic: runtime error: index out of range [1] with length 1
goroutine 166288 [running]:
github.com/apache/pulsar-client-go/pulsar.(*producer).internalCreatePartitionsProducers(0xc0070aa6e0, 0x0, 0x0)
github.com/apache/pulsar-client-go/pulsar/producer_impl.go:194 +0x785
github.com/apache/pulsar-client-go/pulsar.(*producer).runBackgroundPartitionDiscovery.func1(0xc004167cd0, 0xc00559f5c0, 0xc006af6dc0, 0xc0070aa6e0)
github.com/apache/pulsar-client-go/pulsar/producer_impl.go:152 +0xce
created by github.com/apache/pulsar-client-go/pulsar.(*producer).runBackgroundPartitionDiscovery
github.com/apache/pulsar-client-go/pulsar/producer_impl.go:144 +0xcd
```
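The failure is easy to reproduce in isolation. Here is a minimal sketch (hypothetical values, not the actual client code) of the old copy pattern: the new slice is sized to the new partition count, but the copy loop still iterates over the old count:
```
package main

// Minimal reproduction of the out-of-range pattern: the new slice is
// sized for newNumPartitions, but the copy loop runs to the old count.
func main() {
	oldConsumers := []string{"c0", "c1", "c2"} // old_partitions = 3
	newNumPartitions := 1                      // new_partitions = 1

	consumers := make([]string, newNumPartitions)
	for i := 0; i < len(oldConsumers); i++ {
		// Panics at i == 1: "index out of range [1] with length 1"
		consumers[i] = oldConsumers[i]
	}
}
```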
### Modifications
Add handling logic for scaling down partitions.
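Condensed into one place, the added logic treats a negative partition delta as a full rebuild starting from partition 0. The variable names below match the diff; the helper function itself is introduced here only for illustration (the commit inlines this logic in `internalTopicSubscribeToPartitions` and `internalCreatePartitionsProducers`):
```
// Condensed restatement of the scale-down handling added to both
// consumer_impl.go and producer_impl.go in the diff below.
func partitionRange(oldNumPartitions, newNumPartitions int) (startPartition, partitionsToAdd int) {
	startPartition = oldNumPartitions
	partitionsToAdd = newNumPartitions - oldNumPartitions
	if partitionsToAdd < 0 {
		// Scale down: discard the old cache instead of copying it over,
		// and re-create all newNumPartitions consumers/producers.
		partitionsToAdd = newNumPartitions
		startPartition = 0
	}
	return startPartition, partitionsToAdd
}
```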
---
pulsar/consumer_impl.go | 30 ++++++++++++++++++------------
pulsar/producer_impl.go | 27 +++++++++++++++------------
2 files changed, 33 insertions(+), 24 deletions(-)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index ec7ad7d..78ae0d7 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -28,7 +28,6 @@ import (
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
 	"github.com/apache/pulsar-client-go/pulsar/log"
-	"github.com/pkg/errors"
 )

 const defaultNackRedeliveryDelay = 1 * time.Minute
@@ -258,22 +257,16 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 	c.Lock()
 	defer c.Unlock()
+
 	oldConsumers := c.consumers
+	oldNumPartitions = len(oldConsumers)
 	if oldConsumers != nil {
-		oldNumPartitions = len(oldConsumers)
 		if oldNumPartitions == newNumPartitions {
 			c.log.Debug("Number of partitions in topic has not changed")
 			return nil
 		}
-		if oldNumPartitions > newNumPartitions {
-			c.log.WithField("old_partitions", oldNumPartitions).
-				WithField("new_partitions", newNumPartitions).
-				Error("Does not support scaling down operations on topic partitions")
-			return errors.New("Does not support scaling down operations on topic partitions")
-		}
-
 		c.log.WithField("old_partitions", oldNumPartitions).
 			WithField("new_partitions", newNumPartitions).
 			Info("Changed number of partitions in topic")
@@ -281,7 +274,9 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 	c.consumers = make([]*partitionConsumer, newNumPartitions)
-	if oldConsumers != nil {
+	// When, for some reason (e.g. forced deletion of a sub-partition), oldNumPartitions > newNumPartitions,
+	// we need to rebuild the cache of consumers; otherwise indexing the new array goes out of bounds.
+	if oldConsumers != nil && oldNumPartitions < newNumPartitions {
 		// Copy over the existing consumer instances
 		for i := 0; i < oldNumPartitions; i++ {
 			c.consumers[i] = oldConsumers[i]
@@ -297,12 +292,19 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 	receiverQueueSize := c.options.ReceiverQueueSize
 	metadata := c.options.Properties
+	startPartition := oldNumPartitions
 	partitionsToAdd := newNumPartitions - oldNumPartitions
+
+	if partitionsToAdd < 0 {
+		partitionsToAdd = newNumPartitions
+		startPartition = 0
+	}
+
 	var wg sync.WaitGroup
 	ch := make(chan ConsumerError, partitionsToAdd)
 	wg.Add(partitionsToAdd)
-	for partitionIdx := oldNumPartitions; partitionIdx < newNumPartitions; partitionIdx++ {
+	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
 		partitionTopic := partitions[partitionIdx]
 		go func(idx int, pt string) {
@@ -366,7 +368,11 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 		return err
 	}
-	c.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
+	if newNumPartitions < oldNumPartitions {
+		c.metrics.ConsumersPartitions.Set(float64(newNumPartitions))
+	} else {
+		c.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
+	}
 	return nil
 }
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index adf9b14..20e8d3d 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -26,7 +26,6 @@ import (
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 	"github.com/apache/pulsar-client-go/pulsar/log"
-	"github.com/pkg/errors"
 )

 const (
@@ -175,21 +174,14 @@ func (p *producer) internalCreatePartitionsProducers() error {
 	defer p.Unlock()
 	oldProducers := p.producers
+	oldNumPartitions = len(oldProducers)
 	if oldProducers != nil {
-		oldNumPartitions = len(oldProducers)
 		if oldNumPartitions == newNumPartitions {
 			p.log.Debug("Number of partitions in topic has not changed")
 			return nil
 		}
-		if oldNumPartitions > newNumPartitions {
-			p.log.WithField("old_partitions", oldNumPartitions).
-				WithField("new_partitions", newNumPartitions).
-				Error("Does not support scaling down operations on topic partitions")
-			return errors.New("Does not support scaling down operations on topic partitions")
-		}
-
 		p.log.WithField("old_partitions", oldNumPartitions).
 			WithField("new_partitions", newNumPartitions).
 			Info("Changed number of partitions in topic")
@@ -198,7 +190,9 @@ func (p *producer) internalCreatePartitionsProducers() error {
 	p.producers = make([]Producer, newNumPartitions)
-	if oldProducers != nil {
+	// When, for some reason (e.g. forced deletion of a sub-partition), oldNumPartitions > newNumPartitions,
+	// we need to rebuild the cache of producers; otherwise indexing the new array goes out of bounds.
+	if oldProducers != nil && oldNumPartitions < newNumPartitions {
 		// Copy over the existing consumer instances
 		for i := 0; i < oldNumPartitions; i++ {
 			p.producers[i] = oldProducers[i]
@@ -211,10 +205,15 @@ func (p *producer) internalCreatePartitionsProducers() error {
 		err error
 	}
+	startPartition := oldNumPartitions
 	partitionsToAdd := newNumPartitions - oldNumPartitions
+	if partitionsToAdd < 0 {
+		partitionsToAdd = newNumPartitions
+		startPartition = 0
+	}
 	c := make(chan ProducerError, partitionsToAdd)
-	for partitionIdx := oldNumPartitions; partitionIdx < newNumPartitions; partitionIdx++ {
+	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
 		partition := partitions[partitionIdx]
 		go func(partitionIdx int, partition string) {
@@ -248,7 +247,11 @@ func (p *producer) internalCreatePartitionsProducers() error {
 		return err
 	}
-	p.metrics.ProducersPartitions.Add(float64(partitionsToAdd))
+	if newNumPartitions < oldNumPartitions {
+		p.metrics.ProducersPartitions.Set(float64(newNumPartitions))
+	} else {
+		p.metrics.ProducersPartitions.Add(float64(partitionsToAdd))
+	}
 	atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&p.producers))
 	atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers)))
 	return nil
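One detail worth noting in the metrics handling at the end of each function: on scale down, partitionsToAdd has been reset to newNumPartitions, so Add() would overcount, and the gauge is instead overwritten with the new total. A sketch of the same pattern, assuming a Prometheus-style gauge; updatePartitionsGauge and the gauge name are hypothetical, introduced only for illustration:
```
package main

import "github.com/prometheus/client_golang/prometheus"

// updatePartitionsGauge mirrors the Set-vs-Add choice in this commit.
func updatePartitionsGauge(g prometheus.Gauge, oldN, newN, partitionsToAdd int) {
	if newN < oldN {
		// Scale down: partitionsToAdd was clamped to newN, so Add would
		// overcount; write the authoritative new total instead.
		g.Set(float64(newN))
	} else {
		g.Add(float64(partitionsToAdd))
	}
}

func main() {
	g := prometheus.NewGauge(prometheus.GaugeOpts{Name: "example_consumers_partitions"})
	g.Set(20)                          // old_partitions = 20
	updatePartitionsGauge(g, 20, 1, 1) // scale down to 1: gauge now reads 1
}
```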