You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/30 01:14:25 UTC

[GitHub] [pulsar-client-go] zzzming opened a new pull request, #799: [issue #752] replace go-rate rate limiter with a buffered channel implementation

zzzming opened a new pull request, #799:
URL: https://github.com/apache/pulsar-client-go/pull/799

   Fixes #752 
   
   ### Motivation
   
   The go rate limiter library, beefsack/go-rate, is an MIT license. We have to updated the license header. The PR removes this dependency.
   
   ### Modifications
   
   In Go, it is rather simple to implement a rate limiter directly. A buffered channel is used to store a request timestamp as message. The channel receiver check each timestamp (in the message) to sleep on the remaining time duration. 
   
   The per second rate is the size of channel buffer. If the buffer is full, the main producing go routine blocks on producing Pulsar message, until the sleep expires and move to on the next message in the channel thus free up a slot in the channel.
   
   It resembles leaky bucket rate limiting implementation. Because of leveraging go channel, it uses 10 lines of code and remove the dependency entirely.
   
   ### Verifying this change
   
   Since the change only affects the perf utility, I built the tool and verify the producing rate is accurate.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes)
         Removes beefsak/go-rate dependency
     - The public API: ( no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-go] pgier commented on a diff in pull request #799: [issue #752] replace go-rate rate limiter with a buffered channel implementation

Posted by GitBox <gi...@apache.org>.
pgier commented on code in PR #799:
URL: https://github.com/apache/pulsar-client-go/pull/799#discussion_r912050134


##########
perf/perf-producer.go:
##########
@@ -101,25 +100,26 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
 	payload := make([]byte, produceArgs.MessageSize)
 
 	ch := make(chan float64)
-
-	go func(stopCh <-chan struct{}) {
-		var rateLimiter *rate.RateLimiter
-		if produceArgs.Rate > 0 {
-			rateLimiter = rate.New(produceArgs.Rate, time.Second)
+	rateLimitCh := make(chan time.Time, produceArgs.Rate)

Review Comment:
   Since the buffer size here is proportional to the rate, should there be a limit on the max size?  Is there a way to do this with a fixed buffer size?



##########
perf/perf-producer.go:
##########
@@ -101,25 +100,26 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
 	payload := make([]byte, produceArgs.MessageSize)
 
 	ch := make(chan float64)
-
-	go func(stopCh <-chan struct{}) {
-		var rateLimiter *rate.RateLimiter
-		if produceArgs.Rate > 0 {
-			rateLimiter = rate.New(produceArgs.Rate, time.Second)
+	rateLimitCh := make(chan time.Time, produceArgs.Rate)
+	go func(rateLimit int, interval time.Duration) {
+		for {
+			last := <-rateLimitCh
+			if rateLimit > 0 { // 0 is defined as no limit enforced
+				time.Sleep(interval - time.Since(last))
+			}
 		}
+	}(produceArgs.Rate, time.Second)
 
+	go func(stopCh <-chan struct{}) {
 		for {
 			select {
 			case <-stopCh:
 				return
 			default:
 			}
 
-			if rateLimiter != nil {
-				rateLimiter.Wait()
-			}
-
 			start := time.Now()
+			rateLimitCh <- start

Review Comment:
   I think this should be skipped completely if there is no limit set.



##########
perf/perf-producer.go:
##########
@@ -101,25 +100,26 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
 	payload := make([]byte, produceArgs.MessageSize)
 
 	ch := make(chan float64)
-
-	go func(stopCh <-chan struct{}) {
-		var rateLimiter *rate.RateLimiter
-		if produceArgs.Rate > 0 {
-			rateLimiter = rate.New(produceArgs.Rate, time.Second)
+	rateLimitCh := make(chan time.Time, produceArgs.Rate)
+	go func(rateLimit int, interval time.Duration) {
+		for {
+			last := <-rateLimitCh

Review Comment:
   `last` makes me think this is coming from the back of the queue, maybe change `last` to `oldest`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-go] merlimat merged pull request #799: [issue #752] replace go-rate rate limiter with a buffered channel implementation

Posted by GitBox <gi...@apache.org>.
merlimat merged PR #799:
URL: https://github.com/apache/pulsar-client-go/pull/799


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-go] zzzming commented on a diff in pull request #799: [issue #752] replace go-rate rate limiter with a buffered channel implementation

Posted by GitBox <gi...@apache.org>.
zzzming commented on code in PR #799:
URL: https://github.com/apache/pulsar-client-go/pull/799#discussion_r912071823


##########
perf/perf-producer.go:
##########
@@ -101,25 +100,26 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
 	payload := make([]byte, produceArgs.MessageSize)
 
 	ch := make(chan float64)
-
-	go func(stopCh <-chan struct{}) {
-		var rateLimiter *rate.RateLimiter
-		if produceArgs.Rate > 0 {
-			rateLimiter = rate.New(produceArgs.Rate, time.Second)
+	rateLimitCh := make(chan time.Time, produceArgs.Rate)

Review Comment:
   The buffer size is the rate per second. It is a leaky bucket rate limiter implementation. So the channel buffer should hold the same number of rate allowed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-go] pgier commented on a diff in pull request #799: [issue #752] replace go-rate rate limiter with a buffered channel implementation

Posted by GitBox <gi...@apache.org>.
pgier commented on code in PR #799:
URL: https://github.com/apache/pulsar-client-go/pull/799#discussion_r912109886


##########
perf/perf-producer.go:
##########
@@ -101,25 +100,28 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
 	payload := make([]byte, produceArgs.MessageSize)
 
 	ch := make(chan float64)
-
-	go func(stopCh <-chan struct{}) {
-		var rateLimiter *rate.RateLimiter
-		if produceArgs.Rate > 0 {
-			rateLimiter = rate.New(produceArgs.Rate, time.Second)
+	rateLimitCh := make(chan time.Time, produceArgs.Rate)
+	go func(rateLimit int, interval time.Duration) {
+		for {
+			oldest := <-rateLimitCh
+			if rateLimit > 0 { // 0 is defined as no limit enforced
+				time.Sleep(interval - time.Since(oldest))
+			}
 		}
+	}(produceArgs.Rate, time.Second)

Review Comment:
   Can this function also be skipped if Rate is zero?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-go] pgier commented on pull request #799: [issue #752] replace go-rate rate limiter with a buffered channel implementation

Posted by GitBox <gi...@apache.org>.
pgier commented on PR #799:
URL: https://github.com/apache/pulsar-client-go/pull/799#issuecomment-1172528184

   @merlimat I think the idea @zzzming had was that removing the dependency also solves the license issue, and it's about the same amount of code anyway.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org