Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/11/03 10:06:29 UTC

[GitHub] [pulsar-client-go] wolfstudy opened a new pull request #660: [WIP]Support nack backoff policy for SDK

wolfstudy opened a new pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660


   Signed-off-by: xiaolongran <xi...@tencent.com>
   
   
   Master Issue: #658
   
   ### Motivation
   
   Support nack backoff policy
   
   ### Modifications
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743352353



##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec
+	maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+	if redeliveryCount < 0 {
+		return minNackTimeMs
+	}
+
+	return int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs)))

Review comment:
       Sure, will add a comment for this change.







[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r744035482



##########
File path: pulsar/consumer.go
##########
@@ -158,6 +158,17 @@ type ConsumerOptions struct {
 
 	// Decryption decryption related fields to decrypt the encrypted message
 	Decryption *MessageDecryptionInfo
+
+	// If enabled, the default implementation of NackBackoffPolicy will be used to calculate the delay time of
+	// nack backoff, Default: false.
+	EnableDefaultNackBackoffPolicy bool

Review comment:
       To me a cleaner API is to just have `NackBackoffPolicy` and expose the basic/default policy. If the policy is not set, then it uses the current behavior. This way there is only one configuration knob to worry about.
   
   ```
   // current behavior
   ConsumerOpts{}
   
   // custom behavior
   ConsumerOpts{
     NackBackoffPolicy: pulsar.NewExpNackBackoffPolicy(),
   }
   ```
   
   
   







[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743363658



##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
 	doneOnce     sync.Once
 	negativeAcks map[messageID]time.Time
 	rc           redeliveryConsumer
-	tick         *time.Ticker
+	nackBackoff  NackBackoffPolicy
+	trackFlag    bool
 	delay        time.Duration
 	log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker {
-	t := &negativeAcksTracker{
-		doneCh:       make(chan interface{}),
-		negativeAcks: make(map[messageID]time.Time),
-		rc:           rc,
-		tick:         time.NewTicker(delay / 3),
-		delay:        delay,
-		log:          logger,
-	}
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+	nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {
+
+	t := new(negativeAcksTracker)
+
+	// When using NackBackoffPolicy, the delay time needs to be calculated based on the RedeliveryCount field in
+	// the CommandMessage, so for the original default Nack() logic, we still keep the negativeAcksTracker created
+	// when we open a gorutine to execute the logic of `t.track()`. But for the NackBackoffPolicy method, we need
+	// to execute the logic of `t.track()` when AddMessage().
+	if nackBackoffPolicy != nil {

Review comment:
       Yes, I agree with your point. The problem is that with nack backoff we can't get the nackDelayTime directly: we have to read the redeliveryCount from the CommandMessage, compute the nackDelayTime from it, and only then can we create the time.NewTicker based on that nackDelayTime. That dependency is why the if statement was added.







[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743308255



##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
 	doneOnce     sync.Once
 	negativeAcks map[messageID]time.Time
 	rc           redeliveryConsumer
-	tick         *time.Ticker
+	nackBackoff  NackBackoffPolicy
+	trackFlag    bool
 	delay        time.Duration
 	log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker {
-	t := &negativeAcksTracker{
-		doneCh:       make(chan interface{}),
-		negativeAcks: make(map[messageID]time.Time),
-		rc:           rc,
-		tick:         time.NewTicker(delay / 3),
-		delay:        delay,
-		log:          logger,
-	}
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+	nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {
+
+	t := new(negativeAcksTracker)

Review comment:
       this can just be `var t *negativeAcksTracker`

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       Let's use the time.Duration constants
   ```
   10 * time.Second
   10 * time.Minute
   ```

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -76,14 +95,48 @@ func (t *negativeAcksTracker) Add(msgID messageID) {
 	t.negativeAcks[batchMsgID] = targetTime
 }
 
-func (t *negativeAcksTracker) track() {
+func (t *negativeAcksTracker) AddMessage(msg Message) {

Review comment:
       Why is there a new method here?
   
   Also, it looks like state is changing here without a lock. If multiple goroutines call this at once, multiple tracking goroutines could be started, right?
   
   Can the tracking goroutine just be started at creation time?
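
A minimal sketch of one way to address the concern above when the tracking goroutine has to be started lazily: guard the start with `sync.Once` so that concurrent callers launch it at most once. The type `tracker` and the names `ensureTracking` and `startedAfterConcurrentCalls` are hypothetical stand-ins, not code from the PR.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// tracker is a hypothetical stand-in for negativeAcksTracker.
type tracker struct {
	startOnce sync.Once
	started   int32 // number of tracking goroutines launched
	doneCh    chan struct{}
}

// ensureTracking launches the tracking goroutine at most once,
// no matter how many goroutines call it concurrently.
func (t *tracker) ensureTracking() {
	t.startOnce.Do(func() {
		atomic.AddInt32(&t.started, 1)
		go func() {
			<-t.doneCh // placeholder for the real tracking loop
		}()
	})
}

// startedAfterConcurrentCalls reports how many tracking goroutines
// were launched after the given number of concurrent callers.
func startedAfterConcurrentCalls(callers int) int32 {
	t := &tracker{doneCh: make(chan struct{})}
	var wg sync.WaitGroup
	for i := 0; i < callers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			t.ensureTracking()
		}()
	}
	wg.Wait()
	close(t.doneCh)
	return atomic.LoadInt32(&t.started)
}

func main() {
	fmt.Println(startedAfterConcurrentCalls(50))
}
```

Starting the goroutine in the constructor, as the comment suggests, removes the need for this guard entirely.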

##########
File path: pulsar/consumer.go
##########
@@ -158,6 +158,17 @@ type ConsumerOptions struct {
 
 	// Decryption decryption related fields to decrypt the encrypted message
 	Decryption *MessageDecryptionInfo
+
+	// If enabled, the default implementation of NackBackoffPolicy will be used to calculate the delay time of
+	// nack backoff, Default: false.
+	EnableDefaultNackBackoffPolicy bool

Review comment:
       Why is `EnableDefaultNackBackoffPolicy` needed? If the `NackBackoffPolicy` is not supplied, can't we just use the default?

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -105,15 +158,13 @@ func (t *negativeAcksTracker) track() {
 					t.rc.Redeliver(msgIds)
 				}
 			}
-
 		}
 	}
 }
 
 func (t *negativeAcksTracker) Close() {
 	// allow Close() to be invoked multiple times by consumer_partition to avoid panic
 	t.doneOnce.Do(func() {
-		t.tick.Stop()

Review comment:
       How is the ticker getting cleaned up now?

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec
+	maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+	if redeliveryCount < 0 {
+		return minNackTimeMs
+	}
+
+	return int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs)))

Review comment:
       Can you add some comments explaining what this logic is doing? It's difficult for me to understand just by reading it.

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
 	doneOnce     sync.Once
 	negativeAcks map[messageID]time.Time
 	rc           redeliveryConsumer
-	tick         *time.Ticker
+	nackBackoff  NackBackoffPolicy
+	trackFlag    bool
 	delay        time.Duration
 	log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker {
-	t := &negativeAcksTracker{
-		doneCh:       make(chan interface{}),
-		negativeAcks: make(map[messageID]time.Time),
-		rc:           rc,
-		tick:         time.NewTicker(delay / 3),
-		delay:        delay,
-		log:          logger,
-	}
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+	nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {
+
+	t := new(negativeAcksTracker)
+
+	// When using NackBackoffPolicy, the delay time needs to be calculated based on the RedeliveryCount field in
+	// the CommandMessage, so for the original default Nack() logic, we still keep the negativeAcksTracker created
+	// when we open a gorutine to execute the logic of `t.track()`. But for the NackBackoffPolicy method, we need
+	// to execute the logic of `t.track()` when AddMessage().
+	if nackBackoffPolicy != nil {

Review comment:
       I'm a little confused about why we need an if statement. Shouldn't the default implementation of the `NackBackoffPolicy` be the current behavior? The benefit of the interface is to simplify the code and delegate to the implementation.
   
   ```
   bp := nackBackoffPolicy
   if bp == nil {
     bp = newDefaultBackoffPolicy(delay)
   }
   t = &negativeAcksTracker{
     doneCh:       make(chan interface{}),
     negativeAcks: make(map[messageID]time.Time),
     nackBackoff:  bp,
     rc:           rc,
     log:          logger,
   }
   ```
   
    Thoughts?







[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743356899



##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec
+	maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+	if redeliveryCount < 0 {
+		return minNackTimeMs
+	}
+
+	return int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs)))

Review comment:
       Here we first get the redeliveryCount from the CommandMessage, then left-shift minNackTimeMs by redeliveryCount (doubling the delay on each redelivery) to calculate how long to wait before the nacked message is redelivered. We then compare the result with maxNackTimeMs and take the smaller of the two as the nack delay, so the delay grows from minNackTimeMs up to maxNackTimeMs.







[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743352255



##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       ok, will fix this







[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743352175



##########
File path: pulsar/consumer.go
##########
@@ -158,6 +158,17 @@ type ConsumerOptions struct {
 
 	// Decryption decryption related fields to decrypt the encrypted message
 	Decryption *MessageDecryptionInfo
+
+	// If enabled, the default implementation of NackBackoffPolicy will be used to calculate the delay time of
+	// nack backoff, Default: false.
+	EnableDefaultNackBackoffPolicy bool

Review comment:
       Without `EnableDefaultNackBackoffPolicy`, this would change the behavior of existing code. If we used the default NackBackoffPolicy whenever the NackBackoffPolicy is empty, then users calling the existing Nack(Message) interface would get the new implementation.







[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743352175



##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec
+	maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+	if redeliveryCount < 0 {
+		return minNackTimeMs
+	}
+
+	return int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs)))

Review comment:
       Sure, I will add a comment for this change.

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
 	doneOnce     sync.Once
 	negativeAcks map[messageID]time.Time
 	rc           redeliveryConsumer
-	tick         *time.Ticker
+	nackBackoff  NackBackoffPolicy
+	trackFlag    bool
 	delay        time.Duration
 	log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker {
-	t := &negativeAcksTracker{
-		doneCh:       make(chan interface{}),
-		negativeAcks: make(map[messageID]time.Time),
-		rc:           rc,
-		tick:         time.NewTicker(delay / 3),
-		delay:        delay,
-		log:          logger,
-	}
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+	nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {
+
+	t := new(negativeAcksTracker)

Review comment:
       They have the same effect.
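
For readers comparing the two forms: `new(T)` and `&T{}` both yield a pointer to a zero-valued struct. A small sketch, using a hypothetical trimmed-down `tracker` type rather than the real `negativeAcksTracker`:

```go
package main

import "fmt"

// tracker is a hypothetical stand-in with one map field and one scalar field.
type tracker struct {
	negativeAcks map[string]int64
	delayMs      int64
}

// zeroValued reports whether every field still has its zero value.
func zeroValued(t *tracker) bool {
	return t.negativeAcks == nil && t.delayMs == 0
}

func main() {
	a := new(tracker) // pointer to a zero-valued tracker
	b := &tracker{}   // composite literal with no fields set: also zero-valued

	fmt.Println(zeroValued(a), zeroValued(b))

	// In both cases the map is nil and must be created before it is written to.
	a.negativeAcks = make(map[string]int64)
	a.negativeAcks["msg-1"] = 10000
	fmt.Println(len(a.negativeAcks))
}
```

The practical difference is stylistic: the composite literal lets the fields be initialized in one expression, while `new` requires separate assignments afterwards.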

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       Because the value is left-shifted (<<) by redeliveryCount, it has to stay an integer number of milliseconds at this point; it is converted to time.Duration later, where it is used.

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       Because the value is left-shifted (`<<`) by redeliveryCount, it has to stay an integer number of milliseconds at this point; it is converted to time.Duration later, where it is used.

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec
+	maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+	if redeliveryCount < 0 {
+		return minNackTimeMs
+	}
+
+	return int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs)))

Review comment:
       Here we first get the redeliveryCount from the CommandMessage, then left-shift (<<) minNackTimeMs by it to calculate how long this nack should be delayed. The result is compared with maxNackTimeMs, and the smaller of the two (math.Min) is used as the nack delay, so the delay grows from minNackTimeMs up to maxNackTimeMs.
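
The rule described in this comment (start at minNackTimeMs, double per redelivery, stop at maxNackTimeMs) can be sketched without floating point as below. This is an illustrative variant, not the PR's implementation: `cappedBackoffMs` is a hypothetical name, and a loop is used instead of `<<` so that large redelivery counts cannot overflow.

```go
package main

import "fmt"

const (
	minNackTimeMs int64 = 1000 * 10      // 10 s
	maxNackTimeMs int64 = 1000 * 60 * 10 // 10 min
)

// cappedBackoffMs doubles the delay once per redelivery and stops at the cap.
func cappedBackoffMs(redeliveryCount uint32) int64 {
	delay := minNackTimeMs
	for i := uint32(0); i < redeliveryCount; i++ {
		delay *= 2
		if delay >= maxNackTimeMs {
			return maxNackTimeMs // cap reached; no further doubling needed
		}
	}
	return delay
}

func main() {
	for _, n := range []uint32{0, 1, 5, 6, 100} {
		fmt.Println(n, cappedBackoffMs(n))
	}
}
```

Two details of the quoted diff are worth keeping in mind when comparing: `redeliveryCount` is a `uint32`, so the `redeliveryCount < 0` branch can never be taken, and in Go a left shift of an `int64` by 64 or more yields 0, which `math.Min` would then pass through as a zero delay.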

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
 	doneOnce     sync.Once
 	negativeAcks map[messageID]time.Time
 	rc           redeliveryConsumer
-	tick         *time.Ticker
+	nackBackoff  NackBackoffPolicy
+	trackFlag    bool
 	delay        time.Duration
 	log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker {
-	t := &negativeAcksTracker{
-		doneCh:       make(chan interface{}),
-		negativeAcks: make(map[messageID]time.Time),
-		rc:           rc,
-		tick:         time.NewTicker(delay / 3),
-		delay:        delay,
-		log:          logger,
-	}
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+	nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {
+
+	t := new(negativeAcksTracker)
+
+	// When using NackBackoffPolicy, the delay time needs to be calculated based on the RedeliveryCount field in
+	// the CommandMessage, so for the original default Nack() logic, we still keep the negativeAcksTracker created
+	// when we open a gorutine to execute the logic of `t.track()`. But for the NackBackoffPolicy method, we need
+	// to execute the logic of `t.track()` when AddMessage().
+	if nackBackoffPolicy != nil {

Review comment:
       Yes, I agree with your point. The if statement is there because, with a nack backoff policy, we cannot know the nackDelayTime up front: we first need the redeliveryCount from the CommandMessage, then compute the nackDelayTime from it, and only then can we create the time.NewTicker based on that nackDelayTime.

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -76,14 +95,48 @@ func (t *negativeAcksTracker) Add(msgID messageID) {
 	t.negativeAcks[batchMsgID] = targetTime
 }
 
-func (t *negativeAcksTracker) track() {
+func (t *negativeAcksTracker) AddMessage(msg Message) {

Review comment:
       Because we need to get the redeliveryCount through the Message interface.

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -105,15 +158,13 @@ func (t *negativeAcksTracker) track() {
 					t.rc.Redeliver(msgIds)
 				}
 			}
-
 		}
 	}
 }
 
 func (t *negativeAcksTracker) Close() {
 	// allow Close() to be invoked multiple times by consumer_partition to avoid panic
 	t.doneOnce.Do(func() {
-		t.tick.Stop()

Review comment:
       In the current implementation, keeping the ticker in the struct (t.tick) causes a data race, so we now use a temporary, locally created ticker instead. I have not yet found a good way to stop that temporary ticker from Close().

##########
File path: pulsar/consumer.go
##########
@@ -158,6 +158,17 @@ type ConsumerOptions struct {
 
 	// Decryption decryption related fields to decrypt the encrypted message
 	Decryption *MessageDecryptionInfo
+
+	// If enabled, the default implementation of NackBackoffPolicy will be used to calculate the delay time of
+	// nack backoff, Default: false.
+	EnableDefaultNackBackoffPolicy bool

Review comment:
       If there is no `EnableDefaultNackBackoffPolicy`, it will invade the existing code logic. When the NackBackoffPolicy policy is empty, suppose we use the default NackBackoffPolicy, then when the user uses the Nack(Message) interface, the new implementation will be used.

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       ok, will fix this

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec
+	maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+	if redeliveryCount < 0 {
+		return minNackTimeMs
+	}
+
+	return int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs)))

Review comment:
       Sure, will add comment for this change

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
 	doneOnce     sync.Once
 	negativeAcks map[messageID]time.Time
 	rc           redeliveryConsumer
-	tick         *time.Ticker
+	nackBackoff  NackBackoffPolicy
+	trackFlag    bool
 	delay        time.Duration
 	log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker {
-	t := &negativeAcksTracker{
-		doneCh:       make(chan interface{}),
-		negativeAcks: make(map[messageID]time.Time),
-		rc:           rc,
-		tick:         time.NewTicker(delay / 3),
-		delay:        delay,
-		log:          logger,
-	}
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+	nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {
+
+	t := new(negativeAcksTracker)

Review comment:
       They are the same effect

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       ok, will fix this

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       Because the << operation is required to cooperate with redeliveryCount, the unit conversion is still required, which will be converted to time.Duration in subsequent use.

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       Because the `<<` operation is required to cooperate with redeliveryCount, the unit conversion is still required, which will be converted to time.Duration in subsequent use.

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec
+	maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+	if redeliveryCount < 0 {
+		return minNackTimeMs
+	}
+
+	return int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs)))

Review comment:
       Here we first get the redeliveryCount from the CommandMessage, then left-shift minNackTimeMs by redeliveryCount to calculate how long the nack should be delayed, compare the result with maxNackTimeMs, and take the smaller of the two as the nack delay. The delay therefore grows from minNackTimeMs up to maxNackTimeMs as redeliveryCount increases.
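
A minimal standalone sketch of the calculation described above, assuming the 10 s and 10 min bounds from the diff. The name `nextNackDelayMs` and the explicit doubling loop are illustrative and not part of the PR; the loop is used here instead of `math.Abs` over a single shift so that a very large redeliveryCount cannot overflow.

```go
package main

import "fmt"

const (
	minNackTimeMs int64 = 1000 * 10      // 10 s, as in the diff
	maxNackTimeMs int64 = 1000 * 60 * 10 // 10 min, as in the diff
)

// nextNackDelayMs (hypothetical name) doubles minNackTimeMs once per
// redelivery and caps the result at maxNackTimeMs.
func nextNackDelayMs(redeliveryCount uint32) int64 {
	delay := minNackTimeMs
	for i := uint32(0); i < redeliveryCount; i++ {
		delay <<= 1
		if delay >= maxNackTimeMs {
			// Stop as soon as the cap is reached, so the shift never overflows.
			return maxNackTimeMs
		}
	}
	return delay
}

func main() {
	for _, n := range []uint32{0, 1, 2, 5, 6, 100} {
		fmt.Printf("redeliveryCount=%d -> %d ms\n", n, nextNackDelayMs(n))
	}
}
```

With these bounds the delay doubles from 10 s up to 320 s at redeliveryCount 5 and stays at the 10 min cap from redeliveryCount 6 onward.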

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
 	doneOnce     sync.Once
 	negativeAcks map[messageID]time.Time
 	rc           redeliveryConsumer
-	tick         *time.Ticker
+	nackBackoff  NackBackoffPolicy
+	trackFlag    bool
 	delay        time.Duration
 	log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker {
-	t := &negativeAcksTracker{
-		doneCh:       make(chan interface{}),
-		negativeAcks: make(map[messageID]time.Time),
-		rc:           rc,
-		tick:         time.NewTicker(delay / 3),
-		delay:        delay,
-		log:          logger,
-	}
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+	nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {
+
+	t := new(negativeAcksTracker)
+
+	// When using NackBackoffPolicy, the delay time needs to be calculated based on the RedeliveryCount field in
+	// the CommandMessage, so for the original default Nack() logic, we still keep the negativeAcksTracker created
+	// when we open a gorutine to execute the logic of `t.track()`. But for the NackBackoffPolicy method, we need
+	// to execute the logic of `t.track()` when AddMessage().
+	if nackBackoffPolicy != nil {

Review comment:
       Yes, I agree with your point. The reason for the if statement is that, with a nack backoff policy, the nackDelayTime is not known up front: we first need to get the redeliveryCount from the CommandMessage and calculate the nackDelayTime from it, and only then can we create the time.NewTicker based on that nackDelayTime.

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -76,14 +95,48 @@ func (t *negativeAcksTracker) Add(msgID messageID) {
 	t.negativeAcks[batchMsgID] = targetTime
 }
 
-func (t *negativeAcksTracker) track() {
+func (t *negativeAcksTracker) AddMessage(msg Message) {

Review comment:
       Because we need to get the redeliveryCount through the Message interface.

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -105,15 +158,13 @@ func (t *negativeAcksTracker) track() {
 					t.rc.Redeliver(msgIds)
 				}
 			}
-
 		}
 	}
 }
 
 func (t *negativeAcksTracker) Close() {
 	// allow Close() to be invoked multiple times by consumer_partition to avoid panic
 	t.doneOnce.Do(func() {
-		t.tick.Stop()

Review comment:
       In the current implementation, keeping the ticker as a field of the struct causes a data race, so we now use a temporary ticker variable instead. I don't yet see a good way to stop that temporarily created ticker.
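
One possible way to handle the cleanup, sketched under assumptions: the goroutine that creates the ticker also owns it and stops it with `defer` when the done channel is closed. The `tracker` type, `runTracker`, and the tick counter below are hypothetical stand-ins, not the PR's code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// tracker is a hypothetical, stripped-down stand-in for negativeAcksTracker.
type tracker struct {
	doneCh   chan struct{}
	doneOnce sync.Once
	wg       sync.WaitGroup
	ticks    int32 // number of ticks seen, updated atomically
}

func newTracker(interval time.Duration) *tracker {
	t := &tracker{doneCh: make(chan struct{})}
	t.wg.Add(1)
	go t.track(interval)
	return t
}

// track creates the ticker locally, so no other goroutine ever touches it,
// and stops it on the way out.
func (t *tracker) track(interval time.Duration) {
	defer t.wg.Done()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-t.doneCh:
			return
		case <-ticker.C:
			atomic.AddInt32(&t.ticks, 1)
		}
	}
}

// Close may be called several times; it returns after track has exited.
func (t *tracker) Close() {
	t.doneOnce.Do(func() { close(t.doneCh) })
	t.wg.Wait()
}

// runTracker is a small driver for the sketch: it runs a tracker for runMs
// milliseconds with the given tick interval and reports how many ticks fired.
func runTracker(intervalMs, runMs int) int32 {
	t := newTracker(time.Duration(intervalMs) * time.Millisecond)
	time.Sleep(time.Duration(runMs) * time.Millisecond)
	t.Close()
	t.Close() // a second Close is a no-op
	return atomic.LoadInt32(&t.ticks)
}

func main() {
	fmt.Println("ticks observed:", runTracker(5, 50))
}
```

Because the ticker never leaves `track`, there is no shared field for the race detector to flag, and `Close` only has to signal the goroutine.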

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec
+	maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+	if redeliveryCount < 0 {
+		return minNackTimeMs
+	}
+
+	return int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs)))

Review comment:
       Here we first get the redeliveryCount from the CommandMessage, then left-shift minNackTimeMs by redeliveryCount to calculate how long the nack should be delayed, compare the result with maxNackTimeMs, and take the smaller of the two as the nack delay. The delay therefore grows from minNackTimeMs up to maxNackTimeMs as redeliveryCount increases.







[GitHub] [pulsar-client-go] zymap commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
zymap commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r742825857



##########
File path: pulsar/consumer_regex.go
##########
@@ -183,6 +183,22 @@ func (c *regexConsumer) AckID(msgID MessageID) {
 }
 
 func (c *regexConsumer) Nack(msg Message) {
+	if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {

Review comment:
       ```suggestion
   	if c.options.EnableDefaultNackBackoffPolicy && c.options.NackBackoffPolicy != nil {
   ```

##########
File path: pulsar/consumer_multitopic.go
##########
@@ -164,6 +164,22 @@ func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) {
 }
 
 func (c *multiTopicConsumer) Nack(msg Message) {
+	if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {

Review comment:
       ```suggestion
   	if c.options.EnableDefaultNackBackoffPolicy && c.options.NackBackoffPolicy != nil {
   ```

##########
File path: pulsar/consumer_impl.go
##########
@@ -489,6 +495,20 @@ func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) {
 }
 
 func (c *consumer) Nack(msg Message) {
+	if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {

Review comment:
       ```suggestion
   	if c.options.EnableDefaultNackBackoffPolicy && c.options.NackBackoffPolicy != nil {
   ```







[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r744036158



##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec
+	maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+	if redeliveryCount < 0 {
+		return minNackTimeMs
+	}
+
+	return int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs)))

Review comment:
       Nit: let's make these constants
   ```
   minNackTimeMs := int64(1000 * 30) // 30sec
   maxNackTimeMs := 1000 * 60 * 10   // 10min
   ```
   and please add some comments about how this `int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs)))` works

##########
File path: pulsar/consumer_impl.go
##########
@@ -489,6 +495,20 @@ func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) {
 }
 
 func (c *consumer) Nack(msg Message) {
+	if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {

Review comment:
       Nit: create a single variable at construction time for this check,
   `c.useBackoffPolicy` or something like that.

##########
File path: pulsar/consumer.go
##########
@@ -158,6 +158,17 @@ type ConsumerOptions struct {
 
 	// Decryption decryption related fields to decrypt the encrypted message
 	Decryption *MessageDecryptionInfo
+
+	// If enabled, the default implementation of NackBackoffPolicy will be used to calculate the delay time of
+	// nack backoff, Default: false.
+	EnableDefaultNackBackoffPolicy bool

Review comment:
       To me, a cleaner API is to have just `NackBackoffPolicy` and expose the basic/default policy. If the policy is not set, then the consumer uses the current behavior. This way there is only one configuration knob to worry about.
   
   ```
   // current behavior
   ConsumerOpts{}
   
   // custom behavior
   ConsumerOpts{
     NackBackoffPolicy: pulsar.NewExpNackBackoffPolicy(),
   }
   ```
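
To make the single-knob idea concrete, here is a rough sketch of a user-supplied policy plugged into one option field. The `NackBackoffPolicy` interface is copied from the diff; `linearNackBackoff`, `consumerOptions`, and `describe` are hypothetical names used only for illustration.

```go
package main

import "fmt"

// NackBackoffPolicy mirrors the interface from the diff; Next returns a
// delay in milliseconds.
type NackBackoffPolicy interface {
	Next(redeliveryCount uint32) int64
}

// linearNackBackoff is a hypothetical user-defined policy: it adds stepMs
// per redelivery and never exceeds maxMs.
type linearNackBackoff struct {
	stepMs int64
	maxMs  int64
}

func (p linearNackBackoff) Next(redeliveryCount uint32) int64 {
	// redeliveryCount+1 so that the first nack already waits one step.
	d := p.stepMs * (int64(redeliveryCount) + 1)
	if d > p.maxMs {
		return p.maxMs
	}
	return d
}

// consumerOptions is a hypothetical stand-in for ConsumerOptions with the
// single configuration knob proposed in the comment.
type consumerOptions struct {
	NackBackoffPolicy NackBackoffPolicy
}

// describe reports which behavior a consumer would use for a given nack.
func describe(opts consumerOptions, redeliveryCount uint32) string {
	if opts.NackBackoffPolicy == nil {
		return "fixed delay (current behavior)"
	}
	return fmt.Sprintf("%d ms", opts.NackBackoffPolicy.Next(redeliveryCount))
}

func main() {
	fmt.Println(describe(consumerOptions{}, 3))
	custom := consumerOptions{NackBackoffPolicy: linearNackBackoff{stepMs: 5000, maxMs: 60000}}
	fmt.Println(describe(custom, 3))
}
```

A nil policy keeps today's behavior, so no separate enable flag is needed in this shape of the API.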
   
   
   







[GitHub] [pulsar-client-go] wolfstudy commented on pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#issuecomment-960555078


   ```
   ==================
   WARNING: DATA RACE
   Read at 0x00c000158578 by goroutine 447:
     github.com/apache/pulsar-client-go/pulsar.(*negativeAcksTracker).track()
         /pulsar-client-go/pulsar/negative_acks_tracker.go:142 +0x7dc
   
   Previous write at 0x00c000158578 by goroutine 291:
     github.com/apache/pulsar-client-go/pulsar.(*negativeAcksTracker).AddMessage()
         /pulsar-client-go/pulsar/negative_acks_tracker.go:103 +0x15e
     github.com/apache/pulsar-client-go/pulsar.TestNackBackoffTracker()
         /pulsar-client-go/pulsar/negative_acks_tracker_test.go:157 +0x124
     testing.tRunner()
         /usr/local/go/src/testing/testing.go:1123 +0x202
   
   Goroutine 447 (running) created at:
     github.com/apache/pulsar-client-go/pulsar.(*negativeAcksTracker).AddMessage()
         /pulsar-client-go/pulsar/negative_acks_tracker.go:108 +0x4a4
     github.com/apache/pulsar-client-go/pulsar.TestNackBackoffTracker()
         /pulsar-client-go/pulsar/negative_acks_tracker_test.go:156 +0xeb
     testing.tRunner()
         /usr/local/go/src/testing/testing.go:1123 +0x202
   
   Goroutine 291 (running) created at:
     testing.(*T).Run()
         /usr/local/go/src/testing/testing.go:1168 +0x5bb
     testing.runTests.func1()
         /usr/local/go/src/testing/testing.go:1439 +0xa6
     testing.tRunner()
         /usr/local/go/src/testing/testing.go:1123 +0x202
     testing.runTests()
         /usr/local/go/src/testing/testing.go:1437 +0x612
     testing.(*M).Run()
         /usr/local/go/src/testing/testing.go:1345 +0x3b3
     main.main()
         _testmain.go:467 +0x356
   ==================
   ==================
   WARNING: DATA RACE
   Read at 0x00c000a1ff40 by goroutine 447:
     github.com/apache/pulsar-client-go/pulsar.(*negativeAcksTracker).track()
         /pulsar-client-go/pulsar/negative_acks_tracker.go:142 +0x7fb
   
   Previous write at 0x00c000a1ff40 by goroutine 291:
     time.NewTicker()
         /usr/local/go/src/time/tick.go:30 +0x9a
     github.com/apache/pulsar-client-go/pulsar.(*negativeAcksTracker).AddMessage()
         /pulsar-client-go/pulsar/negative_acks_tracker.go:103 +0x134
     github.com/apache/pulsar-client-go/pulsar.TestNackBackoffTracker()
         /pulsar-client-go/pulsar/negative_acks_tracker_test.go:157 +0x124
     testing.tRunner()
         /usr/local/go/src/testing/testing.go:1123 +0x202
   
   Goroutine 447 (running) created at:
     github.com/apache/pulsar-client-go/pulsar.(*negativeAcksTracker).AddMessage()
         /pulsar-client-go/pulsar/negative_acks_tracker.go:108 +0x4a4
     github.com/apache/pulsar-client-go/pulsar.TestNackBackoffTracker()
         /pulsar-client-go/pulsar/negative_acks_tracker_test.go:156 +0xeb
     testing.tRunner()
         /usr/local/go/src/testing/testing.go:1123 +0x202
   
   Goroutine 291 (running) created at:
     testing.(*T).Run()
         /usr/local/go/src/testing/testing.go:1168 +0x5bb
     testing.runTests.func1()
         /usr/local/go/src/testing/testing.go:1439 +0xa6
     testing.tRunner()
         /usr/local/go/src/testing/testing.go:1123 +0x202
     testing.runTests()
         /usr/local/go/src/testing/testing.go:1437 +0x612
     testing.(*M).Run()
         /usr/local/go/src/testing/testing.go:1345 +0x3b3
     main.main()
         _testmain.go:467 +0x356
   ==================
   --- FAIL: TestNackBackoffTracker (0.40s)
   ```





[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743354627



##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       Because the value is combined with redeliveryCount through the << operation, it is kept as an integer number of milliseconds here; it is converted to time.Duration where it is used later.







[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743365098



##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -105,15 +158,13 @@ func (t *negativeAcksTracker) track() {
 					t.rc.Redeliver(msgIds)
 				}
 			}
-
 		}
 	}
 }
 
 func (t *negativeAcksTracker) Close() {
 	// allow Close() to be invoked multiple times by consumer_partition to avoid panic
 	t.doneOnce.Do(func() {
-		t.tick.Stop()

Review comment:
       In the current implementation, keeping the ticker as a field of the struct causes a data race, so we now use a temporary ticker variable instead. I don't yet see a good way to stop that temporarily created ticker.







[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743352676



##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
 	doneOnce     sync.Once
 	negativeAcks map[messageID]time.Time
 	rc           redeliveryConsumer
-	tick         *time.Ticker
+	nackBackoff  NackBackoffPolicy
+	trackFlag    bool
 	delay        time.Duration
 	log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker {
-	t := &negativeAcksTracker{
-		doneCh:       make(chan interface{}),
-		negativeAcks: make(map[messageID]time.Time),
-		rc:           rc,
-		tick:         time.NewTicker(delay / 3),
-		delay:        delay,
-		log:          logger,
-	}
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+	nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {
+
+	t := new(negativeAcksTracker)

Review comment:
       They have the same effect.







[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743308255



##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
 	doneOnce     sync.Once
 	negativeAcks map[messageID]time.Time
 	rc           redeliveryConsumer
-	tick         *time.Ticker
+	nackBackoff  NackBackoffPolicy
+	trackFlag    bool
 	delay        time.Duration
 	log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker {
-	t := &negativeAcksTracker{
-		doneCh:       make(chan interface{}),
-		negativeAcks: make(map[messageID]time.Time),
-		rc:           rc,
-		tick:         time.NewTicker(delay / 3),
-		delay:        delay,
-		log:          logger,
-	}
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+	nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {
+
+	t := new(negativeAcksTracker)

Review comment:
       this can just be `var t *negativeAcksTracker`

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       Let's use the time.Duration constants:
   ```
   10 * time.Second
   10 * time.Minute
   ```
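
For reference, `time.Duration` is defined as an int64 count of nanoseconds, so the `<<` operator applies to it directly. The sketch below assumes the 10 s and 10 min bounds from the diff; `nextNackDelay` and `maxShift` are hypothetical names, and the shift guard is an addition for overflow safety rather than something taken from the PR.

```go
package main

import (
	"fmt"
	"time"
)

const (
	minNackDelay = 10 * time.Second
	maxNackDelay = 10 * time.Minute

	// maxShift bounds the shift count: 10s << 16 is about 7.6 days, far
	// above the cap and far below the int64 range of time.Duration.
	maxShift = 16
)

// nextNackDelay (hypothetical name) doubles minNackDelay per redelivery and
// caps the result at maxNackDelay, working on time.Duration throughout.
func nextNackDelay(redeliveryCount uint32) time.Duration {
	if redeliveryCount > maxShift {
		return maxNackDelay
	}
	d := minNackDelay << redeliveryCount
	if d > maxNackDelay {
		return maxNackDelay
	}
	return d
}

func main() {
	for _, n := range []uint32{0, 1, 3, 5, 6, 1000} {
		fmt.Printf("redeliveryCount=%d -> %v\n", n, nextNackDelay(n))
	}
}
```

Returning a `time.Duration` would also let callers pass the value straight to `time.Now().Add` without a unit conversion.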

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -76,14 +95,48 @@ func (t *negativeAcksTracker) Add(msgID messageID) {
 	t.negativeAcks[batchMsgID] = targetTime
 }
 
-func (t *negativeAcksTracker) track() {
+func (t *negativeAcksTracker) AddMessage(msg Message) {

Review comment:
       Why is there a new method here?
   
   Also, it looks like state is being changed here without a lock. If multiple goroutines call this at once, multiple tracking goroutines could be started, right?
   
   Can the tracking goroutine just be started at creation time?
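
If lazy start-up has to stay, one way to address the concern is a mutex around the shared map plus `sync.Once` for the goroutine. This is only a sketch: the `tracker` type, its fields, and `runConcurrentAdds` are hypothetical, and message IDs are simplified to int64.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// tracker is a hypothetical stand-in for negativeAcksTracker.
type tracker struct {
	mu        sync.Mutex
	pending   map[int64]uint32 // message ID -> redelivery count
	startOnce sync.Once
	started   int32 // tracking goroutines launched, kept for demonstration
	doneCh    chan struct{}
	wg        sync.WaitGroup
}

func newTracker() *tracker {
	return &tracker{pending: make(map[int64]uint32), doneCh: make(chan struct{})}
}

// AddMessage records the message under the lock and starts the tracking
// goroutine at most once, however many callers race here.
func (t *tracker) AddMessage(id int64, redeliveryCount uint32) {
	t.mu.Lock()
	t.pending[id] = redeliveryCount
	t.mu.Unlock()

	t.startOnce.Do(func() {
		atomic.AddInt32(&t.started, 1)
		t.wg.Add(1)
		go t.track()
	})
}

func (t *tracker) track() {
	defer t.wg.Done()
	<-t.doneCh // a real loop would wake on a ticker and redeliver due messages
}

// Close stops the tracking goroutine; this sketch expects a single call.
func (t *tracker) Close() {
	close(t.doneCh)
	t.wg.Wait()
}

// runConcurrentAdds calls AddMessage from n goroutines at once and reports
// how many tracking goroutines were started and how many messages are pending.
func runConcurrentAdds(n int) (started int32, pending int) {
	t := newTracker()
	var callers sync.WaitGroup
	for i := 0; i < n; i++ {
		callers.Add(1)
		go func(id int64) {
			defer callers.Done()
			t.AddMessage(id, 0)
		}(int64(i))
	}
	callers.Wait()
	t.Close()
	t.mu.Lock()
	defer t.mu.Unlock()
	return atomic.LoadInt32(&t.started), len(t.pending)
}

func main() {
	started, pending := runConcurrentAdds(50)
	fmt.Println("tracking goroutines:", started, "pending messages:", pending)
}
```

Starting the goroutine in the constructor, as the question suggests, would remove the need for `sync.Once` altogether.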

##########
File path: pulsar/consumer.go
##########
@@ -158,6 +158,17 @@ type ConsumerOptions struct {
 
 	// Decryption decryption related fields to decrypt the encrypted message
 	Decryption *MessageDecryptionInfo
+
+	// If enabled, the default implementation of NackBackoffPolicy will be used to calculate the delay time of
+	// nack backoff, Default: false.
+	EnableDefaultNackBackoffPolicy bool

Review comment:
       Why is `EnableDefaultNackBackoffPolicy` needed? If the `NackBackoffPolicy` is not supplied, can't we just use the default?

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -105,15 +158,13 @@ func (t *negativeAcksTracker) track() {
 					t.rc.Redeliver(msgIds)
 				}
 			}
-
 		}
 	}
 }
 
 func (t *negativeAcksTracker) Close() {
 	// allow Close() to be invoked multiple times by consumer_partition to avoid panic
 	t.doneOnce.Do(func() {
-		t.tick.Stop()

Review comment:
       How is the ticker getting cleaned up now?

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
 	doneOnce     sync.Once
 	negativeAcks map[messageID]time.Time
 	rc           redeliveryConsumer
-	tick         *time.Ticker
+	nackBackoff  NackBackoffPolicy
+	trackFlag    bool
 	delay        time.Duration
 	log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker {
-	t := &negativeAcksTracker{
-		doneCh:       make(chan interface{}),
-		negativeAcks: make(map[messageID]time.Time),
-		rc:           rc,
-		tick:         time.NewTicker(delay / 3),
-		delay:        delay,
-		log:          logger,
-	}
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+	nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {
+
+	t := new(negativeAcksTracker)
+
+	// When using NackBackoffPolicy, the delay time needs to be calculated based on the RedeliveryCount field in
+	// the CommandMessage, so for the original default Nack() logic, we still keep the negativeAcksTracker created
+	// when we open a gorutine to execute the logic of `t.track()`. But for the NackBackoffPolicy method, we need
+	// to execute the logic of `t.track()` when AddMessage().
+	if nackBackoffPolicy != nil {

Review comment:
       I'm a little confused about why we need an if statement. Shouldn't the default implementation of the `NackBackoffPolicy` be what the current behavior is? The benefit of the interface is to simplify the code and delegate to the implementation.
   
   ```
   bp := nackBackoffPolicy
   if bp == nil {
   	bp = newDefaultBackoffPolicy(delay)
   }
   t = &negativeAcksTracker{
   	doneCh:       make(chan interface{}),
   	negativeAcks: make(map[messageID]time.Time),
   	nackBackoff:  bp,
   	rc:           rc,
   	log:          logger,
   }
   ```
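
A rough, self-contained sketch of what such a default could look like, assuming the default policy should reproduce the existing fixed-delay behavior. `fixedNackBackoff`, `resolvePolicy`, and `targetTime` are hypothetical names; only the `NackBackoffPolicy` interface comes from the diff.

```go
package main

import (
	"fmt"
	"time"
)

// NackBackoffPolicy mirrors the interface from the diff; Next returns a
// delay in milliseconds.
type NackBackoffPolicy interface {
	Next(redeliveryCount uint32) int64
}

// fixedNackBackoff is a hypothetical default policy: it ignores the
// redelivery count and always returns the configured delay, which matches
// the behavior before the PR.
type fixedNackBackoff struct {
	delay time.Duration
}

func (p fixedNackBackoff) Next(uint32) int64 {
	return p.delay.Milliseconds()
}

// resolvePolicy (hypothetical) picks the custom policy when one is supplied
// and falls back to the fixed delay otherwise, so the tracker needs no branch.
func resolvePolicy(custom NackBackoffPolicy, delay time.Duration) NackBackoffPolicy {
	if custom != nil {
		return custom
	}
	return fixedNackBackoff{delay: delay}
}

// targetTime computes when a nacked message becomes due for redelivery.
func targetTime(now time.Time, p NackBackoffPolicy, redeliveryCount uint32) time.Time {
	return now.Add(time.Duration(p.Next(redeliveryCount)) * time.Millisecond)
}

func main() {
	p := resolvePolicy(nil, time.Minute)
	now := time.Now()
	fmt.Println("delay ms:", p.Next(7))
	fmt.Println("due in:", targetTime(now, p, 7).Sub(now))
}
```

With this shape the tracker always calls `Next`, and the choice between fixed and growing delays lives entirely in the policy.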
   
    

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec
+	maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+	if redeliveryCount < 0 {
+		return minNackTimeMs
+	}
+
+	return int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs)))

Review comment:
       Can you add some comments explaining what this logic is doing? It's difficult for me to understand just by reading it.

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
 	doneOnce     sync.Once
 	negativeAcks map[messageID]time.Time
 	rc           redeliveryConsumer
-	tick         *time.Ticker
+	nackBackoff  NackBackoffPolicy
+	trackFlag    bool
 	delay        time.Duration
 	log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker {
-	t := &negativeAcksTracker{
-		doneCh:       make(chan interface{}),
-		negativeAcks: make(map[messageID]time.Time),
-		rc:           rc,
-		tick:         time.NewTicker(delay / 3),
-		delay:        delay,
-		log:          logger,
-	}
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+	nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {
+
+	t := new(negativeAcksTracker)
+
+	// When using NackBackoffPolicy, the delay time needs to be calculated based on the RedeliveryCount field in
+	// the CommandMessage, so for the original default Nack() logic, we still keep the negativeAcksTracker created
+	// when we open a gorutine to execute the logic of `t.track()`. But for the NackBackoffPolicy method, we need
+	// to execute the logic of `t.track()` when AddMessage().
+	if nackBackoffPolicy != nil {

Review comment:
       I'm a little confused about why we need an if statement. Shouldn't the default implementation of the `NackBackoffPolicy` be what the current behavior is? The benefit of the interface is to simplify the code and delegate to the implementation.
   
   ```
   bp := nackBackoffPolicy
   if bp == nil {
   	bp = newDefaultBackoffPolicy(delay)
   }
   t = &negativeAcksTracker{
   	doneCh:       make(chan interface{}),
   	negativeAcks: make(map[messageID]time.Time),
   	nackBackoff:  bp,
   	rc:           rc,
   	log:          logger,
   }
   ```
   
    Thoughts?
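
The suggestion above could be sketched roughly as follows. This is a minimal, self-contained illustration rather than the code from the PR: `fixedDelayPolicy` and `resolvePolicy` are hypothetical names, the interface is copied from the diff, and `newDefaultBackoffPolicy` exists only in the review suggestion.

```go
package main

import (
	"fmt"
	"time"
)

// NackBackoffPolicy mirrors the interface shown in the diff: it returns the
// redelivery delay in milliseconds for a given redelivery count.
type NackBackoffPolicy interface {
	Next(redeliveryCount uint32) int64
}

// fixedDelayPolicy is a hypothetical default that reproduces the pre-PR
// behavior: every nack waits the same configured delay.
type fixedDelayPolicy struct {
	delay time.Duration
}

func (p fixedDelayPolicy) Next(uint32) int64 {
	return p.delay.Milliseconds()
}

// resolvePolicy (hypothetical helper) returns the caller's policy, or the
// fixed-delay default when none was supplied, so the tracker never branches.
func resolvePolicy(p NackBackoffPolicy, delay time.Duration) NackBackoffPolicy {
	if p == nil {
		return fixedDelayPolicy{delay: delay}
	}
	return p
}

func main() {
	bp := resolvePolicy(nil, time.Minute)
	// The default ignores the redelivery count, so both calls return 60000 (ms).
	fmt.Println(bp.Next(0), bp.Next(5))
}
```

With this shape the tracker only ever calls `Next`, and whether the delay is fixed or grows lives entirely in the policy implementation.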







[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743354627



##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       Because the `<<` operation is applied together with redeliveryCount, the value is kept as an integer number of milliseconds here; it is converted to time.Duration where it is used later.
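
As a side note on the unit question: `time.Duration` is itself an integer type (an `int64` count of nanoseconds), so a shift can be applied to it directly. A small sketch, where `minNackDelay` and `shiftedDelay` are illustrative names that are not part of the PR:

```go
package main

import (
	"fmt"
	"time"
)

// minNackDelay is an illustrative constant, not taken from the PR.
const minNackDelay = 10 * time.Second

// shiftedDelay doubles minNackDelay once per redelivery by shifting the
// Duration itself; no conversion to integer milliseconds is needed first.
// No upper bound is applied here, so callers must cap large counts.
func shiftedDelay(redeliveryCount uint32) time.Duration {
	return minNackDelay << redeliveryCount
}

func main() {
	d := shiftedDelay(3)
	fmt.Println(d)                // 1m20s
	fmt.Println(d.Milliseconds()) // 80000
}
```

If the interface has to keep returning `int64` milliseconds, `Duration.Milliseconds()` performs that conversion at the boundary.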







[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r744035593



##########
File path: pulsar/consumer_impl.go
##########
@@ -489,6 +495,20 @@ func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) {
 }
 
 func (c *consumer) Nack(msg Message) {
+	if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {

Review comment:
       Nit: create a single variable at creation time for this check,
   e.g. `c.useBackoffPolicy` or something like that.
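
A rough sketch of that nit, assuming a trimmed-down consumer. The `options` and `consumer` types, `newConsumer`, and `nackPath` are stand-ins invented for this example; only the two option names come from the diff.

```go
package main

import "fmt"

// NackBackoffPolicy mirrors the interface shown in the diff.
type NackBackoffPolicy interface {
	Next(redeliveryCount uint32) int64
}

// options holds only the two fields relevant to this sketch.
type options struct {
	EnableDefaultNackBackoffPolicy bool
	NackBackoffPolicy              NackBackoffPolicy
}

type consumer struct {
	options          options
	useBackoffPolicy bool // hypothetical field, computed once in newConsumer
}

func newConsumer(opts options) *consumer {
	return &consumer{
		options:          opts,
		useBackoffPolicy: opts.EnableDefaultNackBackoffPolicy || opts.NackBackoffPolicy != nil,
	}
}

// nackPath reports which branch Nack would take; it stands in for the real method.
func (c *consumer) nackPath() string {
	if c.useBackoffPolicy {
		return "backoff"
	}
	return "fixed-delay"
}

func main() {
	fmt.Println(newConsumer(options{}).nackPath())
	fmt.Println(newConsumer(options{EnableDefaultNackBackoffPolicy: true}).nackPath())
}
```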







[GitHub] [pulsar-client-go] wolfstudy merged pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
wolfstudy merged pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660


   





[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743308255



##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
 	doneOnce     sync.Once
 	negativeAcks map[messageID]time.Time
 	rc           redeliveryConsumer
-	tick         *time.Ticker
+	nackBackoff  NackBackoffPolicy
+	trackFlag    bool
 	delay        time.Duration
 	log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker {
-	t := &negativeAcksTracker{
-		doneCh:       make(chan interface{}),
-		negativeAcks: make(map[messageID]time.Time),
-		rc:           rc,
-		tick:         time.NewTicker(delay / 3),
-		delay:        delay,
-		log:          logger,
-	}
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+	nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {
+
+	t := new(negativeAcksTracker)

Review comment:
       this can just be `var t *negativeAcksTracker`

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       Let's use the time.Duration constants:
   ```
   10 * time.Second
   10 * time.Minute
   ```

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -76,14 +95,48 @@ func (t *negativeAcksTracker) Add(msgID messageID) {
 	t.negativeAcks[batchMsgID] = targetTime
 }
 
-func (t *negativeAcksTracker) track() {
+func (t *negativeAcksTracker) AddMessage(msg Message) {

Review comment:
       Why is there a new method here?
   
   Also, it looks like state is changing here without a lock. If multiple goroutines call this at once, multiple tracking goroutines could be started, right?
   
   Can the tracking goroutine just be started at creation time?
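
One way to address the race, sketched under simplifying assumptions: the `tracker` type, its fields, and `concurrentAdds` are made up for this example and are not the PR's `negativeAcksTracker`. The map is guarded by a mutex and `sync.Once` ensures a single tracking goroutine even when `add` is called concurrently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type tracker struct {
	mu        sync.Mutex
	pending   map[int]time.Time // message ID -> redelivery time
	startOnce sync.Once
	starts    int32 // number of tracking goroutines launched (for the demo)
	doneCh    chan struct{}
}

func newTracker() *tracker {
	return &tracker{pending: make(map[int]time.Time), doneCh: make(chan struct{})}
}

// add records a nacked message under the lock and lazily starts the
// tracking goroutine exactly once, even with concurrent callers.
func (t *tracker) add(id int, delay time.Duration) {
	t.mu.Lock()
	t.pending[id] = time.Now().Add(delay)
	t.mu.Unlock()

	t.startOnce.Do(func() {
		atomic.AddInt32(&t.starts, 1)
		go t.track()
	})
}

// track stands in for the real redelivery loop; here it only waits for close.
func (t *tracker) track() { <-t.doneCh }

// concurrentAdds calls add from n goroutines and reports how many tracking
// goroutines were started and how many messages are pending.
func concurrentAdds(n int) (int32, int) {
	t := newTracker()
	defer close(t.doneCh)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			t.add(id, time.Second)
		}(i)
	}
	wg.Wait()
	t.mu.Lock()
	defer t.mu.Unlock()
	return atomic.LoadInt32(&t.starts), len(t.pending)
}

func main() {
	starts, pending := concurrentAdds(50)
	fmt.Println(starts, pending) // 1 50
}
```

Starting the goroutine in the constructor, as the comment proposes, is simpler still and removes the need for `sync.Once`.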

##########
File path: pulsar/consumer.go
##########
@@ -158,6 +158,17 @@ type ConsumerOptions struct {
 
 	// Decryption decryption related fields to decrypt the encrypted message
 	Decryption *MessageDecryptionInfo
+
+	// If enabled, the default implementation of NackBackoffPolicy will be used to calculate the delay time of
+	// nack backoff, Default: false.
+	EnableDefaultNackBackoffPolicy bool

Review comment:
       Why is `EnableDefaultNackBackoffPolicy` needed? If the `NackBackoffPolicy` is not supplied, can't we just use the default?

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -105,15 +158,13 @@ func (t *negativeAcksTracker) track() {
 					t.rc.Redeliver(msgIds)
 				}
 			}
-
 		}
 	}
 }
 
 func (t *negativeAcksTracker) Close() {
 	// allow Close() to be invoked multiple times by consumer_partition to avoid panic
 	t.doneOnce.Do(func() {
-		t.tick.Stop()

Review comment:
       How is the ticker getting cleaned up now?

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
 	doneOnce     sync.Once
 	negativeAcks map[messageID]time.Time
 	rc           redeliveryConsumer
-	tick         *time.Ticker
+	nackBackoff  NackBackoffPolicy
+	trackFlag    bool
 	delay        time.Duration
 	log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker {
-	t := &negativeAcksTracker{
-		doneCh:       make(chan interface{}),
-		negativeAcks: make(map[messageID]time.Time),
-		rc:           rc,
-		tick:         time.NewTicker(delay / 3),
-		delay:        delay,
-		log:          logger,
-	}
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+	nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {
+
+	t := new(negativeAcksTracker)
+
+	// When using NackBackoffPolicy, the delay time needs to be calculated based on the RedeliveryCount field in
+	// the CommandMessage, so for the original default Nack() logic, we still keep the negativeAcksTracker created
+	// when we open a gorutine to execute the logic of `t.track()`. But for the NackBackoffPolicy method, we need
+	// to execute the logic of `t.track()` when AddMessage().
+	if nackBackoffPolicy != nil {

Review comment:
       I'm a little confused about why we need an if statement. Shouldn't the default implementation of the `NackBackoffPolicy` be what the current behavior is? The benefit of the interface is to simplify the code and delegate to the implementation.
   
   ```
   bp := nackBackoffPolicy
   if bp == nil {
   	bp = newDefaultBackoffPolicy(delay)
   }
   t = &negativeAcksTracker{
   	doneCh:       make(chan interface{}),
   	negativeAcks: make(map[messageID]time.Time),
   	nackBackoff:  bp,
   	rc:           rc,
   	log:          logger,
   }
   ```
   
    

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec
+	maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+	if redeliveryCount < 0 {
+		return minNackTimeMs
+	}
+
+	return int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs)))

Review comment:
       Can you add some comments explaining what this logic is doing? It's difficult for me to understand just by reading it.







[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743310068



##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
 	doneOnce     sync.Once
 	negativeAcks map[messageID]time.Time
 	rc           redeliveryConsumer
-	tick         *time.Ticker
+	nackBackoff  NackBackoffPolicy
+	trackFlag    bool
 	delay        time.Duration
 	log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker {
-	t := &negativeAcksTracker{
-		doneCh:       make(chan interface{}),
-		negativeAcks: make(map[messageID]time.Time),
-		rc:           rc,
-		tick:         time.NewTicker(delay / 3),
-		delay:        delay,
-		log:          logger,
-	}
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+	nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {
+
+	t := new(negativeAcksTracker)
+
+	// When using NackBackoffPolicy, the delay time needs to be calculated based on the RedeliveryCount field in
+	// the CommandMessage, so for the original default Nack() logic, we still keep the negativeAcksTracker created
+	// when we open a gorutine to execute the logic of `t.track()`. But for the NackBackoffPolicy method, we need
+	// to execute the logic of `t.track()` when AddMessage().
+	if nackBackoffPolicy != nil {

Review comment:
       I'm a little confused about why we need an if statement. Shouldn't the default implementation of the `NackBackoffPolicy` be what the current behavior is? The benefit of the interface is to simplify the code and delegate to the implementation.
   
   ```
   bp := nackBackoffPolicy
   if bp == nil {
   	bp = newDefaultBackoffPolicy(delay)
   }
   t = &negativeAcksTracker{
   	doneCh:       make(chan interface{}),
   	negativeAcks: make(map[messageID]time.Time),
   	nackBackoff:  bp,
   	rc:           rc,
   	log:          logger,
   }
   ```
   
    Thoughts?







[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743363997



##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -76,14 +95,48 @@ func (t *negativeAcksTracker) Add(msgID messageID) {
 	t.negativeAcks[batchMsgID] = targetTime
 }
 
-func (t *negativeAcksTracker) track() {
+func (t *negativeAcksTracker) AddMessage(msg Message) {

Review comment:
       Because we need to get redeliveryCount through the Message interface







[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #660: Support nack backoff policy for SDK

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743356923



##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec
+	maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+	if redeliveryCount < 0 {
+		return minNackTimeMs
+	}
+
+	return int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs)))

Review comment:
       Here we first get the redeliveryCount from the CommandMessage, then left-shift minNackTimeMs by redeliveryCount to calculate how long this nack should be delayed, then compare the result with maxNackTimeMs and take the smaller of the two as the nack delay. Following this rule, the nack delay grows from minNackTimeMs up to maxNackTimeMs.
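
The calculation described above can be sketched as a capped doubling. This is an illustration, not the merged implementation: `nextNackDelay`, `minNackDelay`, and `maxNackDelay` are names chosen for this example, and the loop stops doubling once the cap is reached so that large redelivery counts cannot overflow.

```go
package main

import (
	"fmt"
	"time"
)

const (
	minNackDelay = 10 * time.Second // first redelivery delay
	maxNackDelay = 10 * time.Minute // upper bound for any delay
)

// nextNackDelay doubles the minimum delay once per redelivery and caps the
// result at maxNackDelay.
func nextNackDelay(redeliveryCount uint32) time.Duration {
	d := minNackDelay
	// Stop as soon as the cap is reached, so huge counts never overflow.
	for i := uint32(0); i < redeliveryCount && d < maxNackDelay; i++ {
		d *= 2
	}
	if d > maxNackDelay {
		return maxNackDelay
	}
	return d
}

func main() {
	for _, n := range []uint32{0, 1, 3, 6, 1000} {
		fmt.Printf("redeliveryCount=%d -> %v\n", n, nextNackDelay(n))
	}
}
```

With these constants the delay runs 10s, 20s, 40s, 80s, 160s, 320s and then stays at 10 minutes from the sixth redelivery onward.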



