You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/07/13 18:32:02 UTC
[pulsar-client-go] branch master updated: [issue #752] replace go-rate rate limiter with a buffered channel implementation (#799)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 6a8e7f3 [issue #752] replace go-rate rate limiter with a buffered channel implementation (#799)
6a8e7f3 is described below
commit 6a8e7f39aac100a285a2c190186e38b73a5c9d34
Author: ming <it...@gmail.com>
AuthorDate: Wed Jul 13 14:31:57 2022 -0400
[issue #752] replace go-rate rate limiter with a buffered channel implementation (#799)
* replace go-rate rate limiter with channel implementation
* fix linter
* update based on review comments
* stop ratelimiter goroutine if the rate is unthrottled
---
go.mod | 1 -
go.sum | 2 --
perf/perf-producer.go | 23 +++++++++++++----------
3 files changed, 13 insertions(+), 13 deletions(-)
diff --git a/go.mod b/go.mod
index ec6d810..5c2c33a 100644
--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,6 @@ require (
github.com/AthenZ/athenz v1.10.39
github.com/DataDog/zstd v1.5.0
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20220120090717-25e59572242e
- github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
github.com/davecgh/go-spew v1.1.1
github.com/gogo/protobuf v1.3.2
diff --git a/go.sum b/go.sum
index 4eb4433..9f2ac9c 100644
--- a/go.sum
+++ b/go.sum
@@ -60,8 +60,6 @@ github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hC
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
-github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0 h1:0b2vaepXIfMsG++IsjHiI2p4bxALD1Y2nQKGMR5zDQM=
-github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index 3ffa7c0..0ee0083 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -22,7 +22,6 @@ import (
"encoding/json"
"time"
- "github.com/beefsack/go-rate"
"github.com/bmizerany/perks/quantile"
"github.com/spf13/cobra"
@@ -101,13 +100,18 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
payload := make([]byte, produceArgs.MessageSize)
ch := make(chan float64)
-
- go func(stopCh <-chan struct{}) {
- var rateLimiter *rate.RateLimiter
- if produceArgs.Rate > 0 {
- rateLimiter = rate.New(produceArgs.Rate, time.Second)
+ rateLimitCh := make(chan time.Time, produceArgs.Rate)
+ go func(rateLimit int, interval time.Duration) {
+ if rateLimit <= 0 { // 0 as no limit enforced
+ return
+ }
+ for {
+ oldest := <-rateLimitCh
+ time.Sleep(interval - time.Since(oldest))
}
+ }(produceArgs.Rate, time.Second)
+ go func(stopCh <-chan struct{}) {
for {
select {
case <-stopCh:
@@ -115,11 +119,10 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
default:
}
- if rateLimiter != nil {
- rateLimiter.Wait()
- }
-
start := time.Now()
+ if produceArgs.Rate > 0 {
+ rateLimitCh <- start
+ }
producer.SendAsync(ctx, &pulsar.ProducerMessage{
Payload: payload,