You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/11/25 01:54:50 UTC

[GitHub] [pulsar-client-go] freeznet opened a new pull request #400: [Issue 172] Add key based batcher

freeznet opened a new pull request #400:
URL: https://github.com/apache/pulsar-client-go/pull/400


   Fixes #172
   
   ### Motivation
   
   Add a new batch message container named `keyBasedBatchContainer` to support batching messages in Key_Shared subscription mode.
   
   ### Modifications
   
   - add a `BatchBuilder` interface with `FlushBatches` and `IsMultiBatches` funcs
   - change old `BatchBuilder` struct to `batchContainer`
   - add `keyBasedBatchContainer`
   - add tests
   
   ### Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *Added integration tests for a key based batch producer with multiple consumers in KeyShared mode*
     - *Added integration tests for message ordering with key based batch producer and KeyShared consumer*


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy merged pull request #400: [Issue 172] Add key based batcher

Posted by GitBox <gi...@apache.org>.
wolfstudy merged pull request #400:
URL: https://github.com/apache/pulsar-client-go/pull/400


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] freeznet commented on a change in pull request #400: [Issue 172] Add key based batcher

Posted by GitBox <gi...@apache.org>.
freeznet commented on a change in pull request #400:
URL: https://github.com/apache/pulsar-client-go/pull/400#discussion_r532323506



##########
File path: pulsar/consumer_test.go
##########
@@ -1712,3 +1712,162 @@ func TestConsumerName(t *testing.T) {
 
 	assert.Equal(consumerName, consumer.Name())
 }
+
+func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
+	const MsgBatchCount = 100
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := "persistent://public/default/test-key-based-batch-with-key-shared"
+
+	consumer1, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "sub-1",
+		Type:             KeyShared,
+	})
+	assert.Nil(t, err)
+	defer consumer1.Close()
+
+	consumer2, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "sub-1",
+		Type:             KeyShared,
+	})
+	assert.Nil(t, err)
+	defer consumer2.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:               topic,
+		DisableBatching:     false,
+		BatcherBuilderType:  KeyBasedBatchBuilder,
+		BatchingMaxMessages: 10,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	ctx := context.Background()
+	keys := []string{"key1", "key2", "key3"}
+	for i := 0; i < MsgBatchCount; i++ {
+		for _, k := range keys {
+			producer.SendAsync(ctx, &ProducerMessage{
+				Key:     k,
+				Payload: []byte(fmt.Sprintf("value-%d", i)),
+			}, func(id MessageID, producerMessage *ProducerMessage, err error) {
+				assert.Nil(t, err)
+			},
+			)
+		}
+	}
+
+	receivedConsumer1 := 0
+	receivedConsumer2 := 0
+	consumer1Keys := make(map[string]int)
+	consumer2Keys := make(map[string]int)
+	for (receivedConsumer1 + receivedConsumer2) < 300 {
+		select {
+		case cm, ok := <-consumer1.Chan():
+			if !ok {
+				break
+			}
+			receivedConsumer1++
+			if cnt, has := consumer1Keys[cm.Key()]; !has {
+				consumer1Keys[cm.Key()] = 1
+			} else {
+				consumer1Keys[cm.Key()] = cnt + 1
+			}
+			consumer1.Ack(cm.Message)
+		case cm, ok := <-consumer2.Chan():
+			if !ok {
+				break
+			}
+			receivedConsumer2++
+			if cnt, has := consumer2Keys[cm.Key()]; !has {
+				consumer2Keys[cm.Key()] = 1
+			} else {
+				consumer2Keys[cm.Key()] = cnt + 1
+			}
+			consumer2.Ack(cm.Message)
+		}
+	}
+
+	assert.NotEqual(t, 0, receivedConsumer1)
+	assert.NotEqual(t, 0, receivedConsumer2)
+	assert.Equal(t, len(consumer1Keys)*MsgBatchCount, receivedConsumer1)
+	assert.Equal(t, len(consumer2Keys)*MsgBatchCount, receivedConsumer2)
+
+	fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received messages consumer1: %d consumser2: %d\n",
+		receivedConsumer1, receivedConsumer2)
+	assert.Equal(t, 300, receivedConsumer1+receivedConsumer2)
+
+	fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received messages keys consumer1: %v consumser2: %v\n",
+		consumer1Keys, consumer2Keys)
+}

Review comment:
       @wolfstudy thanks for the comment. The intention of this test is to check that `KeyBasedBatchBuilder` works correctly (messages are batched by key, which differs from the normal batch builder); message ordering is tested by `TestOrderingOfKeyBasedBatchProducerConsumerKeyShared`. But I can make some changes to add an ordering check to `TestKeyBasedBatchProducerConsumerKeyShared` as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] freeznet commented on pull request #400: [Issue 172] Add key based batcher

Posted by GitBox <gi...@apache.org>.
freeznet commented on pull request #400:
URL: https://github.com/apache/pulsar-client-go/pull/400#issuecomment-733502015


   rerun failure checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on pull request #400: [Issue 172] Add key based batcher

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #400:
URL: https://github.com/apache/pulsar-client-go/pull/400#issuecomment-736162738


   @freeznet Please merge master code to fix conflict, thanks?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #400: [Issue 172] Add key based batcher

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #400:
URL: https://github.com/apache/pulsar-client-go/pull/400#discussion_r532302829



##########
File path: pulsar/consumer_test.go
##########
@@ -1712,3 +1712,162 @@ func TestConsumerName(t *testing.T) {
 
 	assert.Equal(consumerName, consumer.Name())
 }
+
+func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
+	const MsgBatchCount = 100
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := "persistent://public/default/test-key-based-batch-with-key-shared"
+
+	consumer1, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "sub-1",
+		Type:             KeyShared,
+	})
+	assert.Nil(t, err)
+	defer consumer1.Close()
+
+	consumer2, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "sub-1",
+		Type:             KeyShared,
+	})
+	assert.Nil(t, err)
+	defer consumer2.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:               topic,
+		DisableBatching:     false,
+		BatcherBuilderType:  KeyBasedBatchBuilder,
+		BatchingMaxMessages: 10,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	ctx := context.Background()
+	keys := []string{"key1", "key2", "key3"}
+	for i := 0; i < MsgBatchCount; i++ {
+		for _, k := range keys {
+			producer.SendAsync(ctx, &ProducerMessage{
+				Key:     k,
+				Payload: []byte(fmt.Sprintf("value-%d", i)),
+			}, func(id MessageID, producerMessage *ProducerMessage, err error) {
+				assert.Nil(t, err)
+			},
+			)
+		}
+	}
+
+	receivedConsumer1 := 0
+	receivedConsumer2 := 0
+	consumer1Keys := make(map[string]int)
+	consumer2Keys := make(map[string]int)
+	for (receivedConsumer1 + receivedConsumer2) < 300 {
+		select {
+		case cm, ok := <-consumer1.Chan():
+			if !ok {
+				break
+			}
+			receivedConsumer1++
+			if cnt, has := consumer1Keys[cm.Key()]; !has {
+				consumer1Keys[cm.Key()] = 1
+			} else {
+				consumer1Keys[cm.Key()] = cnt + 1
+			}
+			consumer1.Ack(cm.Message)
+		case cm, ok := <-consumer2.Chan():
+			if !ok {
+				break
+			}
+			receivedConsumer2++
+			if cnt, has := consumer2Keys[cm.Key()]; !has {
+				consumer2Keys[cm.Key()] = 1
+			} else {
+				consumer2Keys[cm.Key()] = cnt + 1
+			}
+			consumer2.Ack(cm.Message)
+		}
+	}
+
+	assert.NotEqual(t, 0, receivedConsumer1)
+	assert.NotEqual(t, 0, receivedConsumer2)
+	assert.Equal(t, len(consumer1Keys)*MsgBatchCount, receivedConsumer1)
+	assert.Equal(t, len(consumer2Keys)*MsgBatchCount, receivedConsumer2)
+
+	fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received messages consumer1: %d consumser2: %d\n",
+		receivedConsumer1, receivedConsumer2)
+	assert.Equal(t, 300, receivedConsumer1+receivedConsumer2)
+
+	fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received messages keys consumer1: %v consumser2: %v\n",
+		consumer1Keys, consumer2Keys)
+}
+
+func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
+	const MsgBatchCount = 10
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := "persistent://public/default/test-ordering-of-key-based-batch-with-key-shared"
+
+	consumer1, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "sub-1",
+		Type:             KeyShared,
+	})
+	assert.Nil(t, err)
+	defer consumer1.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:                   topic,
+		DisableBatching:         false,
+		BatcherBuilderType:      KeyBasedBatchBuilder,
+		BatchingMaxMessages:     30,
+		BatchingMaxPublishDelay: time.Second * 5,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	ctx := context.Background()
+	keys := []string{"key1", "key2", "key3"}
+	for i := 0; i < MsgBatchCount; i++ {
+		for _, k := range keys {
+			producer.SendAsync(ctx, &ProducerMessage{
+				Key:     k,
+				Payload: []byte(fmt.Sprintf("value-%d", i)),
+			}, func(id MessageID, producerMessage *ProducerMessage, err error) {
+				assert.Nil(t, err)
+			},
+			)
+		}
+	}
+
+	var receivedKey string
+	var receivedMessageIndex int
+	for i := 0; i < len(keys)*MsgBatchCount; i++ {
+		cm, ok := <-consumer1.Chan()
+		if !ok {
+			break
+		}
+		if receivedKey != cm.Key() {
+			receivedKey = cm.Key()
+			receivedMessageIndex = 0
+		}
+		assert.Equal(
+			t, fmt.Sprintf("value-%d", receivedMessageIndex%10),
+			string(cm.Payload()),
+		)
+		consumer1.Ack(cm.Message)
+		receivedMessageIndex++
+	}
+
+	// TODO: add OrderingKey support

Review comment:
       Thanks @freeznet for this work. Can you create an issue to track this?

##########
File path: pulsar/consumer_test.go
##########
@@ -1712,3 +1712,162 @@ func TestConsumerName(t *testing.T) {
 
 	assert.Equal(consumerName, consumer.Name())
 }
+
+func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
+	const MsgBatchCount = 100
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := "persistent://public/default/test-key-based-batch-with-key-shared"
+
+	consumer1, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "sub-1",
+		Type:             KeyShared,
+	})
+	assert.Nil(t, err)
+	defer consumer1.Close()
+
+	consumer2, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "sub-1",
+		Type:             KeyShared,
+	})
+	assert.Nil(t, err)
+	defer consumer2.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:               topic,
+		DisableBatching:     false,
+		BatcherBuilderType:  KeyBasedBatchBuilder,
+		BatchingMaxMessages: 10,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	ctx := context.Background()
+	keys := []string{"key1", "key2", "key3"}
+	for i := 0; i < MsgBatchCount; i++ {
+		for _, k := range keys {
+			producer.SendAsync(ctx, &ProducerMessage{
+				Key:     k,
+				Payload: []byte(fmt.Sprintf("value-%d", i)),
+			}, func(id MessageID, producerMessage *ProducerMessage, err error) {
+				assert.Nil(t, err)
+			},
+			)
+		}
+	}
+
+	receivedConsumer1 := 0
+	receivedConsumer2 := 0
+	consumer1Keys := make(map[string]int)
+	consumer2Keys := make(map[string]int)
+	for (receivedConsumer1 + receivedConsumer2) < 300 {
+		select {
+		case cm, ok := <-consumer1.Chan():
+			if !ok {
+				break
+			}
+			receivedConsumer1++
+			if cnt, has := consumer1Keys[cm.Key()]; !has {
+				consumer1Keys[cm.Key()] = 1
+			} else {
+				consumer1Keys[cm.Key()] = cnt + 1
+			}
+			consumer1.Ack(cm.Message)
+		case cm, ok := <-consumer2.Chan():
+			if !ok {
+				break
+			}
+			receivedConsumer2++
+			if cnt, has := consumer2Keys[cm.Key()]; !has {
+				consumer2Keys[cm.Key()] = 1
+			} else {
+				consumer2Keys[cm.Key()] = cnt + 1
+			}
+			consumer2.Ack(cm.Message)
+		}
+	}
+
+	assert.NotEqual(t, 0, receivedConsumer1)
+	assert.NotEqual(t, 0, receivedConsumer2)
+	assert.Equal(t, len(consumer1Keys)*MsgBatchCount, receivedConsumer1)
+	assert.Equal(t, len(consumer2Keys)*MsgBatchCount, receivedConsumer2)
+
+	fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received messages consumer1: %d consumser2: %d\n",
+		receivedConsumer1, receivedConsumer2)
+	assert.Equal(t, 300, receivedConsumer1+receivedConsumer2)
+
+	fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received messages keys consumer1: %v consumser2: %v\n",
+		consumer1Keys, consumer2Keys)
+}

Review comment:
       In this test case, how do we determine that the order of messages received by the consumer is correct?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] freeznet commented on a change in pull request #400: [Issue 172] Add key based batcher

Posted by GitBox <gi...@apache.org>.
freeznet commented on a change in pull request #400:
URL: https://github.com/apache/pulsar-client-go/pull/400#discussion_r532327313



##########
File path: pulsar/consumer_test.go
##########
@@ -1712,3 +1712,162 @@ func TestConsumerName(t *testing.T) {
 
 	assert.Equal(consumerName, consumer.Name())
 }
+
+func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
+	const MsgBatchCount = 100
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := "persistent://public/default/test-key-based-batch-with-key-shared"
+
+	consumer1, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "sub-1",
+		Type:             KeyShared,
+	})
+	assert.Nil(t, err)
+	defer consumer1.Close()
+
+	consumer2, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "sub-1",
+		Type:             KeyShared,
+	})
+	assert.Nil(t, err)
+	defer consumer2.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:               topic,
+		DisableBatching:     false,
+		BatcherBuilderType:  KeyBasedBatchBuilder,
+		BatchingMaxMessages: 10,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	ctx := context.Background()
+	keys := []string{"key1", "key2", "key3"}
+	for i := 0; i < MsgBatchCount; i++ {
+		for _, k := range keys {
+			producer.SendAsync(ctx, &ProducerMessage{
+				Key:     k,
+				Payload: []byte(fmt.Sprintf("value-%d", i)),
+			}, func(id MessageID, producerMessage *ProducerMessage, err error) {
+				assert.Nil(t, err)
+			},
+			)
+		}
+	}
+
+	receivedConsumer1 := 0
+	receivedConsumer2 := 0
+	consumer1Keys := make(map[string]int)
+	consumer2Keys := make(map[string]int)
+	for (receivedConsumer1 + receivedConsumer2) < 300 {
+		select {
+		case cm, ok := <-consumer1.Chan():
+			if !ok {
+				break
+			}
+			receivedConsumer1++
+			if cnt, has := consumer1Keys[cm.Key()]; !has {
+				consumer1Keys[cm.Key()] = 1
+			} else {
+				consumer1Keys[cm.Key()] = cnt + 1
+			}
+			consumer1.Ack(cm.Message)
+		case cm, ok := <-consumer2.Chan():
+			if !ok {
+				break
+			}
+			receivedConsumer2++
+			if cnt, has := consumer2Keys[cm.Key()]; !has {
+				consumer2Keys[cm.Key()] = 1
+			} else {
+				consumer2Keys[cm.Key()] = cnt + 1
+			}
+			consumer2.Ack(cm.Message)
+		}
+	}
+
+	assert.NotEqual(t, 0, receivedConsumer1)
+	assert.NotEqual(t, 0, receivedConsumer2)
+	assert.Equal(t, len(consumer1Keys)*MsgBatchCount, receivedConsumer1)
+	assert.Equal(t, len(consumer2Keys)*MsgBatchCount, receivedConsumer2)
+
+	fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received messages consumer1: %d consumser2: %d\n",
+		receivedConsumer1, receivedConsumer2)
+	assert.Equal(t, 300, receivedConsumer1+receivedConsumer2)
+
+	fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received messages keys consumer1: %v consumser2: %v\n",
+		consumer1Keys, consumer2Keys)
+}
+
+func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
+	const MsgBatchCount = 10
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := "persistent://public/default/test-ordering-of-key-based-batch-with-key-shared"
+
+	consumer1, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "sub-1",
+		Type:             KeyShared,
+	})
+	assert.Nil(t, err)
+	defer consumer1.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:                   topic,
+		DisableBatching:         false,
+		BatcherBuilderType:      KeyBasedBatchBuilder,
+		BatchingMaxMessages:     30,
+		BatchingMaxPublishDelay: time.Second * 5,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	ctx := context.Background()
+	keys := []string{"key1", "key2", "key3"}
+	for i := 0; i < MsgBatchCount; i++ {
+		for _, k := range keys {
+			producer.SendAsync(ctx, &ProducerMessage{
+				Key:     k,
+				Payload: []byte(fmt.Sprintf("value-%d", i)),
+			}, func(id MessageID, producerMessage *ProducerMessage, err error) {
+				assert.Nil(t, err)
+			},
+			)
+		}
+	}
+
+	var receivedKey string
+	var receivedMessageIndex int
+	for i := 0; i < len(keys)*MsgBatchCount; i++ {
+		cm, ok := <-consumer1.Chan()
+		if !ok {
+			break
+		}
+		if receivedKey != cm.Key() {
+			receivedKey = cm.Key()
+			receivedMessageIndex = 0
+		}
+		assert.Equal(
+			t, fmt.Sprintf("value-%d", receivedMessageIndex%10),
+			string(cm.Payload()),
+		)
+		consumer1.Ack(cm.Message)
+		receivedMessageIndex++
+	}
+
+	// TODO: add OrderingKey support

Review comment:
       New issue #401 added; I also referenced it in the TODO comment for tracking.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] freeznet commented on pull request #400: [Issue 172] Add key based batcher

Posted by GitBox <gi...@apache.org>.
freeznet commented on pull request #400:
URL: https://github.com/apache/pulsar-client-go/pull/400#issuecomment-736222381


   > @freeznet Please merge master code to fix conflict, thanks?
   
   resolved ;)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org