You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/07/13 18:32:02 UTC

[pulsar-client-go] branch master updated: [issue #752] replace go-rate rate limiter with a buffered channel implementation (#799)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a8e7f3  [issue #752] replace go-rate rate limiter with a buffered channel implementation (#799)
6a8e7f3 is described below

commit 6a8e7f39aac100a285a2c190186e38b73a5c9d34
Author: ming <it...@gmail.com>
AuthorDate: Wed Jul 13 14:31:57 2022 -0400

    [issue #752] replace go-rate rate limiter with a buffered channel implementation (#799)
    
    * replace go-rate rate limiter with channel implementation
    
    * fix linter
    
    * update based on review comments
    
    * stop ratelimiter goroutine if the rate is unthrottled
---
 go.mod                |  1 -
 go.sum                |  2 --
 perf/perf-producer.go | 23 +++++++++++++----------
 3 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/go.mod b/go.mod
index ec6d810..5c2c33a 100644
--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,6 @@ require (
 	github.com/AthenZ/athenz v1.10.39
 	github.com/DataDog/zstd v1.5.0
 	github.com/apache/pulsar-client-go/oauth2 v0.0.0-20220120090717-25e59572242e
-	github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0
 	github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
 	github.com/davecgh/go-spew v1.1.1
 	github.com/gogo/protobuf v1.3.2
diff --git a/go.sum b/go.sum
index 4eb4433..9f2ac9c 100644
--- a/go.sum
+++ b/go.sum
@@ -60,8 +60,6 @@ github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hC
 github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
 github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
 github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
-github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0 h1:0b2vaepXIfMsG++IsjHiI2p4bxALD1Y2nQKGMR5zDQM=
-github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index 3ffa7c0..0ee0083 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -22,7 +22,6 @@ import (
 	"encoding/json"
 	"time"
 
-	"github.com/beefsack/go-rate"
 	"github.com/bmizerany/perks/quantile"
 	"github.com/spf13/cobra"
 
@@ -101,13 +100,18 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
 	payload := make([]byte, produceArgs.MessageSize)
 
 	ch := make(chan float64)
-
-	go func(stopCh <-chan struct{}) {
-		var rateLimiter *rate.RateLimiter
-		if produceArgs.Rate > 0 {
-			rateLimiter = rate.New(produceArgs.Rate, time.Second)
+	rateLimitCh := make(chan time.Time, produceArgs.Rate)
+	go func(rateLimit int, interval time.Duration) {
+		if rateLimit <= 0 { // 0 as no limit enforced
+			return
+		}
+		for {
+			oldest := <-rateLimitCh
+			time.Sleep(interval - time.Since(oldest))
 		}
+	}(produceArgs.Rate, time.Second)
 
+	go func(stopCh <-chan struct{}) {
 		for {
 			select {
 			case <-stopCh:
@@ -115,11 +119,10 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
 			default:
 			}
 
-			if rateLimiter != nil {
-				rateLimiter.Wait()
-			}
-
 			start := time.Now()
+			if produceArgs.Rate > 0 {
+				rateLimitCh <- start
+			}
 
 			producer.SendAsync(ctx, &pulsar.ProducerMessage{
 				Payload: payload,