You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:34:01 UTC
[pulsar-client-go] 24/38: Added perf producer/consumer
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 585500e59e10e71ad74bb982e8fab5d0eb216fb8
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat May 4 08:32:38 2019 -0700
Added perf producer/consumer
---
perf/perf-consumer.go | 114 +++++++++++++++++++++++++++++++++++++
perf/perf-producer.go | 148 +++++++++++++++++++++++++++++++++++++++++++++++++
perf/pulsar-perf-go.go | 49 ++++++++++++++++
3 files changed, 311 insertions(+)
diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go
new file mode 100644
index 0000000..4ffa36f
--- /dev/null
+++ b/perf/perf-consumer.go
@@ -0,0 +1,114 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "pulsar-client-go-native/pulsar"
+ "github.com/spf13/cobra"
+ "log"
+ "sync/atomic"
+ "time"
+)
+
+type ConsumeArgs struct { // settings of the "consume" sub-command; printed as JSON when consume starts
+ Topic string // first positional argument of the command
+ SubscriptionName string // --subscription / -s flag (default "sub")
+ ReceiverQueueSize int // --receiver-queue-size / -r flag (default 1000); NOTE(review): not read by consume in the visible code
+}
+
+var consumeArgs ConsumeArgs // instance filled by initConsumer's flags and cmdConsume's Run handler
+
+// cmdConsume is the "consume" sub-command. It requires exactly one positional
+// argument, records it as the topic in consumeArgs and then runs consume.
+var cmdConsume = &cobra.Command{
+	Args:  cobra.ExactArgs(1),
+	Short: "Consume from topic",
+	Use:   "consume <topic>",
+	Run: func(_ *cobra.Command, positional []string) {
+		topic := positional[0]
+		consumeArgs.Topic = topic
+		consume()
+	},
+}
+
+// initConsumer registers the command-line flags of the "consume" sub-command,
+// binding each one to the matching field of consumeArgs.
+func initConsumer() {
+	flags := cmdConsume.Flags()
+	flags.StringVarP(&consumeArgs.SubscriptionName, "subscription", "s", "sub", "Subscription name")
+	flags.IntVarP(&consumeArgs.ReceiverQueueSize, "receiver-queue-size", "r", 1000, "Receiver queue size")
+}
+
+// consume connects to the configured Pulsar service, subscribes to
+// consumeArgs.Topic and acknowledges every message it receives, logging the
+// observed message and byte rates every ten seconds. It only returns by
+// terminating the process: setup and receive errors are fatal, and the
+// reporting loop never ends.
+func consume() {
+	// Marshalling these flat flag structs cannot fail, so the errors are dropped.
+	b, _ := json.MarshalIndent(clientArgs, "", " ")
+	fmt.Println("Client config: ", string(b))
+	b, _ = json.MarshalIndent(consumeArgs, "", " ")
+	fmt.Println("Consumer config: ", string(b))
+
+	client, err := pulsar.NewClient(pulsar.ClientOptions{
+		URL: clientArgs.ServiceUrl,
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer client.Close()
+
+	// NOTE(review): consumeArgs.ReceiverQueueSize is parsed from the flags but
+	// is not forwarded here; wire it in once the matching ConsumerOptions field
+	// is confirmed.
+	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+		Topic:            consumeArgs.Topic,
+		SubscriptionName: consumeArgs.SubscriptionName,
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer consumer.Close()
+
+	ctx := context.Background()
+
+	// Counters are shared between the receive goroutine and the reporting loop,
+	// so they are only touched through sync/atomic.
+	var msgReceived, bytesReceived int64
+
+	go func() {
+		for {
+			msg, err := consumer.Receive(ctx)
+			if err != nil {
+				// msg must not be used when Receive reports an error.
+				log.Fatal(err)
+			}
+
+			atomic.AddInt64(&msgReceived, 1)
+			atomic.AddInt64(&bytesReceived, int64(len(msg.Payload())))
+
+			consumer.Ack(msg)
+		}
+	}()
+
+	// Print stats of the consume rate. The same constant drives the ticker and
+	// the rate divisor so the two cannot drift apart.
+	const statsInterval = 10 * time.Second
+	tick := time.NewTicker(statsInterval)
+
+	for range tick.C {
+		// Swap-to-zero reads and resets each counter in a single atomic step.
+		currentMsgReceived := atomic.SwapInt64(&msgReceived, 0)
+		currentBytesReceived := atomic.SwapInt64(&bytesReceived, 0)
+		msgRate := float64(currentMsgReceived) / statsInterval.Seconds()
+		bytesRate := float64(currentBytesReceived) / statsInterval.Seconds()
+
+		log.Printf(`Stats - Consume rate: %6.1f msg/s - %6.1f Mbps`,
+			msgRate, bytesRate*8/1024/1024)
+	}
+}
\ No newline at end of file
diff --git a/perf/perf-producer.go b/perf/perf-producer.go
new file mode 100644
index 0000000..8b5c0fd
--- /dev/null
+++ b/perf/perf-producer.go
@@ -0,0 +1,148 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "github.com/beefsack/go-rate"
+ "github.com/bmizerany/perks/quantile"
+ "github.com/spf13/cobra"
+ "log"
+ "pulsar-client-go-native/pulsar"
+ "time"
+)
+
+type ProduceArgs struct { // settings of the "produce" sub-command; printed as JSON when produce starts
+ Topic string // first positional argument of the command
+ Rate int // --rate / -r flag: messages per second, 0 disables throttling (default 100)
+ Batching bool // --batching / -b flag (default true); NOTE(review): not read by produce in the visible code
+ MessageSize int // --size / -s flag: length in bytes of the published payload (default 1024)
+ ProducerQueueSize int // --queue-size / -q flag: passed to the producer as MaxPendingMessages (default 1000)
+}
+
+var produceArgs ProduceArgs // instance filled by initProducer's flags and cmdProduce's Run handler
+
+// cmdProduce is the "produce" sub-command. It requires exactly one positional
+// argument, the topic to publish on, which is stored in produceArgs before
+// produce is started.
+var cmdProduce = &cobra.Command{
+	// The usage line names the mandatory argument, matching cmdConsume.
+	Use:   "produce <topic>",
+	Short: "Produce on a topic and measure performance",
+	Args:  cobra.ExactArgs(1),
+	Run: func(cmd *cobra.Command, args []string) {
+		produceArgs.Topic = args[0]
+		produce()
+	},
+}
+
+// initProducer registers the command-line flags of the "produce" sub-command,
+// binding each one to the matching field of produceArgs.
+func initProducer() {
+	flags := cmdProduce.Flags()
+	flags.IntVarP(&produceArgs.Rate, "rate", "r", 100, "Publish rate. Set to 0 to go unthrottled")
+	flags.BoolVarP(&produceArgs.Batching, "batching", "b", true, "Enable batching")
+	flags.IntVarP(&produceArgs.MessageSize, "size", "s", 1024, "Message size")
+	flags.IntVarP(&produceArgs.ProducerQueueSize, "queue-size", "q", 1000, "Produce queue size")
+}
+
+// produce connects to the configured Pulsar service, creates a producer on
+// produceArgs.Topic and publishes fixed-size payloads asynchronously,
+// optionally throttled to produceArgs.Rate messages per second. Every ten
+// seconds it logs the publish rate and the latency quantiles of the last
+// interval. It only returns by terminating the process: setup and publish
+// errors are fatal, and the reporting loop never ends.
+func produce() {
+	// Marshalling these flat flag structs cannot fail, so the errors are dropped.
+	b, _ := json.MarshalIndent(clientArgs, "", " ")
+	fmt.Println("Client config: ", string(b))
+	b, _ = json.MarshalIndent(produceArgs, "", " ")
+	fmt.Println("Producer config: ", string(b))
+
+	client, err := pulsar.NewClient(pulsar.ClientOptions{
+		URL: clientArgs.ServiceUrl,
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer client.Close()
+
+	// NOTE(review): produceArgs.Batching is parsed from the flags but is not
+	// forwarded to the producer options here; confirm the intended option.
+	producer, err := client.CreateProducer(pulsar.ProducerOptions{
+		Topic:                   produceArgs.Topic,
+		MaxPendingMessages:      produceArgs.ProducerQueueSize,
+		BatchingMaxPublishDelay: 1 * time.Millisecond,
+		SendTimeout:             0,
+		BlockIfQueueFull:        true,
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer producer.Close()
+
+	ctx := context.Background()
+
+	// The same zero-filled buffer is reused as the payload of every message.
+	payload := make([]byte, produceArgs.MessageSize)
+
+	// Carries one latency sample, in seconds, per acknowledged message from the
+	// send callbacks to the reporting loop below.
+	ch := make(chan float64)
+
+	go func() {
+		// A nil limiter means the publish loop runs unthrottled.
+		var rateLimiter *rate.RateLimiter
+		if produceArgs.Rate > 0 {
+			rateLimiter = rate.New(produceArgs.Rate, time.Second)
+		}
+
+		for {
+			if rateLimiter != nil {
+				rateLimiter.Wait()
+			}
+
+			// Declared per iteration so each callback measures its own send.
+			start := time.Now()
+
+			producer.SendAsync(ctx, &pulsar.ProducerMessage{
+				Payload: payload,
+			}, func(msgID pulsar.MessageID, message *pulsar.ProducerMessage, e error) {
+				if e != nil {
+					log.Fatalf("Failed to publish: %v", e)
+				}
+
+				ch <- time.Since(start).Seconds()
+			})
+		}
+	}()
+
+	// Print stats of the publish rate and latencies. The same constant drives
+	// the ticker and the rate divisor so the two cannot drift apart.
+	const statsInterval = 10 * time.Second
+	tick := time.NewTicker(statsInterval)
+	// Target exactly the quantiles reported below, the median included.
+	q := quantile.NewTargeted(0.50, 0.95, 0.99, 0.999, 1.0)
+	messagesPublished := 0
+
+	for {
+		select {
+		case <-tick.C:
+			messageRate := float64(messagesPublished) / statsInterval.Seconds()
+			log.Printf(`Stats - Publish rate: %6.1f msg/s - %6.1f Mbps - Latency ms: 50%% %5.1f - 95%% %5.1f - 99%% %5.1f - 99.9%% %5.1f - max %6.1f`,
+				messageRate,
+				messageRate*float64(produceArgs.MessageSize)/1024/1024*8,
+				q.Query(0.5)*1000,
+				q.Query(0.95)*1000,
+				q.Query(0.99)*1000,
+				q.Query(0.999)*1000,
+				q.Query(1.0)*1000,
+			)
+
+			// Start the next interval from a clean slate.
+			q.Reset()
+			messagesPublished = 0
+		case latency := <-ch:
+			messagesPublished++
+			q.Insert(latency)
+		}
+	}
+}
diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go
new file mode 100644
index 0000000..7a6d101
--- /dev/null
+++ b/perf/pulsar-perf-go.go
@@ -0,0 +1,49 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+ "github.com/spf13/cobra"
+ log "github.com/sirupsen/logrus"
+)
+
+type ClientArgs struct { // connection settings read by both the produce and consume code paths
+ ServiceUrl string // --service-url / -u flag (default "pulsar://localhost:6650"); passed as the client URL
+}
+
+var clientArgs ClientArgs // instance bound to the root command's flag in main
+
+func main() {
+ log.SetFormatter(&log.TextFormatter{
+ FullTimestamp: true,
+ TimestampFormat: "15:04:05.000",
+ })
+ log.SetLevel(log.InfoLevel)
+
+ initProducer()
+ initConsumer()
+
+ var rootCmd = &cobra.Command{Use: "pulsar-perf-go"}
+ rootCmd.Flags().StringVarP(&clientArgs.ServiceUrl, "service-url", "u",
+ "pulsar://localhost:6650", "The Pulsar service URL")
+ rootCmd.AddCommand(cmdProduce, cmdConsume)
+
+ rootCmd.Execute()
+}