You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/04/17 11:28:38 UTC

[GitHub] [pulsar] wolfstudy edited a comment on issue #6712: Pulsar Go Client Producer 2.4.1, 2.4.2 does not round robin messages by default when sending messages to multi partitioned topics

wolfstudy edited a comment on issue #6712: Pulsar Go Client Producer 2.4.1, 2.4.2 does not round robin messages by default when sending messages to multi partitioned topics
URL: https://github.com/apache/pulsar/issues/6712#issuecomment-615190728
 
 
   > @sijie We are not sending any message keys when producing to the partitioned topic. Below is the configuration we are using when sending data to the broker. This applies to both non-persistent and persistent topics
   > 
   > ```
   > client, err := pulsar.NewClient(pulsar.ClientOptions{
   >     		URL: pulsarBroker,
   > 		Authentication: tls_auth,
   > 		TLSTrustCertsFilePath: pulsarSslCACertFile,
   > 		TLSAllowInsecureConnection: TLSAllowInsecureConnection,
   > 		TLSValidateHostname : true,
   > 		MessageListenerThreads: 1,
   > 		IOThreads: 1,
   > 	})
   > 
   > 	producer, err := client.CreateProducer(pulsar.ProducerOptions{
   > 		Topic: pulsarTopic,
   > 		CompressionType: pulsar.LZ4,
   > 		BatchingMaxMessages: uint(10000),
   > 		Batching: true,
   > 		MaxPendingMessages: 10000,
   > 		MessageRoutingMode: pulsar.RoundRobinDistribution,
   > 		BlockIfQueueFull: true,
   > 		BatchingMaxPublishDelay: time.Duration(1)*time.Millisecond,
   > 	})
   > ```
   > 
   > ```
   > //Send Message Sync to Broker
   > msg := pulsar.ProducerMessage{
   > 			    Payload: metric, //Metric is a byte[]
   > 			}
   > err := producer.Send(context.Background(), msg)
   > ```
   
   hello @afire007  what is the `pulsarTopic` value?
   
   Unfortunately, i don't reproduce the issue in my local. Step as follows:
   
   1. run pulsar 2.4.2
   
   ```bash
   ./bin/pulsar localrun -a 127.0.0.1
   ```
   
   2. create topic
   
   ```bash
   ./bin/pulsar-admin non-persistent create-partitioned-topic non-persistent://public/default/sample-topic-1min --partitions 6
   ```
   
   3. receive messages from `non-persistent://public/default/sample-topic-1min`
   
   ```
   import (
   	"context"
   	"fmt"
   	log "github.com/apache/pulsar/pulsar-client-go/logutil"
   	"github.com/apache/pulsar/pulsar-client-go/pulsar"
   )
   
   func main() {
   	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
   	if err != nil {
   		log.Fatal(err)
   	}
   
   	defer client.Close()
   
   	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
   		Topic:            "non-persistent://public/default/sample-topic-1min",
   		SubscriptionName: "my-subscription",
   		Type:             pulsar.Shared,
   	})
   	if err != nil {
   		log.Fatal(err)
   	}
   
   	defer consumer.Close()
   
   	for {
   		msg, err := consumer.Receive(context.Background())
   		if err != nil {
   			log.Fatal(err)
   		}
   
   		fmt.Printf("Received message  msgId: %s -- content: '%s'\n",
   			msg.ID(), string(msg.Payload()))
   
   		consumer.Ack(msg)
   	}
   }
   ```
   
   4. publish message to `non-persistent://public/default/sample-topic-1min`
   
   ```
   import (
   	"context"
   	"fmt"
   
   	log "github.com/apache/pulsar/pulsar-client-go/logutil"
   	"github.com/apache/pulsar/pulsar-client-go/pulsar"
   )
   
   func main() {
   	client, err := pulsar.NewClient(pulsar.ClientOptions{
   		URL:       "pulsar://localhost:6650",
   		IOThreads: 5,
   	})
   
   	if err != nil {
   		log.Fatal(err)
   	}
   
   	defer client.Close()
   
   	producer, err := client.CreateProducer(pulsar.ProducerOptions{
   		Topic: "non-persistent://public/default/sample-topic-1min",
   	})
   	if err != nil {
   		log.Fatal(err)
   	}
   
   	defer producer.Close()
   
   	ctx := context.Background()
   
   	for i := 0; i < 10; i++ {
   		if err := producer.Send(ctx, pulsar.ProducerMessage{
   			Payload: []byte(fmt.Sprintf("hello-%d", i)),
   		}); err != nil {
   			log.Fatal(err)
   		}
   	}
   }
   ```
   
   In receive window, the output as follows:
   
   ```text
   Received message  msgId: (0,0,0,-1) -- content: 'hello-0'
   Received message  msgId: (0,0,1,-1) -- content: 'hello-1'
   Received message  msgId: (0,0,2,-1) -- content: 'hello-2'
   Received message  msgId: (0,0,3,-1) -- content: 'hello-3'
   Received message  msgId: (0,0,4,-1) -- content: 'hello-4'
   Received message  msgId: (0,0,5,-1) -- content: 'hello-5'
   Received message  msgId: (0,0,0,-1) -- content: 'hello-6'
   Received message  msgId: (0,0,1,-1) -- content: 'hello-7'
   Received message  msgId: (0,0,2,-1) -- content: 'hello-8'
   Received message  msgId: (0,0,3,-1) -- content: 'hello-9'
   ```
   
   Then run `./bin/pulsar-admin topics list public/default`, output as follows:
   
   ```
   "non-persistent://public/default/sample-topic-1min-partition-0"
   "non-persistent://public/default/sample-topic-1min-partition-1"
   "non-persistent://public/default/sample-topic-1min-partition-2"
   "non-persistent://public/default/sample-topic-1min-partition-3"
   "non-persistent://public/default/sample-topic-1min-partition-4"
   "non-persistent://public/default/sample-topic-1min-partition-5"
   "non-persistent://public/default/sample-topic-1min"
   ```
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services