You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/03/23 22:59:08 UTC

[pulsar-client-go] branch master updated: Fix Go Producer produce to only one partition (#205)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new ac85b65  Fix Go Producer produce to only one partition (#205)
ac85b65 is described below

commit ac85b657a850a9c124bea9f4759cfc59e7559ebc
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon Mar 23 15:58:22 2020 -0700

    Fix Go Producer produce to only one partition (#205)
    
    * Fix Go Producer produce to only one partition
---
 pulsar/consumer_test.go                |  5 +--
 pulsar/internal/default_router.go      | 12 ++++--
 pulsar/internal/default_router_test.go | 24 ++++++++++-
 pulsar/producer_impl.go                | 12 +++++-
 pulsar/producer_partition.go           |  2 -
 pulsar/producer_test.go                | 73 ++++++++++++++++++++++++++++++++--
 pulsar/test_helper.go                  |  4 +-
 7 files changed, 115 insertions(+), 17 deletions(-)

diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 488ca59..05fab87 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -21,7 +21,6 @@ import (
 	"context"
 	"fmt"
 	"log"
-	"net/http"
 	"testing"
 	"time"
 
@@ -327,7 +326,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
 	topic := "persistent://public/default/testGetPartitions"
 	testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"
 
-	makeHTTPCall(t, http.MethodPut, testURL, "64")
+	makeHTTPCall(t, testURL, "64")
 
 	// create producer
 	producer, err := client.CreateProducer(ProducerOptions{
@@ -411,7 +410,7 @@ func TestConsumerShared(t *testing.T) {
 	topic := "persistent://public/default/testMultiPartitionConsumerShared"
 	testURL := adminURL + "/" + "admin/v2/persistent/public/default/testMultiPartitionConsumerShared/partitions"
 
-	makeHTTPCall(t, http.MethodPut, testURL, "3")
+	makeHTTPCall(t, testURL, "3")
 
 	sub := "sub-shared-1"
 	consumer1, err := client.Subscribe(ConsumerOptions{
diff --git a/pulsar/internal/default_router.go b/pulsar/internal/default_router.go
index e4d31ca..2cf4a90 100644
--- a/pulsar/internal/default_router.go
+++ b/pulsar/internal/default_router.go
@@ -19,6 +19,7 @@ package internal
 
 import (
 	"math/rand"
+	"sync/atomic"
 	"time"
 )
 
@@ -27,6 +28,7 @@ type defaultRouter struct {
 	shiftIdx         uint32
 	maxBatchingDelay time.Duration
 	hashFunc         func(string) uint32
+	msgCounter       uint32
 }
 
 type Clock func() uint64
@@ -41,12 +43,13 @@ func NewSystemClock() Clock {
 // NewDefaultRouter set the message routing mode for the partitioned producer.
 // Default routing mode is round-robin routing.
 func NewDefaultRouter(clock Clock, hashFunc func(string) uint32,
-	maxBatchingDelay time.Duration) func(string, uint32) int {
+	maxBatchingDelay time.Duration, disableBatching bool) func(string, uint32) int {
 	state := &defaultRouter{
 		clock:            clock,
 		shiftIdx:         rand.Uint32(),
 		maxBatchingDelay: maxBatchingDelay,
 		hashFunc:         hashFunc,
+		msgCounter:       0,
 	}
 
 	return func(key string, numPartitions uint32) int {
@@ -65,10 +68,13 @@ func NewDefaultRouter(clock Clock, hashFunc func(string) uint32,
 		// of batching of the messages.
 		//
 		//currentMs / maxBatchingDelayMs + startPtnIdx
-		if maxBatchingDelay.Nanoseconds() != 0 {
+		if !disableBatching && maxBatchingDelay.Nanoseconds() > 0 {
 			n := uint32(state.clock()/uint64(maxBatchingDelay.Nanoseconds())) + state.shiftIdx
 			return int(n % numPartitions)
 		}
-		return 0
+
+		p := int(state.msgCounter % numPartitions)
+		atomic.AddUint32(&state.msgCounter, 1)
+		return p
 	}
 }
diff --git a/pulsar/internal/default_router_test.go b/pulsar/internal/default_router_test.go
index 7dc3e0b..980df92 100644
--- a/pulsar/internal/default_router_test.go
+++ b/pulsar/internal/default_router_test.go
@@ -30,7 +30,7 @@ func TestDefaultRouter(t *testing.T) {
 
 	router := NewDefaultRouter(func() uint64 {
 		return currentClock
-	}, JavaStringHash, 10*time.Nanosecond)
+	}, JavaStringHash, 10*time.Nanosecond, false)
 
 	// partition index should not change with time
 	p1 := router("my-key", 100)
@@ -63,11 +63,31 @@ func TestDefaultRouter(t *testing.T) {
 	currentClock = 112
 	pr6 := router("", 100)
 	assert.Equal(t, pr5, pr6)
+
+	// test batching delay is 0
+	router = NewDefaultRouter(func() uint64 {
+		return currentClock
+	}, JavaStringHash, 0, true)
+
+	// should round robin partitions
+	for i := 0; i < 200; i++ {
+		assert.Equal(t, router("", 100), i%100)
+	}
+
+	// test batching is disabled
+	router = NewDefaultRouter(func() uint64 {
+		return currentClock
+	}, JavaStringHash, 10*time.Nanosecond, true)
+
+	// should round robin partitions
+	for i := 0; i < 200; i++ {
+		assert.Equal(t, router("", 100), i%100)
+	}
 }
 
 func TestDefaultRouterNoPartitions(t *testing.T) {
 
-	router := NewDefaultRouter(NewSystemClock(), JavaStringHash, 10*time.Nanosecond)
+	router := NewDefaultRouter(NewSystemClock(), JavaStringHash, 10*time.Nanosecond, false)
 
 	// partition index should not change with time
 	p1 := router("", 1)
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 83a6f75..67d0360 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
 	"context"
+	"time"
 
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 )
@@ -30,6 +31,8 @@ type producer struct {
 	messageRouter func(*ProducerMessage, TopicMetadata) int
 }
 
+const defaultBatchingMaxPublishDelay = 10 * time.Millisecond
+
 func getHashingFunction(s HashingScheme) func(string) uint32 {
 	switch s {
 	case JavaStringHash:
@@ -51,11 +54,18 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
 		client: client,
 	}
 
+	var batchingMaxPublishDelay time.Duration
+	if options.BatchingMaxPublishDelay != 0 {
+		batchingMaxPublishDelay = options.BatchingMaxPublishDelay
+	} else {
+		batchingMaxPublishDelay = defaultBatchingMaxPublishDelay
+	}
+
 	if options.MessageRouter == nil {
 		internalRouter := internal.NewDefaultRouter(
 			internal.NewSystemClock(),
 			getHashingFunction(options.HashingScheme),
-			options.BatchingMaxPublishDelay)
+			batchingMaxPublishDelay, options.DisableBatching)
 		p.messageRouter = func(message *ProducerMessage, metadata TopicMetadata) int {
 			return internalRouter(message.Key, metadata.NumPartitions())
 		}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index f10adcf..58f512f 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -64,8 +64,6 @@ type partitionProducer struct {
 	partitionIdx int
 }
 
-const defaultBatchingMaxPublishDelay = 10 * time.Millisecond
-
 func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int) (
 	*partitionProducer, error) {
 	var batchingMaxPublishDelay time.Duration
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 66fa8a0..50eea77 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -20,7 +20,7 @@ package pulsar
 import (
 	"context"
 	"fmt"
-	"net/http"
+	"strconv"
 	"sync"
 	"testing"
 	"time"
@@ -351,11 +351,11 @@ func TestFlushInProducer(t *testing.T) {
 }
 
 func TestFlushInPartitionedProducer(t *testing.T) {
-	topicName := "persistent://public/default/partition-testFlushInPartitionedProducer"
+	topicName := "public/default/partition-testFlushInPartitionedProducer"
 
 	// call admin api to make it partitioned
-	url := adminURL + "/" + "admin/v2/" + topicName + "/partitions"
-	makeHTTPCall(t, http.MethodPut, url, "5")
+	url := adminURL + "/" + "admin/v2/persistent/" + topicName + "/partitions"
+	makeHTTPCall(t, url, "5")
 
 	numberOfPartitions := 5
 	numOfMessages := 10
@@ -427,6 +427,71 @@ func TestFlushInPartitionedProducer(t *testing.T) {
 	assert.Equal(t, msgCount, numOfMessages/2)
 }
 
+func TestRoundRobinRouterPartitionedProducer(t *testing.T) {
+	topicName := "public/default/partition-testRoundRobinRouterPartitionedProducer"
+	numberOfPartitions := 5
+
+	// call admin api to make it partitioned
+	url := adminURL + "/" + "admin/v2/persistent/" + topicName + "/partitions"
+	makeHTTPCall(t, url, strconv.Itoa(numberOfPartitions))
+
+	numOfMessages := 10
+	ctx := context.Background()
+
+	// create client connection
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	assert.NoError(t, err)
+	defer client.Close()
+
+	// create consumer
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "my-sub",
+		Type:             Exclusive,
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           topicName,
+		DisableBatching: true,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	// send numOfMessages (10) messages
+	prefix := "msg-"
+
+	for i := 0; i < numOfMessages; i++ {
+		messageContent := prefix + fmt.Sprintf("%d", i)
+		_, err = producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(messageContent),
+		})
+		assert.Nil(t, err)
+	}
+
+	// Receive all messages
+	msgCount := 0
+	msgPartitionMap := make(map[string]int)
+	for i := 0; i < numOfMessages; i++ {
+		msg, err := consumer.Receive(ctx)
+		fmt.Printf("Received message msgId: %#v topic: %s-- content: '%s'\n",
+			msg.ID(), msg.Topic(), string(msg.Payload()))
+		assert.Nil(t, err)
+		consumer.Ack(msg)
+		msgCount++
+		msgPartitionMap[msg.Topic()]++
+	}
+	assert.Equal(t, msgCount, numOfMessages)
+	assert.Equal(t, numberOfPartitions, len(msgPartitionMap))
+	for _, count := range msgPartitionMap {
+		assert.Equal(t, count, numOfMessages/numberOfPartitions)
+	}
+}
+
 func TestMessageRouter(t *testing.T) {
 	// Create topic with 5 partitions
 	err := httpPut("admin/v2/persistent/public/default/my-partitioned-topic/partitions", 5)
diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go
index 3471bca..9d54f20 100644
--- a/pulsar/test_helper.go
+++ b/pulsar/test_helper.go
@@ -118,10 +118,10 @@ func httpDo(method string, requestPath string, in interface{}, out interface{})
 	return nil
 }
 
-func makeHTTPCall(t *testing.T, method string, url string, body string) {
+func makeHTTPCall(t *testing.T, url string, body string) {
 	client := http.Client{}
 
-	req, err := http.NewRequest(method, url, strings.NewReader(body))
+	req, err := http.NewRequest(http.MethodPut, url, strings.NewReader(body))
 	if err != nil {
 		t.Fatal(err)
 	}