You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/11/05 00:22:35 UTC

[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #660: Support nack backoff policy for SDK

cckellogg commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743308255



##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
 	doneOnce     sync.Once
 	negativeAcks map[messageID]time.Time
 	rc           redeliveryConsumer
-	tick         *time.Ticker
+	nackBackoff  NackBackoffPolicy
+	trackFlag    bool
 	delay        time.Duration
 	log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker {
-	t := &negativeAcksTracker{
-		doneCh:       make(chan interface{}),
-		negativeAcks: make(map[messageID]time.Time),
-		rc:           rc,
-		tick:         time.NewTicker(delay / 3),
-		delay:        delay,
-		log:          logger,
-	}
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+	nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {
+
+	t := new(negativeAcksTracker)

Review comment:
       this can just be `var t *negativeAcksTracker`

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec

Review comment:
       Let's use the time.Duration constants 
   ```
   10 * time.Second
   10 * time.Minute
   ```

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -76,14 +95,48 @@ func (t *negativeAcksTracker) Add(msgID messageID) {
 	t.negativeAcks[batchMsgID] = targetTime
 }
 
-func (t *negativeAcksTracker) track() {
+func (t *negativeAcksTracker) AddMessage(msg Message) {

Review comment:
       Why is there a new method here?
   
   Also, it looks like state is changing here without a lock. If multiple goroutines call this at once, multiple tracking goroutines could be started, right?
   
   Can the tracking goroutine just be started at creation time?

##########
File path: pulsar/consumer.go
##########
@@ -158,6 +158,17 @@ type ConsumerOptions struct {
 
 	// Decryption decryption related fields to decrypt the encrypted message
 	Decryption *MessageDecryptionInfo
+
+	// If enabled, the default implementation of NackBackoffPolicy will be used to calculate the delay time of
+	// nack backoff, Default: false.
+	EnableDefaultNackBackoffPolicy bool

Review comment:
       Why is `EnableDefaultNackBackoffPolicy` needed? If the `NackBackoffPolicy` is not supplied, can't we just use the default?

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -105,15 +158,13 @@ func (t *negativeAcksTracker) track() {
 					t.rc.Redeliver(msgIds)
 				}
 			}
-
 		}
 	}
 }
 
 func (t *negativeAcksTracker) Close() {
 	// allow Close() to be invoked multiple times by consumer_partition to avoid panic
 	t.doneOnce.Do(func() {
-		t.tick.Stop()

Review comment:
       How is the ticker getting cleaned up now?

##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
 	doneOnce     sync.Once
 	negativeAcks map[messageID]time.Time
 	rc           redeliveryConsumer
-	tick         *time.Ticker
+	nackBackoff  NackBackoffPolicy
+	trackFlag    bool
 	delay        time.Duration
 	log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker {
-	t := &negativeAcksTracker{
-		doneCh:       make(chan interface{}),
-		negativeAcks: make(map[messageID]time.Time),
-		rc:           rc,
-		tick:         time.NewTicker(delay / 3),
-		delay:        delay,
-		log:          logger,
-	}
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+	nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {
+
+	t := new(negativeAcksTracker)
+
+	// When using NackBackoffPolicy, the delay time needs to be calculated based on the RedeliveryCount field in
+	// the CommandMessage, so for the original default Nack() logic, we still keep the negativeAcksTracker created
+	// when we open a gorutine to execute the logic of `t.track()`. But for the NackBackoffPolicy method, we need
+	// to execute the logic of `t.track()` when AddMessage().
+	if nackBackoffPolicy != nil {

Review comment:
       I'm a little confused about why we need an if statement. Shouldn't the default implementation of the `NackBackoffPolicy` be what the current behavior is? The benefit of the interface is to simplify the code and delegate to the implementation.
   
   ```
   bp := nackBackoffPolicy
   if bp == nil {
     bp = newDefaultBackoffPolicy(delay)
   }
   t = &negativeAcksTracker{
   			doneCh:       make(chan interface{}),
   			negativeAcks: make(map[messageID]time.Time),
   			nackBackoff:  bp,
   			rc:           rc,
   			log:          logger,
   		}
   ```
   
    

##########
File path: pulsar/negative_backoff_policy.go
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// The redeliveryCount indicates the number of times the message was redelivered.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 10) // 10sec
+	maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+	if redeliveryCount < 0 {
+		return minNackTimeMs
+	}
+
+	return int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs)))

Review comment:
       Can you add some comments explaining what this logic is doing? For me it's difficult to look at it and just understand it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org