You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2021/08/25 06:54:37 UTC
[pulsar-client-go] branch master updated: Fix producer panic by oldProducers (#598)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new baaf68d Fix producer panic by oldProducers (#598)
baaf68d is described below
commit baaf68d89bc82ab83b714c0327ea51d58d9bd37f
Author: xiaolong ran <rx...@apache.org>
AuthorDate: Wed Aug 25 14:54:31 2021 +0800
Fix producer panic by oldProducers (#598)
* Fix producer panic by oldProducers
Signed-off-by: xiaolongran <xi...@tencent.com>
* fix comments
Signed-off-by: xiaolongran <xi...@tencent.com>
* fix comments
Signed-off-by: xiaolongran <xi...@tencent.com>
* fix ci error
Signed-off-by: xiaolongran <xi...@tencent.com>
---
pulsar/consumer_impl.go | 16 +++++++++++++---
pulsar/producer_impl.go | 17 ++++++++++++++---
2 files changed, 27 insertions(+), 6 deletions(-)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index b7bc607..ec7ad7d 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
+ "github.com/pkg/errors"
)
const defaultNackRedeliveryDelay = 1 * time.Minute
@@ -266,6 +267,13 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
return nil
}
+ if oldNumPartitions > newNumPartitions {
+ c.log.WithField("old_partitions", oldNumPartitions).
+ WithField("new_partitions", newNumPartitions).
+ Error("Does not support scaling down operations on topic partitions")
+ return errors.New("Does not support scaling down operations on topic partitions")
+ }
+
c.log.WithField("old_partitions", oldNumPartitions).
WithField("new_partitions", newNumPartitions).
Info("Changed number of partitions in topic")
@@ -273,9 +281,11 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
c.consumers = make([]*partitionConsumer, newNumPartitions)
- // Copy over the existing consumer instances
- for i := 0; i < oldNumPartitions; i++ {
- c.consumers[i] = oldConsumers[i]
+ if oldConsumers != nil {
+ // Copy over the existing consumer instances
+ for i := 0; i < oldNumPartitions; i++ {
+ c.consumers[i] = oldConsumers[i]
+ }
}
type ConsumerError struct {
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 1ffd24c..adf9b14 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/log"
+ "github.com/pkg/errors"
)
const (
@@ -182,16 +183,26 @@ func (p *producer) internalCreatePartitionsProducers() error {
return nil
}
+ if oldNumPartitions > newNumPartitions {
+ p.log.WithField("old_partitions", oldNumPartitions).
+ WithField("new_partitions", newNumPartitions).
+ Error("Does not support scaling down operations on topic partitions")
+ return errors.New("Does not support scaling down operations on topic partitions")
+ }
+
p.log.WithField("old_partitions", oldNumPartitions).
WithField("new_partitions", newNumPartitions).
Info("Changed number of partitions in topic")
+
}
p.producers = make([]Producer, newNumPartitions)
- // Copy over the existing consumer instances
- for i := 0; i < oldNumPartitions; i++ {
- p.producers[i] = oldProducers[i]
+ if oldProducers != nil {
+ // Copy over the existing consumer instances
+ for i := 0; i < oldNumPartitions; i++ {
+ p.producers[i] = oldProducers[i]
+ }
}
type ProducerError struct {