You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by cc...@apache.org on 2021/12/30 18:26:59 UTC

[pulsar-client-go] branch master updated: Add a parallel default_router bench test (#693)

This is an automated email from the ASF dual-hosted git repository.

cckellogg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d543c6  Add a parallel default_router bench test (#693)
6d543c6 is described below

commit 6d543c67574af812812fdbe8b76b307dc1574397
Author: dferstay <df...@users.noreply.github.com>
AuthorDate: Thu Dec 30 10:26:52 2021 -0800

    Add a parallel default_router bench test (#693)
    
    So that we can verify whether changes to the DefaultRouter have an impact
    on performance under concurrent load.
    
    Signed-off-by: Daniel Ferstay <df...@splunk.com>
    
    Co-authored-by: Daniel Ferstay <df...@splunk.com>
---
 pulsar/default_router_bench_test.go | 33 ++++++++++++++++++++++++++-------
 1 file changed, 26 insertions(+), 7 deletions(-)

diff --git a/pulsar/default_router_bench_test.go b/pulsar/default_router_bench_test.go
index d7ec175..d0c1cb9 100644
--- a/pulsar/default_router_bench_test.go
+++ b/pulsar/default_router_bench_test.go
@@ -29,17 +29,36 @@ var (
 )
 
 func BenchmarkDefaultRouter(b *testing.B) {
-	const (
-		numPartitions       = uint32(200)
-		maxBatchingMessages = 2000
-		maxBatchingSize     = 524288
-		maxBatchingDelay    = 100 * time.Millisecond
-	)
+	const numPartitions = uint32(200)
 	msg := &ProducerMessage{
 		Payload: []byte("message 1"),
 	}
-	router := NewDefaultRouter(internal.JavaStringHash, maxBatchingMessages, maxBatchingSize, maxBatchingDelay, false)
+	router := newBenchDefaultRouter()
+	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		targetPartition = router(msg, numPartitions)
 	}
 }
+
+func BenchmarkDefaultRouterParallel(b *testing.B) {
+	const numPartitions = uint32(200)
+	msg := &ProducerMessage{
+		Payload: []byte("message 1"),
+	}
+	router := newBenchDefaultRouter()
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			targetPartition = router(msg, numPartitions)
+		}
+	})
+}
+
+func newBenchDefaultRouter() func(*ProducerMessage, uint32) int {
+	const (
+		maxBatchingMessages = 2000
+		maxBatchingSize     = 524288
+		maxBatchingDelay    = 100 * time.Millisecond
+	)
+	return NewDefaultRouter(internal.JavaStringHash, maxBatchingMessages, maxBatchingSize, maxBatchingDelay, false)
+}