You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/10/21 01:00:37 UTC

[pulsar-client-go] branch master updated: Update default router to switch partition on all batching thresholds #382 (#383)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 16fea6d  Update default router to switch partition on all batching thresholds #382 (#383)
16fea6d is described below

commit 16fea6d572b936253aa17abdbe1c60eb9ceb54d5
Author: Denis Vergnes <de...@gmail.com>
AuthorDate: Tue Oct 20 18:00:26 2020 -0700

    Update default router to switch partition on all batching thresholds #382 (#383)
    
    Master Issue: #382
    
    ### Motivation
    
    
    The default router only switches partitions when the maximum publish delay has elapsed. This PR makes it switch as soon as any one of the batching thresholds (maximum number of messages, maximum number of bytes, maximum delay) is reached.
    
    ### Modifications
    
    The default router has been updated to switch partitions as soon as any one of the thresholds is reached. To prevent a cyclic dependency between packages, it has been moved from the `internal` package to the `pulsar` package.
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
    
    *(Please pick either of the following options)*
    
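    This change added unit tests covering each routing case: batching disabled, each batching threshold (delay, message count, size), a partition key, and a single partition.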
    
    ### Does this pull request potentially affect one of the following parts:
    
    *If `yes` was chosen, please highlight the changes*
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API: yes (adds the exported `pulsar.NewDefaultRouter` function)
      - The schema: no
      - The default values of configurations: no
      - The wire protocol: no
    
    ### Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? GoDocs
---
 pulsar/default_router.go               |  94 +++++++++++++++++++++
 pulsar/default_router_test.go          | 144 +++++++++++++++++++++++++++++++++
 pulsar/internal/batch_builder.go       |  15 +---
 pulsar/internal/default_router.go      |  80 ------------------
 pulsar/internal/default_router_test.go | 102 -----------------------
 pulsar/producer_impl.go                |  40 ++++++---
 pulsar/producer_test.go                |   2 +-
 7 files changed, 267 insertions(+), 210 deletions(-)

diff --git a/pulsar/default_router.go b/pulsar/default_router.go
new file mode 100644
index 0000000..82be982
--- /dev/null
+++ b/pulsar/default_router.go
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"math"
+	"math/rand"
+	"sync/atomic"
+	"time"
+)
+
+// defaultRouter is the state shared by every call of the function built by NewDefaultRouter. Its fields
+// are only accessed atomically; lastChangeTimestamp is the UnixNano time of the last switch (or of creation).
+type defaultRouter struct {
+	currentPartitionCursor uint32
+
+	lastChangeTimestamp int64
+	msgCounter          uint32
+	cumulativeBatchSize uint32
+}
+
+// NewDefaultRouter returns the default, concurrency-safe routing function for a partitioned producer.
+// Keyed messages go to hashFunc(key) modulo the partition count. Key-less messages go round-robin:
+// one per partition when batching is disabled, otherwise sticking to a partition until either
+// maxBatchingMessages messages or maxBatchingSize payload bytes were routed to it, or maxBatchingDelay
+// elapsed since the last switch. A zero threshold is reached by every message.
+func NewDefaultRouter(
+	hashFunc func(string) uint32,
+	maxBatchingMessages uint,
+	maxBatchingSize uint,
+	maxBatchingDelay time.Duration,
+	disableBatching bool) func(*ProducerMessage, uint32) int {
+	state := &defaultRouter{
+		currentPartitionCursor: rand.Uint32(),
+		lastChangeTimestamp:    time.Now().UnixNano(),
+	}
+	// Clamp thresholds to the uint32 range of the counters they are compared against.
+	maxMessages, maxBytes := uint64(maxBatchingMessages), uint64(maxBatchingSize)
+	if maxMessages > math.MaxUint32 {
+		maxMessages = math.MaxUint32
+	}
+	if maxBytes > math.MaxUint32 {
+		maxBytes = math.MaxUint32
+	}
+
+	return func(message *ProducerMessage, numPartitions uint32) int {
+		if numPartitions == 1 {
+			// When there are no partitions, don't even bother
+			return 0
+		}
+		if len(message.Key) != 0 {
+			// When a key is specified, use the hash of that key
+			return int(hashFunc(message.Key) % numPartitions)
+		}
+		// No batching: plain round-robin. AddUint32 returns the new cursor, so this call owns new-1.
+		if disableBatching {
+			return int((atomic.AddUint32(&state.currentPartitionCursor, 1) - 1) % numPartitions)
+		}
+		// Stick to the current partition until a threshold is reached (sums in uint64 cannot wrap).
+		// Racing callers may each advance the cursor and skip a partition; spreading data is enough.
+		size := uint64(len(message.Payload))
+		previousMessageCount := uint64(atomic.LoadUint32(&state.msgCounter))
+		previousBatchSize := uint64(atomic.LoadUint32(&state.cumulativeBatchSize))
+		now := time.Now().UnixNano()
+		if previousMessageCount+1 >= maxMessages ||
+			previousBatchSize+size >= maxBytes ||
+			now-atomic.LoadInt64(&state.lastChangeTimestamp) >= maxBatchingDelay.Nanoseconds() {
+			cursor := atomic.AddUint32(&state.currentPartitionCursor, 1)
+			atomic.StoreInt64(&state.lastChangeTimestamp, now)
+			atomic.StoreUint32(&state.cumulativeBatchSize, 0)
+			atomic.StoreUint32(&state.msgCounter, 0)
+			return int(cursor % numPartitions)
+		}
+		// Stay put. lastChangeTimestamp is not refreshed: the delay runs from the last switch.
+		atomic.AddUint32(&state.msgCounter, 1)
+		atomic.AddUint32(&state.cumulativeBatchSize, uint32(size))
+		return int(atomic.LoadUint32(&state.currentPartitionCursor) % numPartitions)
+	}
+}
diff --git a/pulsar/default_router_test.go b/pulsar/default_router_test.go
new file mode 100644
index 0000000..60e10c7
--- /dev/null
+++ b/pulsar/default_router_test.go
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"testing"
+	"time"
+
+	"github.com/apache/pulsar-client-go/pulsar/internal"
+	"github.com/stretchr/testify/assert"
+)
+
+// oneHourPublishMaxDelay is long enough that the batching delay threshold is never reached in a test.
+const oneHourPublishMaxDelay = time.Hour
+
+// With batching disabled, each key-less message moves to the next partition.
+func TestDefaultRouterRoutingBecauseBatchingDisabled(t *testing.T) {
+	router := NewDefaultRouter(internal.JavaStringHash, 20, 100, oneHourPublishMaxDelay, true)
+	const numPartitions = uint32(3)
+	p1 := router(&ProducerMessage{
+		Payload: []byte("message 1"),
+	}, numPartitions)
+	// A partition index is always strictly below the number of partitions.
+	assert.Less(t, p1, int(numPartitions))
+
+	p2 := router(&ProducerMessage{
+		Payload: []byte("message 2"),
+	}, numPartitions)
+	// Round-robin wraps from the last partition back to 0.
+	assert.Equal(t, (p1+1)%int(numPartitions), p2)
+}
+
+// Once maxPublishDelay has elapsed, the next key-less message moves to the next partition.
+func TestDefaultRouterRoutingBecauseMaxPublishDelayReached(t *testing.T) {
+	maxPublishDelay := time.Nanosecond * 10
+	router := NewDefaultRouter(internal.JavaStringHash, 20, 100, maxPublishDelay, false)
+	const numPartitions = uint32(3)
+	// Two short payloads stay under the 20-message and 100-byte limits, so only the delay can trigger.
+	p1 := router(&ProducerMessage{
+		Payload: []byte("message 1"),
+	}, numPartitions)
+	assert.Less(t, p1, int(numPartitions))
+
+	// time.Sleep pauses for at least maxPublishDelay, so the delay threshold is reached.
+	time.Sleep(maxPublishDelay)
+
+	p2 := router(&ProducerMessage{
+		Payload: []byte("message 2"),
+	}, numPartitions)
+	// Round-robin wraps from the last partition back to 0.
+	assert.Equal(t, (p1+1)%int(numPartitions), p2)
+}
+
+// With a limit of 2 messages per batch, the second key-less message moves to the next partition.
+func TestDefaultRouterRoutingBecauseMaxNumberOfMessagesReached(t *testing.T) {
+	router := NewDefaultRouter(internal.JavaStringHash, 2, 100, oneHourPublishMaxDelay, false)
+	const numPartitions = uint32(3)
+	// Payloads total under 100 bytes and the delay is an hour, so only the message count can trigger.
+	p1 := router(&ProducerMessage{
+		Payload: []byte("message 1"),
+	}, numPartitions)
+	assert.Less(t, p1, int(numPartitions))
+
+	// One message is already counted, so this one reaches the 2-message limit.
+	p2 := router(&ProducerMessage{
+		Payload: []byte("message 2"),
+	}, numPartitions)
+	// Round-robin wraps from the last partition back to 0.
+	assert.Equal(t, (p1+1)%int(numPartitions), p2)
+}
+
+// With a 10-byte batch limit, a second 9-byte payload reaches it and moves to the next partition.
+func TestDefaultRouterRoutingBecauseMaxVolumeReached(t *testing.T) {
+	router := NewDefaultRouter(internal.JavaStringHash, 10, 10, oneHourPublishMaxDelay, false)
+	const numPartitions = uint32(3)
+	// Two messages stay under the 10-message limit within an hour, so only the size can trigger.
+	p1 := router(&ProducerMessage{
+		Payload: []byte("message 1"),
+	}, numPartitions)
+	assert.Less(t, p1, int(numPartitions))
+
+	// 9 bytes are already counted, so another 9 bytes reach the 10-byte limit.
+	p2 := router(&ProducerMessage{
+		Payload: []byte("message 2"),
+	}, numPartitions)
+	// Round-robin wraps from the last partition back to 0.
+	assert.Equal(t, (p1+1)%int(numPartitions), p2)
+}
+
+// Keyed messages are routed by the key hash even though every batching threshold
+// (1 message, 1 byte, zero delay) is reached by each message.
+func TestDefaultRouterNoRoutingBecausePartitionKeyIsSpecified(t *testing.T) {
+	const key, numPartitions = "my-key", uint32(3)
+	router := NewDefaultRouter(internal.JavaStringHash, 1, 1, 0, false)
+
+	// The expected index is JavaStringHash("my-key") % 3.
+	first := router(&ProducerMessage{Key: key, Payload: []byte("message 1")}, numPartitions)
+	assert.Equal(t, 1, first)
+
+	// The same key must keep mapping to the same partition.
+	second := router(&ProducerMessage{Key: key, Payload: []byte("message 2")}, numPartitions)
+	assert.Equal(t, first, second)
+}
+
+// With a single partition every message goes to partition 0, whatever its key, its payload
+// size or the batching thresholds it would reach.
+func TestDefaultRouterNoRoutingBecauseOnlyOnePartition(t *testing.T) {
+	router := NewDefaultRouter(internal.JavaStringHash, 1, 10, oneHourPublishMaxDelay, false)
+
+	messages := []*ProducerMessage{
+		// no key
+		{Key: ""},
+		// with a key
+		{Key: "my-key"},
+		// payload above the 10-byte limit
+		{Payload: []byte("this payload is bigger than 10 bytes")},
+		// two more messages, trying to trigger the 1-message limit
+		{},
+		{},
+	}
+
+	partitions := make([]int, 0, len(messages))
+	for _, msg := range messages {
+		partitions = append(partitions, router(msg, 1))
+	}
+	for i, p := range partitions {
+		assert.Equal(t, 0, p, "message %d", i)
+	}
+}
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 81dda2d..ecf2b88 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -27,14 +27,6 @@ import (
 	"github.com/apache/pulsar-client-go/pulsar/log"
 )
 
-const (
-	// DefaultMaxBatchSize init default for maximum number of bytes per batch
-	DefaultMaxBatchSize = 128 * 1024
-
-	// DefaultMaxMessagesPerBatch init default num of entries in per batch.
-	DefaultMaxMessagesPerBatch = 1000
-)
-
 type BuffersPool interface {
 	GetBuffer() Buffer
 }
@@ -71,12 +63,7 @@ type BatchBuilder struct {
 func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
 	compressionType pb.CompressionType, level compression.Level,
 	bufferPool BuffersPool, logger log.Logger) (*BatchBuilder, error) {
-	if maxMessages == 0 {
-		maxMessages = DefaultMaxMessagesPerBatch
-	}
-	if maxBatchSize == 0 {
-		maxBatchSize = DefaultMaxBatchSize
-	}
+
 	bb := &BatchBuilder{
 		buffer:       NewBuffer(4096),
 		numMessages:  0,
diff --git a/pulsar/internal/default_router.go b/pulsar/internal/default_router.go
deleted file mode 100644
index 2cf4a90..0000000
--- a/pulsar/internal/default_router.go
+++ /dev/null
@@ -1,80 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package internal
-
-import (
-	"math/rand"
-	"sync/atomic"
-	"time"
-)
-
-type defaultRouter struct {
-	clock            Clock
-	shiftIdx         uint32
-	maxBatchingDelay time.Duration
-	hashFunc         func(string) uint32
-	msgCounter       uint32
-}
-
-type Clock func() uint64
-
-// NewSystemClock init system clock and return current time.
-func NewSystemClock() Clock {
-	return func() uint64 {
-		return uint64(time.Now().UnixNano())
-	}
-}
-
-// NewDefaultRouter set the message routing mode for the partitioned producer.
-// Default routing mode is round-robin routing.
-func NewDefaultRouter(clock Clock, hashFunc func(string) uint32,
-	maxBatchingDelay time.Duration, disableBatching bool) func(string, uint32) int {
-	state := &defaultRouter{
-		clock:            clock,
-		shiftIdx:         rand.Uint32(),
-		maxBatchingDelay: maxBatchingDelay,
-		hashFunc:         hashFunc,
-		msgCounter:       0,
-	}
-
-	return func(key string, numPartitions uint32) int {
-		if numPartitions == 1 {
-			// When there are no partitions, don't even bother
-			return 0
-		}
-
-		if key != "" {
-			// When a key is specified, use the hash of that key
-			return int(state.hashFunc(key) % numPartitions)
-		}
-
-		// If there's no key, we do round-robin across partition, sticking with a given
-		// partition for a certain amount of time, to ensure we can have a decent amount
-		// of batching of the messages.
-		//
-		//currentMs / maxBatchingDelayMs + startPtnIdx
-		if !disableBatching && maxBatchingDelay.Nanoseconds() > 0 {
-			n := uint32(state.clock()/uint64(maxBatchingDelay.Nanoseconds())) + state.shiftIdx
-			return int(n % numPartitions)
-		}
-
-		p := int(state.msgCounter % numPartitions)
-		atomic.AddUint32(&state.msgCounter, 1)
-		return p
-	}
-}
diff --git a/pulsar/internal/default_router_test.go b/pulsar/internal/default_router_test.go
deleted file mode 100644
index 980df92..0000000
--- a/pulsar/internal/default_router_test.go
+++ /dev/null
@@ -1,102 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package internal
-
-import (
-	"testing"
-	"time"
-
-	"github.com/stretchr/testify/assert"
-)
-
-func TestDefaultRouter(t *testing.T) {
-
-	var currentClock uint64
-
-	router := NewDefaultRouter(func() uint64 {
-		return currentClock
-	}, JavaStringHash, 10*time.Nanosecond, false)
-
-	// partition index should not change with time
-	p1 := router("my-key", 100)
-	p2 := router("my-key", 100)
-
-	assert.Equal(t, p1, p2)
-
-	currentClock = 100
-	p3 := router("my-key", 100)
-
-	assert.Equal(t, p1, p3)
-
-	// With no key, we should give the same partition for a given time range
-	pr1 := router("", 100)
-	pr2 := router("", 100)
-	assert.Equal(t, pr1, pr2)
-
-	currentClock = 101
-	pr3 := router("", 100)
-	assert.Equal(t, pr1, pr3)
-
-	currentClock = 102
-	pr4 := router("", 100)
-	assert.Equal(t, pr1, pr4)
-
-	currentClock = 111
-	pr5 := router("", 100)
-	assert.NotEqual(t, pr1, pr5)
-
-	currentClock = 112
-	pr6 := router("", 100)
-	assert.Equal(t, pr5, pr6)
-
-	// test batching delay is 0
-	router = NewDefaultRouter(func() uint64 {
-		return currentClock
-	}, JavaStringHash, 0, true)
-
-	// should round robin partitions
-	for i := 0; i < 200; i++ {
-		assert.Equal(t, router("", 100), i%100)
-	}
-
-	// test batching is disabled
-	router = NewDefaultRouter(func() uint64 {
-		return currentClock
-	}, JavaStringHash, 10*time.Nanosecond, true)
-
-	// should round robin partitions
-	for i := 0; i < 200; i++ {
-		assert.Equal(t, router("", 100), i%100)
-	}
-}
-
-func TestDefaultRouterNoPartitions(t *testing.T) {
-
-	router := NewDefaultRouter(NewSystemClock(), JavaStringHash, 10*time.Nanosecond, false)
-
-	// partition index should not change with time
-	p1 := router("", 1)
-	p2 := router("my-key", 1)
-	p3 := router("my-key-2", 1)
-	p4 := router("my-key-3", 1)
-
-	assert.Equal(t, 0, p1)
-	assert.Equal(t, 0, p2)
-	assert.Equal(t, 0, p3)
-	assert.Equal(t, 0, p4)
-}
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 25499ce..42ddf0b 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -31,6 +31,17 @@ import (
 	"github.com/apache/pulsar-client-go/pulsar/log"
 )
 
+const (
+	// defaultBatchingMaxPublishDelay init default for maximum delay to batch messages
+	defaultBatchingMaxPublishDelay = 10 * time.Millisecond
+
+	// defaultMaxBatchSize init default for maximum number of bytes per batch
+	defaultMaxBatchSize = 128 * 1024
+
+	// defaultMaxMessagesPerBatch init default num of entries in per batch.
+	defaultMaxMessagesPerBatch = 1000
+)
+
 var (
 	producersOpened = promauto.NewCounter(prometheus.CounterOpts{
 		Name: "pulsar_client_producers_opened",
@@ -62,8 +73,6 @@ type producer struct {
 	log           log.Logger
 }
 
-const defaultBatchingMaxPublishDelay = 10 * time.Millisecond
-
 var partitionsAutoDiscoveryInterval = 1 * time.Minute
 
 func getHashingFunction(s HashingScheme) func(string) uint32 {
@@ -82,6 +91,16 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
 		return nil, newError(ResultInvalidTopicName, "Topic name is required for producer")
 	}
 
+	if options.BatchingMaxMessages == 0 {
+		options.BatchingMaxMessages = defaultMaxMessagesPerBatch
+	}
+	if options.BatchingMaxSize == 0 {
+		options.BatchingMaxSize = defaultMaxBatchSize
+	}
+	if options.BatchingMaxPublishDelay == 0 {
+		options.BatchingMaxPublishDelay = defaultBatchingMaxPublishDelay
+	}
+
 	p := &producer{
 		options: options,
 		topic:   options.Topic,
@@ -89,24 +108,19 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
 		log:     client.log.SubLogger(log.Fields{"topic": options.Topic}),
 	}
 
-	var batchingMaxPublishDelay time.Duration
-	if options.BatchingMaxPublishDelay != 0 {
-		batchingMaxPublishDelay = options.BatchingMaxPublishDelay
-	} else {
-		batchingMaxPublishDelay = defaultBatchingMaxPublishDelay
-	}
-
 	if options.Interceptors == nil {
 		options.Interceptors = defaultProducerInterceptors
 	}
 
 	if options.MessageRouter == nil {
-		internalRouter := internal.NewDefaultRouter(
-			internal.NewSystemClock(),
+		internalRouter := NewDefaultRouter(
 			getHashingFunction(options.HashingScheme),
-			batchingMaxPublishDelay, options.DisableBatching)
+			options.BatchingMaxMessages,
+			options.BatchingMaxSize,
+			options.BatchingMaxPublishDelay,
+			options.DisableBatching)
 		p.messageRouter = func(message *ProducerMessage, metadata TopicMetadata) int {
-			return internalRouter(message.Key, metadata.NumPartitions())
+			return internalRouter(message, metadata.NumPartitions())
 		}
 	} else {
 		p.messageRouter = options.MessageRouter
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 2c1ff51..3fe46c4 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -660,7 +660,7 @@ func TestBatchMessageFlushing(t *testing.T) {
 	}
 	defer producer.Close()
 
-	maxBytes := internal.DefaultMaxBatchSize
+	maxBytes := defaultMaxBatchSize
 	genbytes := func(n int) []byte {
 		c := []byte("a")[0]
 		bytes := make([]byte, n)