You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2021/08/25 06:54:37 UTC

[pulsar-client-go] branch master updated: Fix producer panic by oldProducers (#598)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new baaf68d  Fix producer panic by oldProducers (#598)
baaf68d is described below

commit baaf68d89bc82ab83b714c0327ea51d58d9bd37f
Author: xiaolong ran <rx...@apache.org>
AuthorDate: Wed Aug 25 14:54:31 2021 +0800

    Fix producer panic by oldProducers (#598)
    
    * Fix producer panic by oldProducers
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
    
    * fix comments
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
    
    * fix comments
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
    
    * fix ci error
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
---
 pulsar/consumer_impl.go | 16 +++++++++++++---
 pulsar/producer_impl.go | 17 ++++++++++++++---
 2 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index b7bc607..ec7ad7d 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -28,6 +28,7 @@ import (
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
 	"github.com/apache/pulsar-client-go/pulsar/log"
+	"github.com/pkg/errors"
 )
 
 const defaultNackRedeliveryDelay = 1 * time.Minute
@@ -266,6 +267,13 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 			return nil
 		}
 
+		if oldNumPartitions > newNumPartitions {
+			c.log.WithField("old_partitions", oldNumPartitions).
+				WithField("new_partitions", newNumPartitions).
+				Error("Does not support scaling down operations on topic partitions")
+			return errors.New("Does not support scaling down operations on topic partitions")
+		}
+
 		c.log.WithField("old_partitions", oldNumPartitions).
 			WithField("new_partitions", newNumPartitions).
 			Info("Changed number of partitions in topic")
@@ -273,9 +281,11 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 
 	c.consumers = make([]*partitionConsumer, newNumPartitions)
 
-	// Copy over the existing consumer instances
-	for i := 0; i < oldNumPartitions; i++ {
-		c.consumers[i] = oldConsumers[i]
+	if oldConsumers != nil {
+		// Copy over the existing consumer instances
+		for i := 0; i < oldNumPartitions; i++ {
+			c.consumers[i] = oldConsumers[i]
+		}
 	}
 
 	type ConsumerError struct {
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 1ffd24c..adf9b14 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -26,6 +26,7 @@ import (
 
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 	"github.com/apache/pulsar-client-go/pulsar/log"
+	"github.com/pkg/errors"
 )
 
 const (
@@ -182,16 +183,26 @@ func (p *producer) internalCreatePartitionsProducers() error {
 			return nil
 		}
 
+		if oldNumPartitions > newNumPartitions {
+			p.log.WithField("old_partitions", oldNumPartitions).
+				WithField("new_partitions", newNumPartitions).
+				Error("Does not support scaling down operations on topic partitions")
+			return errors.New("Does not support scaling down operations on topic partitions")
+		}
+
 		p.log.WithField("old_partitions", oldNumPartitions).
 			WithField("new_partitions", newNumPartitions).
 			Info("Changed number of partitions in topic")
+
 	}
 
 	p.producers = make([]Producer, newNumPartitions)
 
-	// Copy over the existing consumer instances
-	for i := 0; i < oldNumPartitions; i++ {
-		p.producers[i] = oldProducers[i]
+	if oldProducers != nil {
+		// Copy over the existing producer instances
+		for i := 0; i < oldNumPartitions; i++ {
+			p.producers[i] = oldProducers[i]
+		}
 	}
 
 	type ProducerError struct {