You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/12/29 06:47:49 UTC

[GitHub] [pulsar-client-go] EAHITechnology opened a new pull request #696: Add parition reader to solve the Reader fragmented read data.

EAHITechnology opened a new pull request #696:
URL: https://github.com/apache/pulsar-client-go/pull/696


   - fix issues #553 [detail](https://github.com/apache/pulsar-client-go/issues/553)
   
   list:
   - add partition reader file; /pulsar/reader_partition.go
   - update reader Impl /pulsar/reader_impl.go ;
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] EAHITechnology commented on a change in pull request #696: Add parition reader to solve the Reader fragmented read data.

Posted by GitBox <gi...@apache.org>.
EAHITechnology commented on a change in pull request #696:
URL: https://github.com/apache/pulsar-client-go/pull/696#discussion_r777798166



##########
File path: pulsar/reader_impl.go
##########
@@ -207,12 +191,15 @@ func (r *reader) Seek(msgID MessageID) error {
 		return nil
 	}
 
-	return r.pc.Seek(mid)
+	return r.consumers[mid.partitionIdx].Seek(mid)

Review comment:
       why?The number of Consumers is determined by the TopicPartitions() function. If there is a problem here, then the Patition_Consumer is also problematic.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] EAHITechnology commented on a change in pull request #696: Add parition reader to solve the Reader fragmented read data.

Posted by GitBox <gi...@apache.org>.
EAHITechnology commented on a change in pull request #696:
URL: https://github.com/apache/pulsar-client-go/pull/696#discussion_r778612324



##########
File path: pulsar/reader_impl.go
##########
@@ -207,12 +191,15 @@ func (r *reader) Seek(msgID MessageID) error {
 		return nil
 	}
 
-	return r.pc.Seek(mid)
+	return r.consumers[mid.partitionIdx].Seek(mid)

Review comment:
       @cckellogg 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on pull request #696: Add parition reader to solve the Reader fragmented read data.

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #696:
URL: https://github.com/apache/pulsar-client-go/pull/696#issuecomment-1002635291


   @EAHITechnology Nick work, can you add test case for this feature?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] EAHITechnology commented on a change in pull request #696: Add parition reader to solve the Reader fragmented read data.

Posted by GitBox <gi...@apache.org>.
EAHITechnology commented on a change in pull request #696:
URL: https://github.com/apache/pulsar-client-go/pull/696#discussion_r784657691



##########
File path: pulsar/reader_partition.go
##########
@@ -0,0 +1,142 @@
+package pulsar

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] EAHITechnology commented on a change in pull request #696: Add parition reader to solve the Reader fragmented read data.

Posted by GitBox <gi...@apache.org>.
EAHITechnology commented on a change in pull request #696:
URL: https://github.com/apache/pulsar-client-go/pull/696#discussion_r777805044



##########
File path: pulsar/reader_impl.go
##########
@@ -148,35 +113,54 @@ func (r *reader) HasNext() bool {
 		return true
 	}
 
+retryLoop:
 	for {
-		lastMsgID, err := r.pc.getLastMessageID()
-		if err != nil {
-			r.log.WithError(err).Error("Failed to get last message id from broker")
-			continue
-		} else {
+	consumerLoop:
+		for _, consumer := range r.consumers {
+			lastMsgID, err := consumer.getLastMessageID()
+			if err != nil {
+				r.log.WithError(err).Error("Failed to get last message id from broker")
+				continue retryLoop
+			}
+			if r.lastMessageInBroker.greater(lastMsgID.messageID) {
+				continue consumerLoop
+			}
 			r.lastMessageInBroker = lastMsgID
-			break
 		}
+		break retryLoop
 	}
 
 	return r.hasMoreMessages()
 }
 
 func (r *reader) hasMoreMessages() bool {
-	if !r.pc.lastDequeuedMsg.Undefined() {
-		return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID)
-	}
+	moreMessagesCheck := func(idx int) bool {
+		if !r.consumers[idx].lastDequeuedMsg.Undefined() {
+			return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.consumers[idx].lastDequeuedMsg.messageID)
+		}
 
-	if r.pc.options.startMessageIDInclusive {
-		return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.messageID)
-	}
+		if r.consumers[idx].options.startMessageIDInclusive {
+			return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greaterEqual(r.consumers[idx].startMessageID.messageID)
+		}
 
-	// Non-inclusive
-	return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.startMessageID.messageID)
+		// Non-inclusive
+		return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.consumers[idx].startMessageID.messageID)
+	}
+	for idx := range r.consumers {
+		if moreMessagesCheck(idx) {

Review comment:
       Done.It sounds pretty good




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] EAHITechnology commented on a change in pull request #696: Add parition reader to solve the Reader fragmented read data.

Posted by GitBox <gi...@apache.org>.
EAHITechnology commented on a change in pull request #696:
URL: https://github.com/apache/pulsar-client-go/pull/696#discussion_r780786317



##########
File path: pulsar/reader_partition.go
##########
@@ -0,0 +1,142 @@
+package pulsar
+
+import (
+	"sync"
+	"time"
+)
+
+func (r *reader) internalTopicReadToPartitions() error {
+	partitions, err := r.client.TopicPartitions(r.topic)
+	if err != nil {
+		return err
+	}
+
+	oldNumPartitions, newNumPartitions := 0, len(partitions)
+
+	r.Lock()
+	defer r.Unlock()
+
+	oldReaders, oldNumPartitions := r.consumers, len(r.consumers)
+	if oldReaders != nil {
+		if oldNumPartitions == newNumPartitions {
+			r.log.Debug("Number of partitions in topic has not changed")
+			return nil
+		}
+
+		r.log.WithField("old_partitions", oldNumPartitions).
+			WithField("new_partitions", newNumPartitions).
+			Info("Changed number of partitions in topic")
+	}
+
+	r.consumers = make([]*partitionConsumer, newNumPartitions)
+
+	// When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions,
+	// we need to rebuild the cache of new consumers, otherwise the array will be out of bounds.
+	if oldReaders != nil && oldNumPartitions < newNumPartitions {
+		// Copy over the existing consumer instances
+		for i := 0; i < oldNumPartitions; i++ {
+			r.consumers[i] = oldReaders[i]
+		}
+	}
+
+	type ConsumerError struct {
+		err       error
+		partition int
+		consumer  *partitionConsumer
+	}
+
+	subscriptionName := r.options.SubscriptionRolePrefix
+	if subscriptionName == "" {
+		subscriptionName = "reader"

Review comment:
       ok




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] EAHITechnology commented on a change in pull request #696: Add parition reader to solve the Reader fragmented read data.

Posted by GitBox <gi...@apache.org>.
EAHITechnology commented on a change in pull request #696:
URL: https://github.com/apache/pulsar-client-go/pull/696#discussion_r777806923



##########
File path: pulsar/reader_partition.go
##########
@@ -0,0 +1,142 @@
+package pulsar
+
+import (
+	"sync"
+	"time"
+)
+
+func (r *reader) internalTopicReadToPartitions() error {

Review comment:
       What's more difficult is that reader requires a lot of specific parameters. After thinking about it, I decided to encapsulate directly. If not, this will swell the member variables of the reader.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] EAHITechnology commented on a change in pull request #696: Add parition reader to solve the Reader fragmented read data.

Posted by GitBox <gi...@apache.org>.
EAHITechnology commented on a change in pull request #696:
URL: https://github.com/apache/pulsar-client-go/pull/696#discussion_r777805854



##########
File path: pulsar/reader_partition.go
##########
@@ -0,0 +1,142 @@
+package pulsar
+
+import (
+	"sync"
+	"time"
+)
+
+func (r *reader) internalTopicReadToPartitions() error {

Review comment:
       According to issues, we need to subscribe to partition_consumer separately. The meaning of this function is to convert topic into partition_topic for subscription




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on pull request #696: Add parition reader to solve the Reader fragmented read data.

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #696:
URL: https://github.com/apache/pulsar-client-go/pull/696#issuecomment-1006493330


   @EAHITechnology Please add unit test case for this change? 
   
   We can use the following code to create a topic with 3 partitions
   
   ```
   	topic := newTopicName()
   	testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/partitions"
   	makeHTTPCall(t, http.MethodPut, testURL, "3")
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] EAHITechnology commented on a change in pull request #696: Add parition reader to solve the Reader fragmented read data.

Posted by GitBox <gi...@apache.org>.
EAHITechnology commented on a change in pull request #696:
URL: https://github.com/apache/pulsar-client-go/pull/696#discussion_r784657424



##########
File path: pulsar/reader_partition.go
##########
@@ -0,0 +1,142 @@
+package pulsar
+
+import (
+	"sync"
+	"time"
+)
+
+func (r *reader) internalTopicReadToPartitions() error {
+	partitions, err := r.client.TopicPartitions(r.topic)
+	if err != nil {
+		return err
+	}
+
+	oldNumPartitions, newNumPartitions := 0, len(partitions)
+
+	r.Lock()
+	defer r.Unlock()
+
+	oldReaders, oldNumPartitions := r.consumers, len(r.consumers)
+	if oldReaders != nil {
+		if oldNumPartitions == newNumPartitions {
+			r.log.Debug("Number of partitions in topic has not changed")
+			return nil
+		}
+
+		r.log.WithField("old_partitions", oldNumPartitions).
+			WithField("new_partitions", newNumPartitions).
+			Info("Changed number of partitions in topic")
+	}
+
+	r.consumers = make([]*partitionConsumer, newNumPartitions)
+
+	// When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions,
+	// we need to rebuild the cache of new consumers, otherwise the array will be out of bounds.
+	if oldReaders != nil && oldNumPartitions < newNumPartitions {
+		// Copy over the existing consumer instances
+		for i := 0; i < oldNumPartitions; i++ {
+			r.consumers[i] = oldReaders[i]
+		}
+	}
+
+	type ConsumerError struct {
+		err       error
+		partition int
+		consumer  *partitionConsumer
+	}
+
+	subscriptionName := r.options.SubscriptionRolePrefix
+	if subscriptionName == "" {
+		subscriptionName = "reader"
+	}
+	subscriptionName += "-" + generateRandomName()
+
+	startMessageID, ok := toTrackingMessageID(r.options.StartMessageID)
+	if !ok {
+		// a custom type satisfying MessageID may not be a messageID or trackingMessageID
+		// so re-create messageID using its data
+		deserMsgID, err := deserializeMessageID(r.options.StartMessageID.Serialize())
+		if err != nil {
+			return err
+		}
+		// de-serialized MessageID is a messageID
+		startMessageID = trackingMessageID{
+			messageID:    deserMsgID.(messageID),
+			receivedTime: time.Now(),
+		}
+	}
+
+	startPartition := oldNumPartitions
+	partitionsToAdd := newNumPartitions - oldNumPartitions
+
+	if partitionsToAdd < 0 {
+		partitionsToAdd = newNumPartitions
+		startPartition = 0
+	}
+
+	var wg sync.WaitGroup
+	ch := make(chan ConsumerError, partitionsToAdd)
+	wg.Add(partitionsToAdd)
+
+	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
+		partitionTopic := partitions[partitionIdx]
+
+		go func(idx int, pt string) {
+			defer wg.Done()
+
+			opts := &partitionConsumerOpts{
+				topic:                      pt,
+				consumerName:               r.options.Name,
+				subscription:               subscriptionName,
+				subscriptionType:           Exclusive,
+				partitionIdx:               idx,
+				receiverQueueSize:          r.options.ReceiverQueueSize,
+				nackRedeliveryDelay:        defaultNackRedeliveryDelay,
+				metadata:                   r.options.Properties,
+				replicateSubscriptionState: false,
+				startMessageID:             startMessageID,
+				subscriptionMode:           durable,

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #696: Add parition reader to solve the Reader fragmented read data.

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #696:
URL: https://github.com/apache/pulsar-client-go/pull/696#discussion_r776832958



##########
File path: pulsar/reader_impl.go
##########
@@ -148,35 +113,54 @@ func (r *reader) HasNext() bool {
 		return true
 	}
 
+retryLoop:
 	for {
-		lastMsgID, err := r.pc.getLastMessageID()
-		if err != nil {
-			r.log.WithError(err).Error("Failed to get last message id from broker")
-			continue
-		} else {
+	consumerLoop:
+		for _, consumer := range r.consumers {
+			lastMsgID, err := consumer.getLastMessageID()
+			if err != nil {
+				r.log.WithError(err).Error("Failed to get last message id from broker")
+				continue retryLoop
+			}
+			if r.lastMessageInBroker.greater(lastMsgID.messageID) {
+				continue consumerLoop
+			}
 			r.lastMessageInBroker = lastMsgID
-			break
 		}
+		break retryLoop
 	}
 
 	return r.hasMoreMessages()
 }
 
 func (r *reader) hasMoreMessages() bool {
-	if !r.pc.lastDequeuedMsg.Undefined() {
-		return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID)
-	}
+	moreMessagesCheck := func(idx int) bool {
+		if !r.consumers[idx].lastDequeuedMsg.Undefined() {
+			return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.consumers[idx].lastDequeuedMsg.messageID)
+		}
 
-	if r.pc.options.startMessageIDInclusive {
-		return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.messageID)
-	}
+		if r.consumers[idx].options.startMessageIDInclusive {
+			return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greaterEqual(r.consumers[idx].startMessageID.messageID)
+		}
 
-	// Non-inclusive
-	return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.startMessageID.messageID)
+		// Non-inclusive
+		return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.consumers[idx].startMessageID.messageID)
+	}
+	for idx := range r.consumers {
+		if moreMessagesCheck(idx) {

Review comment:
       Let's pass the consumer and the lastMessageInBroker (instead of an index) `consumerHasMoreMessages(consumer Consumer, lastMessageInBroker)` Then the logic can be Tested.

##########
File path: pulsar/reader_partition.go
##########
@@ -0,0 +1,142 @@
+package pulsar
+
+import (
+	"sync"
+	"time"
+)
+
+func (r *reader) internalTopicReadToPartitions() error {

Review comment:
       What is this function doing I'm a little confused by the name. Also, this looks like a lot of duplication code from the consumer partitions is there any way to refactor/combine in to helper functions to avoid duplication?

##########
File path: pulsar/reader_impl.go
##########
@@ -207,12 +191,15 @@ func (r *reader) Seek(msgID MessageID) error {
 		return nil
 	}
 
-	return r.pc.Seek(mid)
+	return r.consumers[mid.partitionIdx].Seek(mid)

Review comment:
       could there be an invalid partition (out of range) index here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] EAHITechnology commented on a change in pull request #696: Add parition reader to solve the Reader fragmented read data.

Posted by GitBox <gi...@apache.org>.
EAHITechnology commented on a change in pull request #696:
URL: https://github.com/apache/pulsar-client-go/pull/696#discussion_r777813297



##########
File path: pulsar/reader_partition.go
##########
@@ -0,0 +1,142 @@
+package pulsar
+
+import (
+	"sync"
+	"time"
+)
+
+func (r *reader) internalTopicReadToPartitions() error {

Review comment:
       For example, the Reader will process startMessageID, but the consumer will not.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #696: Add parition reader to solve the Reader fragmented read data.

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #696:
URL: https://github.com/apache/pulsar-client-go/pull/696#discussion_r779470116



##########
File path: pulsar/reader_partition.go
##########
@@ -0,0 +1,142 @@
+package pulsar

Review comment:
       Please add license header for new file?
   
   ```
   // Licensed to the Apache Software Foundation (ASF) under one
   // or more contributor license agreements.  See the NOTICE file
   // distributed with this work for additional information
   // regarding copyright ownership.  The ASF licenses this file
   // to you under the Apache License, Version 2.0 (the
   // "License"); you may not use this file except in compliance
   // with the License.  You may obtain a copy of the License at
   //
   //   http://www.apache.org/licenses/LICENSE-2.0
   //
   // Unless required by applicable law or agreed to in writing,
   // software distributed under the License is distributed on an
   // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   // KIND, either express or implied.  See the License for the
   // specific language governing permissions and limitations
   // under the License.
   ```

##########
File path: pulsar/reader_partition.go
##########
@@ -0,0 +1,142 @@
+package pulsar
+
+import (
+	"sync"
+	"time"
+)
+
+func (r *reader) internalTopicReadToPartitions() error {
+	partitions, err := r.client.TopicPartitions(r.topic)
+	if err != nil {
+		return err
+	}
+
+	oldNumPartitions, newNumPartitions := 0, len(partitions)
+
+	r.Lock()
+	defer r.Unlock()
+
+	oldReaders, oldNumPartitions := r.consumers, len(r.consumers)
+	if oldReaders != nil {
+		if oldNumPartitions == newNumPartitions {
+			r.log.Debug("Number of partitions in topic has not changed")
+			return nil
+		}
+
+		r.log.WithField("old_partitions", oldNumPartitions).
+			WithField("new_partitions", newNumPartitions).
+			Info("Changed number of partitions in topic")
+	}
+
+	r.consumers = make([]*partitionConsumer, newNumPartitions)
+
+	// When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions,
+	// we need to rebuild the cache of new consumers, otherwise the array will be out of bounds.
+	if oldReaders != nil && oldNumPartitions < newNumPartitions {
+		// Copy over the existing consumer instances
+		for i := 0; i < oldNumPartitions; i++ {
+			r.consumers[i] = oldReaders[i]
+		}
+	}
+
+	type ConsumerError struct {
+		err       error
+		partition int
+		consumer  *partitionConsumer
+	}
+
+	subscriptionName := r.options.SubscriptionRolePrefix
+	if subscriptionName == "" {
+		subscriptionName = "reader"

Review comment:
       In here, maybe we can define a string constant
   
   ```
   const ReaderSubNamePrefix = "reader"
   ```

##########
File path: pulsar/reader_partition.go
##########
@@ -0,0 +1,142 @@
+package pulsar
+
+import (
+	"sync"
+	"time"
+)
+
+func (r *reader) internalTopicReadToPartitions() error {
+	partitions, err := r.client.TopicPartitions(r.topic)
+	if err != nil {
+		return err
+	}
+
+	oldNumPartitions, newNumPartitions := 0, len(partitions)
+
+	r.Lock()
+	defer r.Unlock()
+
+	oldReaders, oldNumPartitions := r.consumers, len(r.consumers)
+	if oldReaders != nil {
+		if oldNumPartitions == newNumPartitions {
+			r.log.Debug("Number of partitions in topic has not changed")
+			return nil
+		}
+
+		r.log.WithField("old_partitions", oldNumPartitions).
+			WithField("new_partitions", newNumPartitions).
+			Info("Changed number of partitions in topic")
+	}
+
+	r.consumers = make([]*partitionConsumer, newNumPartitions)
+
+	// When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions,
+	// we need to rebuild the cache of new consumers, otherwise the array will be out of bounds.
+	if oldReaders != nil && oldNumPartitions < newNumPartitions {
+		// Copy over the existing consumer instances
+		for i := 0; i < oldNumPartitions; i++ {
+			r.consumers[i] = oldReaders[i]
+		}
+	}
+
+	type ConsumerError struct {
+		err       error
+		partition int
+		consumer  *partitionConsumer
+	}
+
+	subscriptionName := r.options.SubscriptionRolePrefix
+	if subscriptionName == "" {
+		subscriptionName = "reader"
+	}
+	subscriptionName += "-" + generateRandomName()
+
+	startMessageID, ok := toTrackingMessageID(r.options.StartMessageID)
+	if !ok {
+		// a custom type satisfying MessageID may not be a messageID or trackingMessageID
+		// so re-create messageID using its data
+		deserMsgID, err := deserializeMessageID(r.options.StartMessageID.Serialize())
+		if err != nil {
+			return err
+		}
+		// de-serialized MessageID is a messageID
+		startMessageID = trackingMessageID{
+			messageID:    deserMsgID.(messageID),
+			receivedTime: time.Now(),
+		}
+	}
+
+	startPartition := oldNumPartitions
+	partitionsToAdd := newNumPartitions - oldNumPartitions
+
+	if partitionsToAdd < 0 {
+		partitionsToAdd = newNumPartitions
+		startPartition = 0
+	}
+
+	var wg sync.WaitGroup
+	ch := make(chan ConsumerError, partitionsToAdd)
+	wg.Add(partitionsToAdd)
+
+	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
+		partitionTopic := partitions[partitionIdx]
+
+		go func(idx int, pt string) {
+			defer wg.Done()
+
+			opts := &partitionConsumerOpts{
+				topic:                      pt,
+				consumerName:               r.options.Name,
+				subscription:               subscriptionName,
+				subscriptionType:           Exclusive,
+				partitionIdx:               idx,
+				receiverQueueSize:          r.options.ReceiverQueueSize,
+				nackRedeliveryDelay:        defaultNackRedeliveryDelay,
+				metadata:                   r.options.Properties,
+				replicateSubscriptionState: false,
+				startMessageID:             startMessageID,
+				subscriptionMode:           durable,

Review comment:
       The reader use `subscriptionMode` is **nonDurable**?

##########
File path: pulsar/reader_partition.go
##########
@@ -0,0 +1,142 @@
+package pulsar
+
+import (
+	"sync"
+	"time"
+)
+
+func (r *reader) internalTopicReadToPartitions() error {
+	partitions, err := r.client.TopicPartitions(r.topic)
+	if err != nil {
+		return err
+	}
+
+	oldNumPartitions, newNumPartitions := 0, len(partitions)
+
+	r.Lock()
+	defer r.Unlock()
+
+	oldReaders, oldNumPartitions := r.consumers, len(r.consumers)
+	if oldReaders != nil {
+		if oldNumPartitions == newNumPartitions {
+			r.log.Debug("Number of partitions in topic has not changed")
+			return nil
+		}
+
+		r.log.WithField("old_partitions", oldNumPartitions).
+			WithField("new_partitions", newNumPartitions).
+			Info("Changed number of partitions in topic")
+	}
+
+	r.consumers = make([]*partitionConsumer, newNumPartitions)
+
+	// When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions,
+	// we need to rebuild the cache of new consumers, otherwise the array will be out of bounds.
+	if oldReaders != nil && oldNumPartitions < newNumPartitions {
+		// Copy over the existing consumer instances
+		for i := 0; i < oldNumPartitions; i++ {
+			r.consumers[i] = oldReaders[i]
+		}
+	}
+
+	type ConsumerError struct {
+		err       error
+		partition int
+		consumer  *partitionConsumer
+	}
+
+	subscriptionName := r.options.SubscriptionRolePrefix
+	if subscriptionName == "" {
+		subscriptionName = "reader"
+	}
+	subscriptionName += "-" + generateRandomName()
+
+	startMessageID, ok := toTrackingMessageID(r.options.StartMessageID)
+	if !ok {
+		// a custom type satisfying MessageID may not be a messageID or trackingMessageID
+		// so re-create messageID using its data
+		deserMsgID, err := deserializeMessageID(r.options.StartMessageID.Serialize())
+		if err != nil {
+			return err
+		}
+		// de-serialized MessageID is a messageID
+		startMessageID = trackingMessageID{
+			messageID:    deserMsgID.(messageID),
+			receivedTime: time.Now(),
+		}
+	}
+
+	startPartition := oldNumPartitions
+	partitionsToAdd := newNumPartitions - oldNumPartitions
+
+	if partitionsToAdd < 0 {
+		partitionsToAdd = newNumPartitions
+		startPartition = 0
+	}
+
+	var wg sync.WaitGroup
+	ch := make(chan ConsumerError, partitionsToAdd)
+	wg.Add(partitionsToAdd)
+
+	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
+		partitionTopic := partitions[partitionIdx]
+
+		go func(idx int, pt string) {
+			defer wg.Done()
+
+			opts := &partitionConsumerOpts{
+				topic:                      pt,
+				consumerName:               r.options.Name,
+				subscription:               subscriptionName,
+				subscriptionType:           Exclusive,
+				partitionIdx:               idx,
+				receiverQueueSize:          r.options.ReceiverQueueSize,
+				nackRedeliveryDelay:        defaultNackRedeliveryDelay,
+				metadata:                   r.options.Properties,
+				replicateSubscriptionState: false,
+				startMessageID:             startMessageID,
+				subscriptionMode:           durable,

Review comment:
       It seems that we have lost the attribute of the `startMessageIDInclusive` field?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org