You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/12/24 09:34:51 UTC
[GitHub] vongosling closed pull request #17: Producer benchmark
vongosling closed pull request #17: Producer benchmark
URL: https://github.com/apache/rocketmq-client-go/pull/17
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/benchmark/main.go b/benchmark/main.go
new file mode 100644
index 0000000..d268d72
--- /dev/null
+++ b/benchmark/main.go
@@ -0,0 +1,62 @@
+package main
+
+import (
+ "fmt"
+ "os"
+ "strings"
+)
+
+type command interface {
+ usage()
+ run(args []string)
+}
+
+var (
+ cmds = map[string]command{}
+ longText = ""
+ longTextLen = 0
+)
+
+func init() {
+ longText = strings.Repeat("0123456789", 100)
+ longTextLen = len(longText)
+}
+
+func registerCommand(name string, cmd command) {
+ if cmd == nil {
+ panic("empty command")
+ }
+
+ _, ok := cmds[name]
+ if ok {
+ panic(fmt.Sprintf("%s command existed", name))
+ }
+
+ cmds[name] = cmd
+}
+
+func usage() {
+ println(os.Args[0] + " commandName [...]")
+ for _, cmd := range cmds {
+ cmd.usage()
+ }
+}
+
+// go run *.go [command name] [command args]
+func main() {
+ if len(os.Args) < 2 {
+ println("error:lack cmd name\n")
+ usage()
+ return
+ }
+
+ name := os.Args[1]
+ cmd, ok := cmds[name]
+ if !ok {
+ fmt.Printf("command %s is not supported\n", name)
+ usage()
+ return
+ }
+
+ cmd.run(os.Args[2:])
+}
diff --git a/benchmark/producer.go b/benchmark/producer.go
new file mode 100644
index 0000000..b51df0a
--- /dev/null
+++ b/benchmark/producer.go
@@ -0,0 +1,267 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "os"
+ "os/signal"
+ "sync"
+ "sync/atomic"
+ "syscall"
+ "time"
+
+ rocketmq "github.com/apache/rocketmq-client-go/core"
+)
+
+type statiBenchmarkProducerSnapshot struct {
+ sendRequestSuccessCount int64
+ sendRequestFailedCount int64
+ receiveResponseSuccessCount int64
+ receiveResponseFailedCount int64
+ sendMessageSuccessTimeTotal int64
+ sendMessageMaxRT int64
+ createdAt time.Time
+ next *statiBenchmarkProducerSnapshot
+}
+
+type snapshots struct {
+ sync.RWMutex
+ head, tail, cur *statiBenchmarkProducerSnapshot
+ len int
+}
+
+func (s *snapshots) takeSnapshot() {
+ b := s.cur
+ sn := new(statiBenchmarkProducerSnapshot)
+ sn.sendRequestSuccessCount = atomic.LoadInt64(&b.sendRequestSuccessCount)
+ sn.sendRequestFailedCount = atomic.LoadInt64(&b.sendRequestFailedCount)
+ sn.receiveResponseSuccessCount = atomic.LoadInt64(&b.receiveResponseSuccessCount)
+ sn.receiveResponseFailedCount = atomic.LoadInt64(&b.receiveResponseFailedCount)
+ sn.sendMessageSuccessTimeTotal = atomic.LoadInt64(&b.sendMessageSuccessTimeTotal)
+ sn.sendMessageMaxRT = atomic.LoadInt64(&b.sendMessageMaxRT)
+ sn.createdAt = time.Now()
+
+ s.Lock()
+ if s.tail != nil {
+ s.tail.next = sn
+ }
+ s.tail = sn
+ if s.head == nil {
+ s.head = s.tail
+ }
+
+ s.len++
+ if s.len > 10 {
+ s.head = s.head.next
+ s.len--
+ }
+ s.Unlock()
+}
+
+func (s *snapshots) printStati() {
+ s.RLock()
+ if s.len < 10 {
+ s.RUnlock()
+ return
+ }
+
+ f, l := s.head, s.tail
+ respSucCount := float64(l.receiveResponseSuccessCount - f.receiveResponseSuccessCount)
+ sendTps := respSucCount / l.createdAt.Sub(f.createdAt).Seconds()
+ avgRT := float64(l.sendMessageSuccessTimeTotal-f.sendMessageSuccessTimeTotal) / respSucCount
+ maxRT := atomic.LoadInt64(&s.cur.sendMessageMaxRT)
+ s.RUnlock()
+
+ fmt.Printf(
+ "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d Total:%d\n",
+ int64(sendTps), maxRT, avgRT, l.sendRequestFailedCount, l.receiveResponseFailedCount, l.receiveResponseSuccessCount,
+ )
+}
+func takeSnapshot(s *snapshots, exit chan struct{}) {
+ ticker := time.NewTicker(time.Second)
+ for {
+ select {
+ case <-ticker.C:
+ s.takeSnapshot()
+ case <-exit:
+ ticker.Stop()
+ return
+ }
+ }
+}
+
+func printStati(s *snapshots, exit chan struct{}) {
+ ticker := time.NewTicker(time.Second * 10)
+ for {
+ select {
+ case <-ticker.C:
+ s.printStati()
+ case <-exit:
+ ticker.Stop()
+ return
+ }
+ }
+}
+
+type producer struct {
+ topic string
+ nameSrv string
+ groupID string
+ instanceCount int
+ testMinutes int
+ bodySize int
+
+ flags *flag.FlagSet
+}
+
+func init() {
+ p := &producer{}
+ flags := flag.NewFlagSet("producer", flag.ExitOnError)
+ p.flags = flags
+
+ flags.StringVar(&p.topic, "t", "", "topic name")
+ flags.StringVar(&p.nameSrv, "n", "", "nameserver address")
+ flags.StringVar(&p.groupID, "g", "", "group id")
+ flags.IntVar(&p.instanceCount, "i", 1, "instance count")
+ flags.IntVar(&p.testMinutes, "m", 10, "test minutes")
+ flags.IntVar(&p.bodySize, "s", 32, "body size")
+
+ registerCommand("producer", p)
+}
+
+func (bp *producer) produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan struct{}) {
+ p, err := rocketmq.NewProducer(&rocketmq.ProducerConfig{
+ ClientConfig: rocketmq.ClientConfig{GroupID: bp.groupID, NameServer: bp.nameSrv},
+ })
+ if err != nil {
+ fmt.Printf("new producer error:%s\n", err)
+ return
+ }
+
+ p.Start()
+ defer p.Shutdown()
+
+ topic, tag := bp.topic, "benchmark-producer"
+
+AGAIN:
+ select {
+ case <-exit:
+ return
+ default:
+ }
+
+ now := time.Now()
+ r := p.SendMessageSync(&rocketmq.Message{
+ Topic: bp.topic, Body: longText[:bp.bodySize],
+ })
+
+ if r.Status == rocketmq.SendOK {
+ atomic.AddInt64(&stati.receiveResponseSuccessCount, 1)
+ atomic.AddInt64(&stati.sendRequestSuccessCount, 1)
+ currentRT := int64(time.Since(now) / time.Millisecond)
+ atomic.AddInt64(&stati.sendMessageSuccessTimeTotal, currentRT)
+ prevRT := atomic.LoadInt64(&stati.sendMessageMaxRT)
+ for currentRT > prevRT {
+ if atomic.CompareAndSwapInt64(&stati.sendMessageMaxRT, prevRT, currentRT) {
+ break
+ }
+ prevRT = atomic.LoadInt64(&stati.sendMessageMaxRT)
+ }
+ goto AGAIN
+ }
+
+ fmt.Printf("%v send message %s:%s error:%s\n", time.Now(), topic, tag, err.Error())
+ //if _, ok := err.(*rpc.ErrorInfo); ok { TODO
+ //atomic.AddInt64(&stati.receiveResponseFailedCount, 1)
+ //} else {
+ //atomic.AddInt64(&stati.sendRequestFailedCount, 1)
+ //}
+ goto AGAIN
+}
+
+func (bp *producer) run(args []string) {
+ bp.flags.Parse(args)
+
+ if bp.topic == "" {
+ println("empty topic")
+ bp.flags.Usage()
+ return
+ }
+
+ if bp.groupID == "" {
+ println("empty group id")
+ bp.flags.Usage()
+ return
+ }
+
+ if bp.nameSrv == "" {
+ println("empty namesrv")
+ bp.flags.Usage()
+ return
+ }
+ if bp.instanceCount <= 0 {
+ println("instance count must be positive integer")
+ bp.flags.Usage()
+ return
+ }
+ if bp.testMinutes <= 0 {
+ println("test time must be positive integer")
+ bp.flags.Usage()
+ return
+ }
+ if bp.bodySize <= 0 {
+ println("body size must be positive integer")
+ bp.flags.Usage()
+ return
+ }
+
+ stati := statiBenchmarkProducerSnapshot{}
+ snapshots := snapshots{cur: &stati}
+ exitChan := make(chan struct{})
+ wg := sync.WaitGroup{}
+
+ for i := 0; i < bp.instanceCount; i++ {
+ i := i
+ go func() {
+ wg.Add(1)
+ bp.produceMsg(&stati, exitChan)
+ fmt.Printf("exit of produce %d\n", i)
+ wg.Done()
+ }()
+ }
+
+ // snapshot
+ go func() {
+ wg.Add(1)
+ takeSnapshot(&snapshots, exitChan)
+ wg.Done()
+ }()
+
+ // print statistic
+ go func() {
+ wg.Add(1)
+ printStati(&snapshots, exitChan)
+ wg.Done()
+ }()
+
+ signalChan := make(chan os.Signal, 1)
+ signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
+ select {
+ case <-time.Tick(time.Minute * time.Duration(bp.testMinutes)):
+ case <-signalChan:
+ }
+
+ close(exitChan)
+ wg.Wait()
+ snapshots.takeSnapshot()
+ snapshots.printStati()
+ fmt.Println("TEST DONE")
+}
+
+func (bp *producer) usage() {
+ bp.flags.Usage()
+}
+
+func (bp *producer) buildMsg() string {
+ return longText[:bp.bodySize]
+}
diff --git a/core/api.go b/core/api.go
index 58c1465..1ed6c82 100644
--- a/core/api.go
+++ b/core/api.go
@@ -22,7 +22,7 @@ func Version() (version string) {
return GetVersion()
}
-type clientConfig struct {
+type ClientConfig struct {
GroupID string
NameServer string
NameServerDomain string
@@ -32,7 +32,7 @@ type clientConfig struct {
LogC *LogConfig
}
-func (config *clientConfig) string() string {
+func (config *ClientConfig) string() string {
// For security, don't print Credentials.
str := ""
str = strJoin(str, "GroupId", config.GroupID)
@@ -55,14 +55,14 @@ func NewProducer(config *ProducerConfig) (Producer, error) {
// ProducerConfig define a producer
type ProducerConfig struct {
- clientConfig
+ ClientConfig
SendMsgTimeout int
CompressLevel int
MaxMessageSize int
}
func (config *ProducerConfig) String() string {
- str := "ProducerConfig=[" + config.clientConfig.string()
+ str := "ProducerConfig=[" + config.ClientConfig.string()
if config.SendMsgTimeout > 0 {
str = strJoin(str, "SendMsgTimeout", config.SendMsgTimeout)
@@ -116,7 +116,7 @@ func (mode MessageModel) String() string {
// PushConsumerConfig define a new consumer.
type PushConsumerConfig struct {
- clientConfig
+ ClientConfig
ThreadCount int
MessageBatchMaxSize int
Model MessageModel
@@ -124,7 +124,7 @@ type PushConsumerConfig struct {
func (config *PushConsumerConfig) String() string {
// For security, don't print Credentials.
- str := "PushConsumerConfig=[" + config.clientConfig.string()
+ str := "PushConsumerConfig=[" + config.ClientConfig.string()
if config.ThreadCount > 0 {
str = strJoin(str, "ThreadCount", config.ThreadCount)
diff --git a/core/pull_consumer.go b/core/pull_consumer.go
index 6dcac43..fdf8d76 100644
--- a/core/pull_consumer.go
+++ b/core/pull_consumer.go
@@ -65,7 +65,7 @@ func (ps PullStatus) String() string {
// PullConsumerConfig the configuration for the pull consumer
type PullConsumerConfig struct {
- clientConfig
+ ClientConfig
}
// DefaultPullConsumer default consumer pulling the message
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services