You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:34:01 UTC

[pulsar-client-go] 24/38: Added perf producer/consumer

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 585500e59e10e71ad74bb982e8fab5d0eb216fb8
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat May 4 08:32:38 2019 -0700

    Added perf producer/consumer
---
 perf/perf-consumer.go  | 114 +++++++++++++++++++++++++++++++++++++
 perf/perf-producer.go  | 148 +++++++++++++++++++++++++++++++++++++++++++++++++
 perf/pulsar-perf-go.go |  49 ++++++++++++++++
 3 files changed, 311 insertions(+)

diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go
new file mode 100644
index 0000000..4ffa36f
--- /dev/null
+++ b/perf/perf-consumer.go
@@ -0,0 +1,114 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"pulsar-client-go-native/pulsar"
+	"github.com/spf13/cobra"
+	"log"
+	"sync/atomic"
+	"time"
+)
+
+// ConsumeArgs holds the settings of the "consume" sub-command. consume()
+// prints it as indented JSON, so the exported field names show up in that
+// output.
+type ConsumeArgs struct {
+	Topic             string // topic to consume from; set from the command's positional argument
+	SubscriptionName  string // bound to --subscription / -s (default "sub")
+	ReceiverQueueSize int    // bound to --receiver-queue-size / -r (default 1000)
+}
+
+// consumeArgs is the package-level instance filled in by the flags registered
+// in initConsumer and by cmdConsume's Run function.
+var consumeArgs ConsumeArgs
+
+// cmdConsume is the cobra command behind "consume <topic>". It insists on
+// exactly one positional argument, records it as the topic to read from and
+// then hands over to consume().
+var cmdConsume = &cobra.Command{
+	Use:   "consume <topic>",
+	Short: "Consume from topic",
+	Args:  cobra.ExactArgs(1),
+	Run: func(_ *cobra.Command, positional []string) {
+		topic := positional[0]
+		consumeArgs.Topic = topic
+		consume()
+	},
+}
+
+// initConsumer registers the command-line flags of cmdConsume and binds each
+// of them to the matching field of consumeArgs.
+func initConsumer() {
+	flags := cmdConsume.Flags()
+
+	flags.StringVarP(&consumeArgs.SubscriptionName, "subscription", "s", "sub", "Subscription name")
+	flags.IntVarP(&consumeArgs.ReceiverQueueSize, "receiver-queue-size", "r", 1000, "Receiver queue size")
+}
+
+// consume subscribes to consumeArgs.Topic, receives and acknowledges messages
+// in a background goroutine, and logs the receive rate every 10 seconds.
+//
+// It does not return: the stats loop runs until the process is terminated,
+// and any error while creating the client, subscribing or receiving ends the
+// process through log.Fatal.
+func consume() {
+	// The JSON dumps are informational only; marshalling these flat structs
+	// is not expected to fail, so the error is deliberately ignored.
+	b, _ := json.MarshalIndent(clientArgs, "", "  ")
+	fmt.Println("Client config: ", string(b))
+	b, _ = json.MarshalIndent(consumeArgs, "", "  ")
+	fmt.Println("Consumer config: ", string(b))
+
+	client, err := pulsar.NewClient(pulsar.ClientOptions{
+		URL: clientArgs.ServiceUrl,
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	defer client.Close()
+
+	// NOTE(review): consumeArgs.ReceiverQueueSize is parsed from the command
+	// line but is not passed on here — confirm whether ConsumerOptions has a
+	// matching field and wire it up if so.
+	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+		Topic:            consumeArgs.Topic,
+		SubscriptionName: consumeArgs.SubscriptionName,
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	defer consumer.Close()
+
+	ctx := context.Background()
+
+	// Counters shared between the receive goroutine and the stats loop; both
+	// sides touch them only through sync/atomic.
+	var msgReceived int64
+	var bytesReceived int64
+
+	go func() {
+		for {
+			msg, err := consumer.Receive(ctx)
+			if err != nil {
+				// Stop here rather than fall through: msg is dereferenced
+				// below and must not be used after a failed receive.
+				log.Fatal(err)
+			}
+
+			atomic.AddInt64(&msgReceived, 1)
+			atomic.AddInt64(&bytesReceived, int64(len(msg.Payload())))
+
+			consumer.Ack(msg)
+		}
+	}()
+
+	// Print stats of the consume rate. The same constant drives the ticker
+	// and the rate computation so the two cannot drift apart.
+	const statsInterval = 10 * time.Second
+	tick := time.NewTicker(statsInterval)
+
+	for range tick.C {
+		// Swap-to-zero reads and resets each counter in one atomic step, so
+		// no message is lost between reading and resetting.
+		currentMsgReceived := atomic.SwapInt64(&msgReceived, 0)
+		currentBytesReceived := atomic.SwapInt64(&bytesReceived, 0)
+		msgRate := float64(currentMsgReceived) / statsInterval.Seconds()
+		bytesRate := float64(currentBytesReceived) / statsInterval.Seconds()
+
+		log.Printf(`Stats - Consume rate: %6.1f msg/s - %6.1f Mbps`,
+			msgRate, bytesRate*8/1024/1024)
+	}
+}
\ No newline at end of file
diff --git a/perf/perf-producer.go b/perf/perf-producer.go
new file mode 100644
index 0000000..8b5c0fd
--- /dev/null
+++ b/perf/perf-producer.go
@@ -0,0 +1,148 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"github.com/beefsack/go-rate"
+	"github.com/bmizerany/perks/quantile"
+	"github.com/spf13/cobra"
+	"log"
+	"pulsar-client-go-native/pulsar"
+	"time"
+)
+
+// ProduceArgs holds the settings of the "produce" sub-command. produce()
+// prints it as indented JSON, so the exported field names show up in that
+// output.
+type ProduceArgs struct {
+	Topic             string // topic to publish to; set from the command's positional argument
+	Rate              int    // bound to --rate / -r (default 100); 0 disables the rate limiter
+	Batching          bool   // bound to --batching / -b (default true)
+	MessageSize       int    // bound to --size / -s (default 1024); length in bytes of each payload
+	ProducerQueueSize int    // bound to --queue-size / -q (default 1000); passed as MaxPendingMessages
+}
+
+// produceArgs is the package-level instance filled in by the flags registered
+// in initProducer and by cmdProduce's Run function.
+var produceArgs ProduceArgs
+
+// cmdProduce is the cobra command behind "produce <topic>". It requires
+// exactly one positional argument, stores it as the topic to publish to and
+// then runs produce().
+var cmdProduce = &cobra.Command{
+	// The usage line names the required argument, matching cmdConsume; the
+	// command name cobra derives from it is still "produce".
+	Use:   "produce <topic>",
+	Short: "Produce on a topic and measure performance",
+	Args:  cobra.ExactArgs(1),
+	Run: func(cmd *cobra.Command, args []string) {
+		produceArgs.Topic = args[0]
+		produce()
+	},
+}
+
+// initProducer registers the command-line flags of cmdProduce and binds each
+// of them to the matching field of produceArgs.
+func initProducer() {
+	flags := cmdProduce.Flags()
+
+	flags.IntVarP(&produceArgs.Rate, "rate", "r", 100, "Publish rate. Set to 0 to go unthrottled")
+	flags.BoolVarP(&produceArgs.Batching, "batching", "b", true, "Enable batching")
+	flags.IntVarP(&produceArgs.MessageSize, "size", "s", 1024, "Message size")
+	flags.IntVarP(&produceArgs.ProducerQueueSize, "queue-size", "q", 1000, "Produce queue size")
+}
+
+// produce publishes payloads of produceArgs.MessageSize bytes to
+// produceArgs.Topic from a background goroutine, optionally throttled to
+// produceArgs.Rate messages per second, and logs the publish rate and
+// latency quantiles every 10 seconds.
+//
+// It does not return: the stats loop runs until the process is terminated,
+// and any error while creating the client or producer, or while publishing,
+// ends the process through log.Fatal.
+func produce() {
+	// The JSON dumps are informational only; marshalling these flat structs
+	// is not expected to fail, so the error is deliberately ignored.
+	b, _ := json.MarshalIndent(clientArgs, "", "  ")
+	fmt.Println("Client config: ", string(b))
+	b, _ = json.MarshalIndent(produceArgs, "", "  ")
+	fmt.Println("Producer config: ", string(b))
+
+	client, err := pulsar.NewClient(pulsar.ClientOptions{
+		URL: clientArgs.ServiceUrl,
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	defer client.Close()
+
+	// NOTE(review): produceArgs.Batching is parsed from the command line but
+	// is not passed on here — confirm which ProducerOptions field controls
+	// batching and wire it up.
+	producer, err := client.CreateProducer(pulsar.ProducerOptions{
+		Topic:                   produceArgs.Topic,
+		MaxPendingMessages:      produceArgs.ProducerQueueSize,
+		BatchingMaxPublishDelay: 1 * time.Millisecond,
+		SendTimeout:             0,
+		BlockIfQueueFull:        true,
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	defer producer.Close()
+
+	ctx := context.Background()
+
+	// One zero-filled buffer is shared by every message that is sent.
+	payload := make([]byte, produceArgs.MessageSize)
+
+	// Carries publish latencies, in seconds, from the send callbacks to the
+	// stats loop below.
+	ch := make(chan float64)
+
+	go func() {
+		// A nil limiter means unthrottled publishing (rate 0).
+		var rateLimiter *rate.RateLimiter
+		if produceArgs.Rate > 0 {
+			rateLimiter = rate.New(produceArgs.Rate, time.Second)
+		}
+
+		for {
+			if rateLimiter != nil {
+				rateLimiter.Wait()
+			}
+
+			start := time.Now()
+
+			producer.SendAsync(ctx, &pulsar.ProducerMessage{
+				Payload: payload,
+			}, func(msgID pulsar.MessageID, message *pulsar.ProducerMessage, e error) {
+				if e != nil {
+					// Fatalf with an explicit separator: log.Fatal would
+					// print the message and the error run together.
+					log.Fatalf("Failed to publish: %v", e)
+				}
+
+				latency := time.Since(start).Seconds()
+				ch <- latency
+			})
+		}
+	}()
+
+	// Print stats of the publish rate and latencies. The same constant
+	// drives the ticker and the rate computation so the two cannot drift
+	// apart.
+	const statsInterval = 10 * time.Second
+	tick := time.NewTicker(statsInterval)
+	// The targets must cover every quantile queried below, including the
+	// median that is reported as "50%".
+	q := quantile.NewTargeted(0.50, 0.95, 0.99, 0.999, 1.0)
+	messagesPublished := 0
+
+	for {
+		select {
+		case <-tick.C:
+			messageRate := float64(messagesPublished) / statsInterval.Seconds()
+			// Latencies are recorded in seconds and reported in milliseconds.
+			log.Printf(`Stats - Publish rate: %6.1f msg/s - %6.1f Mbps - Latency ms: 50%% %5.1f - 95%% %5.1f - 99%% %5.1f - 99.9%% %5.1f - max %6.1f`,
+				messageRate,
+				messageRate*float64(produceArgs.MessageSize)/1024/1024*8,
+				q.Query(0.5)*1000,
+				q.Query(0.95)*1000,
+				q.Query(0.99)*1000,
+				q.Query(0.999)*1000,
+				q.Query(1.0)*1000,
+			)
+
+			// Start the next reporting window from scratch.
+			q.Reset()
+			messagesPublished = 0
+		case latency := <-ch:
+			messagesPublished++
+			q.Insert(latency)
+		}
+	}
+}
diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go
new file mode 100644
index 0000000..7a6d101
--- /dev/null
+++ b/perf/pulsar-perf-go.go
@@ -0,0 +1,49 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+	"github.com/spf13/cobra"
+	log "github.com/sirupsen/logrus"
+)
+
+// ClientArgs holds the connection settings shared by the produce and consume
+// sub-commands. Both print it as indented JSON before connecting.
+type ClientArgs struct {
+	ServiceUrl      string // Pulsar service URL; bound to --service-url / -u in main
+}
+
+// clientArgs is the package-level instance read by produce() and consume()
+// when they create the Pulsar client.
+var clientArgs ClientArgs
+
+func main() {
+	log.SetFormatter(&log.TextFormatter{
+		FullTimestamp:   true,
+		TimestampFormat: "15:04:05.000",
+	})
+	log.SetLevel(log.InfoLevel)
+
+	initProducer()
+	initConsumer()
+
+	var rootCmd = &cobra.Command{Use: "pulsar-perf-go"}
+	rootCmd.Flags().StringVarP(&clientArgs.ServiceUrl, "service-url", "u",
+		"pulsar://localhost:6650", "The Pulsar service URL")
+	rootCmd.AddCommand(cmdProduce, cmdConsume)
+
+	rootCmd.Execute()
+}