You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text rendering.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:34:03 UTC

[pulsar-client-go] 26/38: Use logrus in perf producer/consumer

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit e077250d12080dc8688c058df47f4e602b6088a8
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat May 4 13:51:38 2019 -0700

    Use logrus in perf producer/consumer
---
 perf/perf-consumer.go |  9 ++++-----
 perf/perf-producer.go | 25 ++++++++++++-------------
 2 files changed, 16 insertions(+), 18 deletions(-)

diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go
index 4ffa36f..e6fdf77 100644
--- a/perf/perf-consumer.go
+++ b/perf/perf-consumer.go
@@ -22,10 +22,9 @@ package main
 import (
 	"context"
 	"encoding/json"
-	"fmt"
 	"pulsar-client-go-native/pulsar"
 	"github.com/spf13/cobra"
-	"log"
+	log "github.com/sirupsen/logrus"
 	"sync/atomic"
 	"time"
 )
@@ -55,9 +54,9 @@ func initConsumer() {
 
 func consume() {
 	b, _ := json.MarshalIndent(clientArgs, "", "  ")
-	fmt.Println("Client config: ", string(b))
+	log.Info("Client config: ", string(b))
 	b, _ = json.MarshalIndent(consumeArgs, "", "  ")
-	fmt.Println("Consumer config: ", string(b))
+	log.Info("Consumer config: ", string(b))
 
 	client, err := pulsar.NewClient(pulsar.ClientOptions{
 		URL:                    clientArgs.ServiceUrl,
@@ -107,7 +106,7 @@ func consume() {
 			msgRate := float64(currentMsgReceived) / float64(10)
 			bytesRate := float64(currentBytesReceived) / float64(10)
 
-			log.Printf(`Stats - Consume rate: %6.1f msg/s - %6.1f Mbps`,
+			log.Infof(`Stats - Consume rate: %6.1f msg/s - %6.1f Mbps`,
 				msgRate, bytesRate*8/1024/1024)
 		}
 	}
diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index 8b5c0fd..c40af30 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -22,21 +22,20 @@ package main
 import (
 	"context"
 	"encoding/json"
-	"fmt"
 	"github.com/beefsack/go-rate"
 	"github.com/bmizerany/perks/quantile"
 	"github.com/spf13/cobra"
-	"log"
+	log "github.com/sirupsen/logrus"
 	"pulsar-client-go-native/pulsar"
 	"time"
 )
 
 type ProduceArgs struct {
-	Topic             string
-	Rate              int
-	Batching          bool
-	MessageSize       int
-	ProducerQueueSize int
+	Topic              string
+	Rate               int
+	BatchingTimeMillis int
+	MessageSize        int
+	ProducerQueueSize  int
 }
 
 var produceArgs ProduceArgs
@@ -53,16 +52,16 @@ var cmdProduce = &cobra.Command{
 
 func initProducer() {
 	cmdProduce.Flags().IntVarP(&produceArgs.Rate, "rate", "r", 100, "Publish rate. Set to 0 to go unthrottled")
-	cmdProduce.Flags().BoolVarP(&produceArgs.Batching, "batching", "b", true, "Enable batching")
+	cmdProduce.Flags().IntVarP(&produceArgs.BatchingTimeMillis, "batching-time", "b", 1, "Batching grouping time in millis")
 	cmdProduce.Flags().IntVarP(&produceArgs.MessageSize, "size", "s", 1024, "Message size")
 	cmdProduce.Flags().IntVarP(&produceArgs.ProducerQueueSize, "queue-size", "q", 1000, "Produce queue size")
 }
 
 func produce() {
 	b, _ := json.MarshalIndent(clientArgs, "", "  ")
-	fmt.Println("Client config: ", string(b))
+	log.Info("Client config: ", string(b))
 	b, _ = json.MarshalIndent(produceArgs, "", "  ")
-	fmt.Println("Producer config: ", string(b))
+	log.Info("Producer config: ", string(b))
 
 	client, err := pulsar.NewClient(pulsar.ClientOptions{
 		URL: clientArgs.ServiceUrl,
@@ -77,7 +76,7 @@ func produce() {
 	producer, err := client.CreateProducer(pulsar.ProducerOptions{
 		Topic:                   produceArgs.Topic,
 		MaxPendingMessages:      produceArgs.ProducerQueueSize,
-		BatchingMaxPublishDelay: 1 * time.Millisecond,
+		BatchingMaxPublishDelay: time.Millisecond * time.Duration(produceArgs.BatchingTimeMillis),
 		SendTimeout:             0,
 		BlockIfQueueFull:        true,
 	})
@@ -110,7 +109,7 @@ func produce() {
 				Payload: payload,
 			}, func(msgID pulsar.MessageID, message *pulsar.ProducerMessage, e error) {
 				if e != nil {
-					log.Fatal("Failed to publish", e)
+					log.WithError(e).Fatal("Failed to publish")
 				}
 
 				latency := time.Since(start).Seconds()
@@ -128,7 +127,7 @@ func produce() {
 		select {
 		case <-tick.C:
 			messageRate := float64(messagesPublished) / float64(10)
-			log.Printf(`Stats - Publish rate: %6.1f msg/s - %6.1f Mbps - Latency ms: 50%% %5.1f - 95%% %5.1f - 99%% %5.1f - 99.9%% %5.1f - max %6.1f`,
+			log.Infof(`Stats - Publish rate: %6.1f msg/s - %6.1f Mbps - Latency ms: 50%% %5.1f - 95%% %5.1f - 99%% %5.1f - 99.9%% %5.1f - max %6.1f`,
 				messageRate,
 				messageRate*float64(produceArgs.MessageSize)/1024/1024*8,
 				q.Query(0.5)*1000,