You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:34:03 UTC
[pulsar-client-go] 26/38: Use logrus in perf producer/consumer
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit e077250d12080dc8688c058df47f4e602b6088a8
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat May 4 13:51:38 2019 -0700
Use logrus in perf producer/consumer
---
perf/perf-consumer.go | 9 ++++-----
perf/perf-producer.go | 25 ++++++++++++-------------
2 files changed, 16 insertions(+), 18 deletions(-)
diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go
index 4ffa36f..e6fdf77 100644
--- a/perf/perf-consumer.go
+++ b/perf/perf-consumer.go
@@ -22,10 +22,9 @@ package main
import (
"context"
"encoding/json"
- "fmt"
"pulsar-client-go-native/pulsar"
"github.com/spf13/cobra"
- "log"
+ log "github.com/sirupsen/logrus"
"sync/atomic"
"time"
)
@@ -55,9 +54,9 @@ func initConsumer() {
func consume() {
b, _ := json.MarshalIndent(clientArgs, "", " ")
- fmt.Println("Client config: ", string(b))
+ log.Info("Client config: ", string(b))
b, _ = json.MarshalIndent(consumeArgs, "", " ")
- fmt.Println("Consumer config: ", string(b))
+ log.Info("Consumer config: ", string(b))
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: clientArgs.ServiceUrl,
@@ -107,7 +106,7 @@ func consume() {
msgRate := float64(currentMsgReceived) / float64(10)
bytesRate := float64(currentBytesReceived) / float64(10)
- log.Printf(`Stats - Consume rate: %6.1f msg/s - %6.1f Mbps`,
+ log.Infof(`Stats - Consume rate: %6.1f msg/s - %6.1f Mbps`,
msgRate, bytesRate*8/1024/1024)
}
}
diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index 8b5c0fd..c40af30 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -22,21 +22,20 @@ package main
import (
"context"
"encoding/json"
- "fmt"
"github.com/beefsack/go-rate"
"github.com/bmizerany/perks/quantile"
"github.com/spf13/cobra"
- "log"
+ log "github.com/sirupsen/logrus"
"pulsar-client-go-native/pulsar"
"time"
)
type ProduceArgs struct {
- Topic string
- Rate int
- Batching bool
- MessageSize int
- ProducerQueueSize int
+ Topic string
+ Rate int
+ BatchingTimeMillis int
+ MessageSize int
+ ProducerQueueSize int
}
var produceArgs ProduceArgs
@@ -53,16 +52,16 @@ var cmdProduce = &cobra.Command{
func initProducer() {
cmdProduce.Flags().IntVarP(&produceArgs.Rate, "rate", "r", 100, "Publish rate. Set to 0 to go unthrottled")
- cmdProduce.Flags().BoolVarP(&produceArgs.Batching, "batching", "b", true, "Enable batching")
+ cmdProduce.Flags().IntVarP(&produceArgs.BatchingTimeMillis, "batching-time", "b", 1, "Batching grouping time in millis")
cmdProduce.Flags().IntVarP(&produceArgs.MessageSize, "size", "s", 1024, "Message size")
cmdProduce.Flags().IntVarP(&produceArgs.ProducerQueueSize, "queue-size", "q", 1000, "Produce queue size")
}
func produce() {
b, _ := json.MarshalIndent(clientArgs, "", " ")
- fmt.Println("Client config: ", string(b))
+ log.Info("Client config: ", string(b))
b, _ = json.MarshalIndent(produceArgs, "", " ")
- fmt.Println("Producer config: ", string(b))
+ log.Info("Producer config: ", string(b))
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: clientArgs.ServiceUrl,
@@ -77,7 +76,7 @@ func produce() {
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: produceArgs.Topic,
MaxPendingMessages: produceArgs.ProducerQueueSize,
- BatchingMaxPublishDelay: 1 * time.Millisecond,
+ BatchingMaxPublishDelay: time.Millisecond * time.Duration(produceArgs.BatchingTimeMillis),
SendTimeout: 0,
BlockIfQueueFull: true,
})
@@ -110,7 +109,7 @@ func produce() {
Payload: payload,
}, func(msgID pulsar.MessageID, message *pulsar.ProducerMessage, e error) {
if e != nil {
- log.Fatal("Failed to publish", e)
+ log.WithError(e).Fatal("Failed to publish")
}
latency := time.Since(start).Seconds()
@@ -128,7 +127,7 @@ func produce() {
select {
case <-tick.C:
messageRate := float64(messagesPublished) / float64(10)
- log.Printf(`Stats - Publish rate: %6.1f msg/s - %6.1f Mbps - Latency ms: 50%% %5.1f - 95%% %5.1f - 99%% %5.1f - 99.9%% %5.1f - max %6.1f`,
+ log.Infof(`Stats - Publish rate: %6.1f msg/s - %6.1f Mbps - Latency ms: 50%% %5.1f - 95%% %5.1f - 99%% %5.1f - 99.9%% %5.1f - max %6.1f`,
messageRate,
messageRate*float64(produceArgs.MessageSize)/1024/1024*8,
q.Query(0.5)*1000,