You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/01 15:25:17 UTC

[GitHub] [pulsar-client-go] pgier commented on a diff in pull request #799: [issue #752] replace go-rate rate limiter with a buffered channel implementation

pgier commented on code in PR #799:
URL: https://github.com/apache/pulsar-client-go/pull/799#discussion_r912050134


##########
perf/perf-producer.go:
##########
@@ -101,25 +100,26 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
 	payload := make([]byte, produceArgs.MessageSize)
 
 	ch := make(chan float64)
-
-	go func(stopCh <-chan struct{}) {
-		var rateLimiter *rate.RateLimiter
-		if produceArgs.Rate > 0 {
-			rateLimiter = rate.New(produceArgs.Rate, time.Second)
+	rateLimitCh := make(chan time.Time, produceArgs.Rate)

Review Comment:
   Since the buffer size here is proportional to the rate, should there be a limit on the maximum size? Is there a way to do this with a fixed buffer size?



##########
perf/perf-producer.go:
##########
@@ -101,25 +100,26 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
 	payload := make([]byte, produceArgs.MessageSize)
 
 	ch := make(chan float64)
-
-	go func(stopCh <-chan struct{}) {
-		var rateLimiter *rate.RateLimiter
-		if produceArgs.Rate > 0 {
-			rateLimiter = rate.New(produceArgs.Rate, time.Second)
+	rateLimitCh := make(chan time.Time, produceArgs.Rate)
+	go func(rateLimit int, interval time.Duration) {
+		for {
+			last := <-rateLimitCh
+			if rateLimit > 0 { // 0 is defined as no limit enforced
+				time.Sleep(interval - time.Since(last))
+			}
 		}
+	}(produceArgs.Rate, time.Second)
 
+	go func(stopCh <-chan struct{}) {
 		for {
 			select {
 			case <-stopCh:
 				return
 			default:
 			}
 
-			if rateLimiter != nil {
-				rateLimiter.Wait()
-			}
-
 			start := time.Now()
+			rateLimitCh <- start

Review Comment:
   I think this send to `rateLimitCh` should be skipped completely if no rate limit is set.



##########
perf/perf-producer.go:
##########
@@ -101,25 +100,26 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
 	payload := make([]byte, produceArgs.MessageSize)
 
 	ch := make(chan float64)
-
-	go func(stopCh <-chan struct{}) {
-		var rateLimiter *rate.RateLimiter
-		if produceArgs.Rate > 0 {
-			rateLimiter = rate.New(produceArgs.Rate, time.Second)
+	rateLimitCh := make(chan time.Time, produceArgs.Rate)
+	go func(rateLimit int, interval time.Duration) {
+		for {
+			last := <-rateLimitCh

Review Comment:
   `last` makes me think this is coming from the back of the queue; maybe rename `last` to `oldest`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org