You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/03/23 22:59:08 UTC
[pulsar-client-go] branch master updated: Fix Go Producer produce to only one partition (#205)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new ac85b65 Fix Go Producer produce to only one partition (#205)
ac85b65 is described below
commit ac85b657a850a9c124bea9f4759cfc59e7559ebc
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon Mar 23 15:58:22 2020 -0700
Fix Go Producer produce to only one partition (#205)
* Fix Go Producer produce to only one partition
---
pulsar/consumer_test.go | 5 +--
pulsar/internal/default_router.go | 12 ++++--
pulsar/internal/default_router_test.go | 24 ++++++++++-
pulsar/producer_impl.go | 12 +++++-
pulsar/producer_partition.go | 2 -
pulsar/producer_test.go | 73 ++++++++++++++++++++++++++++++++--
pulsar/test_helper.go | 4 +-
7 files changed, 115 insertions(+), 17 deletions(-)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 488ca59..05fab87 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -21,7 +21,6 @@ import (
"context"
"fmt"
"log"
- "net/http"
"testing"
"time"
@@ -327,7 +326,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
topic := "persistent://public/default/testGetPartitions"
testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"
- makeHTTPCall(t, http.MethodPut, testURL, "64")
+ makeHTTPCall(t, testURL, "64")
// create producer
producer, err := client.CreateProducer(ProducerOptions{
@@ -411,7 +410,7 @@ func TestConsumerShared(t *testing.T) {
topic := "persistent://public/default/testMultiPartitionConsumerShared"
testURL := adminURL + "/" + "admin/v2/persistent/public/default/testMultiPartitionConsumerShared/partitions"
- makeHTTPCall(t, http.MethodPut, testURL, "3")
+ makeHTTPCall(t, testURL, "3")
sub := "sub-shared-1"
consumer1, err := client.Subscribe(ConsumerOptions{
diff --git a/pulsar/internal/default_router.go b/pulsar/internal/default_router.go
index e4d31ca..2cf4a90 100644
--- a/pulsar/internal/default_router.go
+++ b/pulsar/internal/default_router.go
@@ -19,6 +19,7 @@ package internal
import (
"math/rand"
+ "sync/atomic"
"time"
)
@@ -27,6 +28,7 @@ type defaultRouter struct {
shiftIdx uint32
maxBatchingDelay time.Duration
hashFunc func(string) uint32
+ msgCounter uint32
}
type Clock func() uint64
@@ -41,12 +43,13 @@ func NewSystemClock() Clock {
// NewDefaultRouter set the message routing mode for the partitioned producer.
// Default routing mode is round-robin routing.
func NewDefaultRouter(clock Clock, hashFunc func(string) uint32,
- maxBatchingDelay time.Duration) func(string, uint32) int {
+ maxBatchingDelay time.Duration, disableBatching bool) func(string, uint32) int {
state := &defaultRouter{
clock: clock,
shiftIdx: rand.Uint32(),
maxBatchingDelay: maxBatchingDelay,
hashFunc: hashFunc,
+ msgCounter: 0,
}
return func(key string, numPartitions uint32) int {
@@ -65,10 +68,13 @@ func NewDefaultRouter(clock Clock, hashFunc func(string) uint32,
// of batching of the messages.
//
//currentMs / maxBatchingDelayMs + startPtnIdx
- if maxBatchingDelay.Nanoseconds() != 0 {
+ if !disableBatching && maxBatchingDelay.Nanoseconds() > 0 {
n := uint32(state.clock()/uint64(maxBatchingDelay.Nanoseconds())) + state.shiftIdx
return int(n % numPartitions)
}
- return 0
+
+ p := int(state.msgCounter % numPartitions)
+ atomic.AddUint32(&state.msgCounter, 1)
+ return p
}
}
diff --git a/pulsar/internal/default_router_test.go b/pulsar/internal/default_router_test.go
index 7dc3e0b..980df92 100644
--- a/pulsar/internal/default_router_test.go
+++ b/pulsar/internal/default_router_test.go
@@ -30,7 +30,7 @@ func TestDefaultRouter(t *testing.T) {
router := NewDefaultRouter(func() uint64 {
return currentClock
- }, JavaStringHash, 10*time.Nanosecond)
+ }, JavaStringHash, 10*time.Nanosecond, false)
// partition index should not change with time
p1 := router("my-key", 100)
@@ -63,11 +63,31 @@ func TestDefaultRouter(t *testing.T) {
currentClock = 112
pr6 := router("", 100)
assert.Equal(t, pr5, pr6)
+
+ // test batching delay is 0
+ router = NewDefaultRouter(func() uint64 {
+ return currentClock
+ }, JavaStringHash, 0, true)
+
+ // should round robin partitions
+ for i := 0; i < 200; i++ {
+ assert.Equal(t, router("", 100), i%100)
+ }
+
+ // test batching is disabled
+ router = NewDefaultRouter(func() uint64 {
+ return currentClock
+ }, JavaStringHash, 10*time.Nanosecond, true)
+
+ // should round robin partitions
+ for i := 0; i < 200; i++ {
+ assert.Equal(t, router("", 100), i%100)
+ }
}
func TestDefaultRouterNoPartitions(t *testing.T) {
- router := NewDefaultRouter(NewSystemClock(), JavaStringHash, 10*time.Nanosecond)
+ router := NewDefaultRouter(NewSystemClock(), JavaStringHash, 10*time.Nanosecond, false)
// partition index should not change with time
p1 := router("", 1)
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 83a6f75..67d0360 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -19,6 +19,7 @@ package pulsar
import (
"context"
+ "time"
"github.com/apache/pulsar-client-go/pulsar/internal"
)
@@ -30,6 +31,8 @@ type producer struct {
messageRouter func(*ProducerMessage, TopicMetadata) int
}
+const defaultBatchingMaxPublishDelay = 10 * time.Millisecond
+
func getHashingFunction(s HashingScheme) func(string) uint32 {
switch s {
case JavaStringHash:
@@ -51,11 +54,18 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
client: client,
}
+ var batchingMaxPublishDelay time.Duration
+ if options.BatchingMaxPublishDelay != 0 {
+ batchingMaxPublishDelay = options.BatchingMaxPublishDelay
+ } else {
+ batchingMaxPublishDelay = defaultBatchingMaxPublishDelay
+ }
+
if options.MessageRouter == nil {
internalRouter := internal.NewDefaultRouter(
internal.NewSystemClock(),
getHashingFunction(options.HashingScheme),
- options.BatchingMaxPublishDelay)
+ batchingMaxPublishDelay, options.DisableBatching)
p.messageRouter = func(message *ProducerMessage, metadata TopicMetadata) int {
return internalRouter(message.Key, metadata.NumPartitions())
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index f10adcf..58f512f 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -64,8 +64,6 @@ type partitionProducer struct {
partitionIdx int
}
-const defaultBatchingMaxPublishDelay = 10 * time.Millisecond
-
func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int) (
*partitionProducer, error) {
var batchingMaxPublishDelay time.Duration
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 66fa8a0..50eea77 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -20,7 +20,7 @@ package pulsar
import (
"context"
"fmt"
- "net/http"
+ "strconv"
"sync"
"testing"
"time"
@@ -351,11 +351,11 @@ func TestFlushInProducer(t *testing.T) {
}
func TestFlushInPartitionedProducer(t *testing.T) {
- topicName := "persistent://public/default/partition-testFlushInPartitionedProducer"
+ topicName := "public/default/partition-testFlushInPartitionedProducer"
// call admin api to make it partitioned
- url := adminURL + "/" + "admin/v2/" + topicName + "/partitions"
- makeHTTPCall(t, http.MethodPut, url, "5")
+ url := adminURL + "/" + "admin/v2/persistent/" + topicName + "/partitions"
+ makeHTTPCall(t, url, "5")
numberOfPartitions := 5
numOfMessages := 10
@@ -427,6 +427,71 @@ func TestFlushInPartitionedProducer(t *testing.T) {
assert.Equal(t, msgCount, numOfMessages/2)
}
+func TestRoundRobinRouterPartitionedProducer(t *testing.T) {
+ topicName := "public/default/partition-testRoundRobinRouterPartitionedProducer"
+ numberOfPartitions := 5
+
+ // call admin api to make it partitioned
+ url := adminURL + "/" + "admin/v2/persistent/" + topicName + "/partitions"
+ makeHTTPCall(t, url, strconv.Itoa(numberOfPartitions))
+
+ numOfMessages := 10
+ ctx := context.Background()
+
+ // create client connection
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ // create consumer
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "my-sub",
+ Type: Exclusive,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // send numOfMessages (10) messages
+ prefix := "msg-"
+
+ for i := 0; i < numOfMessages; i++ {
+ messageContent := prefix + fmt.Sprintf("%d", i)
+ _, err = producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(messageContent),
+ })
+ assert.Nil(t, err)
+ }
+
+ // Receive all messages
+ msgCount := 0
+ msgPartitionMap := make(map[string]int)
+ for i := 0; i < numOfMessages; i++ {
+ msg, err := consumer.Receive(ctx)
+ fmt.Printf("Received message msgId: %#v topic: %s-- content: '%s'\n",
+ msg.ID(), msg.Topic(), string(msg.Payload()))
+ assert.Nil(t, err)
+ consumer.Ack(msg)
+ msgCount++
+ msgPartitionMap[msg.Topic()]++
+ }
+ assert.Equal(t, msgCount, numOfMessages)
+ assert.Equal(t, numberOfPartitions, len(msgPartitionMap))
+ for _, count := range msgPartitionMap {
+ assert.Equal(t, count, numOfMessages/numberOfPartitions)
+ }
+}
+
func TestMessageRouter(t *testing.T) {
// Create topic with 5 partitions
err := httpPut("admin/v2/persistent/public/default/my-partitioned-topic/partitions", 5)
diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go
index 3471bca..9d54f20 100644
--- a/pulsar/test_helper.go
+++ b/pulsar/test_helper.go
@@ -118,10 +118,10 @@ func httpDo(method string, requestPath string, in interface{}, out interface{})
return nil
}
-func makeHTTPCall(t *testing.T, method string, url string, body string) {
+func makeHTTPCall(t *testing.T, url string, body string) {
client := http.Client{}
- req, err := http.NewRequest(method, url, strings.NewReader(body))
+ req, err := http.NewRequest(http.MethodPut, url, strings.NewReader(body))
if err != nil {
t.Fatal(err)
}