You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/12/24 09:34:51 UTC

[GitHub] vongosling closed pull request #17: Producer benchmark

vongosling closed pull request #17: Producer benchmark
URL: https://github.com/apache/rocketmq-client-go/pull/17
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a fork's pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/benchmark/main.go b/benchmark/main.go
new file mode 100644
index 0000000..d268d72
--- /dev/null
+++ b/benchmark/main.go
@@ -0,0 +1,62 @@
+package main
+
+import (
+	"fmt"
+	"os"
+	"strings"
+)
+
+// command is a benchmark sub-command. Implementations are stored by name via
+// registerCommand and dispatched from main.
+type command interface {
+	// usage is called by the top-level usage function for every registered
+	// command.
+	usage()
+	// run executes the command; main passes the arguments that follow the
+	// command name (os.Args[2:]).
+	run(args []string)
+}
+
+var (
+	// cmds maps a command name to its implementation. It is filled by
+	// registerCommand and read by usage and main.
+	cmds        = map[string]command{}
+	// longText is the text that message bodies are sliced from; init fills it.
+	longText    = ""
+	// longTextLen is len(longText); init fills it.
+	longTextLen = 0
+)
+
+func init() {
+	longText = strings.Repeat("0123456789", 100)
+	longTextLen = len(longText)
+}
+
+// registerCommand stores cmd in cmds under name. It panics when cmd is nil or
+// when a command with the same name has already been registered.
+func registerCommand(name string, cmd command) {
+	if cmd == nil {
+		panic("empty command")
+	}
+
+	if _, exists := cmds[name]; exists {
+		panic(fmt.Sprintf("%s command existed", name))
+	}
+
+	cmds[name] = cmd
+}
+
+// usage prints the general invocation line followed by the usage of every
+// registered command. Commands are visited in map iteration order.
+func usage() {
+	banner := os.Args[0] + " commandName [...]"
+	println(banner)
+	for name := range cmds {
+		cmds[name].usage()
+	}
+}
+
+// main dispatches to the command named by the first argument and hands it the
+// remaining arguments:
+//
+//	go run *.go [command name] [command args]
+//
+// When the name is missing or unknown it prints a message plus the usage text
+// and returns.
+func main() {
+	args := os.Args
+	if len(args) < 2 {
+		println("error:lack cmd name\n")
+		usage()
+		return
+	}
+
+	name := args[1]
+	if cmd, found := cmds[name]; found {
+		cmd.run(args[2:])
+		return
+	}
+
+	fmt.Printf("command %s is not supported\n", name)
+	usage()
+}
diff --git a/benchmark/producer.go b/benchmark/producer.go
new file mode 100644
index 0000000..b51df0a
--- /dev/null
+++ b/benchmark/producer.go
@@ -0,0 +1,267 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"os"
+	"os/signal"
+	"sync"
+	"sync/atomic"
+	"syscall"
+	"time"
+
+	rocketmq "github.com/apache/rocketmq-client-go/core"
+)
+
+// statiBenchmarkProducerSnapshot holds the producer benchmark counters. One
+// instance is the live set updated atomically by produceMsg; the copies made
+// by snapshots.takeSnapshot are chained together through next.
+type statiBenchmarkProducerSnapshot struct {
+	sendRequestSuccessCount     int64 // incremented by produceMsg for every send that returns SendOK
+	sendRequestFailedCount      int64 // never incremented yet: the update in produceMsg is still a TODO
+	receiveResponseSuccessCount int64 // incremented by produceMsg for every send that returns SendOK
+	receiveResponseFailedCount  int64 // never incremented yet: the update in produceMsg is still a TODO
+	sendMessageSuccessTimeTotal int64 // sum of the round-trip times of successful sends, in milliseconds
+	sendMessageMaxRT            int64 // largest round-trip time of a successful send, in milliseconds
+	createdAt                   time.Time // set by takeSnapshot when the copy is made
+	next                        *statiBenchmarkProducerSnapshot // the snapshot taken after this one
+}
+
+// snapshots is a linked list of periodic copies of the live counters in cur.
+// takeSnapshot appends to it and drops the oldest entry once more than 10 are
+// held; printStati reads the oldest and newest entries.
+type snapshots struct {
+	// The embedded lock is taken around accesses to head, tail and len.
+	sync.RWMutex
+	// head is the oldest retained snapshot, tail the newest, and cur the live
+	// counters that snapshots are copied from.
+	head, tail, cur *statiBenchmarkProducerSnapshot
+	// len is the number of snapshots currently in the list.
+	len             int
+}
+
+// takeSnapshot copies the live counters in s.cur into a new snapshot stamped
+// with the current time and appends it to the list. Once the list holds more
+// than 10 snapshots the oldest one is dropped.
+func (s *snapshots) takeSnapshot() {
+	live := s.cur
+	snap := &statiBenchmarkProducerSnapshot{
+		sendRequestSuccessCount:     atomic.LoadInt64(&live.sendRequestSuccessCount),
+		sendRequestFailedCount:      atomic.LoadInt64(&live.sendRequestFailedCount),
+		receiveResponseSuccessCount: atomic.LoadInt64(&live.receiveResponseSuccessCount),
+		receiveResponseFailedCount:  atomic.LoadInt64(&live.receiveResponseFailedCount),
+		sendMessageSuccessTimeTotal: atomic.LoadInt64(&live.sendMessageSuccessTimeTotal),
+		sendMessageMaxRT:            atomic.LoadInt64(&live.sendMessageMaxRT),
+		createdAt:                   time.Now(),
+	}
+
+	s.Lock()
+	defer s.Unlock()
+
+	// Append the new snapshot at the tail of the list.
+	if s.tail != nil {
+		s.tail.next = snap
+	}
+	s.tail = snap
+	if s.head == nil {
+		s.head = snap
+	}
+
+	// Keep at most 10 entries by advancing head past the oldest one.
+	if s.len++; s.len > 10 {
+		s.head = s.head.next
+		s.len--
+	}
+}
+
+func (s *snapshots) printStati() {
+	s.RLock()
+	if s.len < 10 {
+		s.RUnlock()
+		return
+	}
+
+	f, l := s.head, s.tail
+	respSucCount := float64(l.receiveResponseSuccessCount - f.receiveResponseSuccessCount)
+	sendTps := respSucCount / l.createdAt.Sub(f.createdAt).Seconds()
+	avgRT := float64(l.sendMessageSuccessTimeTotal-f.sendMessageSuccessTimeTotal) / respSucCount
+	maxRT := atomic.LoadInt64(&s.cur.sendMessageMaxRT)
+	s.RUnlock()
+
+	fmt.Printf(
+		"Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d Total:%d\n",
+		int64(sendTps), maxRT, avgRT, l.sendRequestFailedCount, l.receiveResponseFailedCount, l.receiveResponseSuccessCount,
+	)
+}
+
+// takeSnapshot calls s.takeSnapshot once per second until exit is closed (or
+// receives a value), then stops its ticker and returns.
+func takeSnapshot(s *snapshots, exit chan struct{}) {
+	tk := time.NewTicker(time.Second)
+	defer tk.Stop()
+
+	for {
+		select {
+		case <-exit:
+			return
+		case <-tk.C:
+			s.takeSnapshot()
+		}
+	}
+}
+
+func printStati(s *snapshots, exit chan struct{}) {
+	ticker := time.NewTicker(time.Second * 10)
+	for {
+		select {
+		case <-ticker.C:
+			s.printStati()
+		case <-exit:
+			ticker.Stop()
+			return
+		}
+	}
+}
+
+// producer is the "producer" benchmark command. Its fields are bound to
+// command-line flags in init and validated in run.
+type producer struct {
+	topic         string // -t: topic name
+	nameSrv       string // -n: nameserver address
+	groupID       string // -g: group id
+	instanceCount int    // -i: number of producer goroutines started by run (default 1)
+	testMinutes   int    // -m: how long run lets the benchmark last, in minutes (default 10)
+	bodySize      int    // -s: number of bytes of longText used as the message body (default 32)
+
+	// flags is the flag set that parses the command's arguments.
+	flags *flag.FlagSet
+}
+
+// init creates the producer command, binds its fields to the flags of a
+// dedicated flag set, and registers it under the name "producer".
+func init() {
+	fs := flag.NewFlagSet("producer", flag.ExitOnError)
+	bp := &producer{flags: fs}
+
+	fs.StringVar(&bp.topic, "t", "", "topic name")
+	fs.StringVar(&bp.nameSrv, "n", "", "nameserver address")
+	fs.StringVar(&bp.groupID, "g", "", "group id")
+	fs.IntVar(&bp.instanceCount, "i", 1, "instance count")
+	fs.IntVar(&bp.testMinutes, "m", 10, "test minutes")
+	fs.IntVar(&bp.bodySize, "s", 32, "body size")
+
+	registerCommand("producer", bp)
+}
+
+// produceMsg creates and starts one RocketMQ producer, then sends messages
+// synchronously in a loop until exit is closed. The producer is shut down
+// when the function returns.
+//
+// For every send that returns SendOK it atomically updates the success
+// counters, the accumulated round-trip time and the maximum round-trip time
+// (both in milliseconds) in stati. Failed sends are only logged.
+func (bp *producer) produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan struct{}) {
+	p, err := rocketmq.NewProducer(&rocketmq.ProducerConfig{
+		ClientConfig: rocketmq.ClientConfig{GroupID: bp.groupID, NameServer: bp.nameSrv},
+	})
+	if err != nil {
+		fmt.Printf("new producer error:%s\n", err)
+		return
+	}
+
+	p.Start()
+	defer p.Shutdown()
+
+	topic, tag := bp.topic, "benchmark-producer"
+	// The body never changes, so build it once instead of on every send.
+	body := bp.buildMsg()
+
+	for {
+		// Stop as soon as the caller signals the end of the test.
+		select {
+		case <-exit:
+			return
+		default:
+		}
+
+		now := time.Now()
+		r := p.SendMessageSync(&rocketmq.Message{
+			Topic: topic, Body: body,
+		})
+		// Measure right after the call so bookkeeping is not counted as RT.
+		currentRT := int64(time.Since(now) / time.Millisecond)
+
+		if r.Status != rocketmq.SendOK {
+			// err is always nil at this point (it was checked after
+			// NewProducer), so report the send status rather than calling
+			// err.Error(), which would panic on the first failed send.
+			fmt.Printf("%v send message %s:%s error:%v\n", time.Now(), topic, tag, r.Status)
+			// TODO: update receiveResponseFailedCount / sendRequestFailedCount
+			// once the kind of failure can be told apart.
+			continue
+		}
+
+		atomic.AddInt64(&stati.receiveResponseSuccessCount, 1)
+		atomic.AddInt64(&stati.sendRequestSuccessCount, 1)
+		atomic.AddInt64(&stati.sendMessageSuccessTimeTotal, currentRT)
+
+		// Raise the shared maximum with a CAS loop; retry while another
+		// goroutine changes it and our value is still larger.
+		prevRT := atomic.LoadInt64(&stati.sendMessageMaxRT)
+		for currentRT > prevRT {
+			if atomic.CompareAndSwapInt64(&stati.sendMessageMaxRT, prevRT, currentRT) {
+				break
+			}
+			prevRT = atomic.LoadInt64(&stati.sendMessageMaxRT)
+		}
+	}
+}
+
+func (bp *producer) run(args []string) {
+	bp.flags.Parse(args)
+
+	if bp.topic == "" {
+		println("empty topic")
+		bp.flags.Usage()
+		return
+	}
+
+	if bp.groupID == "" {
+		println("empty group id")
+		bp.flags.Usage()
+		return
+	}
+
+	if bp.nameSrv == "" {
+		println("empty namesrv")
+		bp.flags.Usage()
+		return
+	}
+	if bp.instanceCount <= 0 {
+		println("instance count must be positive integer")
+		bp.flags.Usage()
+		return
+	}
+	if bp.testMinutes <= 0 {
+		println("test time must be positive integer")
+		bp.flags.Usage()
+		return
+	}
+	if bp.bodySize <= 0 {
+		println("body size must be positive integer")
+		bp.flags.Usage()
+		return
+	}
+
+	stati := statiBenchmarkProducerSnapshot{}
+	snapshots := snapshots{cur: &stati}
+	exitChan := make(chan struct{})
+	wg := sync.WaitGroup{}
+
+	for i := 0; i < bp.instanceCount; i++ {
+		i := i
+		go func() {
+			wg.Add(1)
+			bp.produceMsg(&stati, exitChan)
+			fmt.Printf("exit of produce %d\n", i)
+			wg.Done()
+		}()
+	}
+
+	// snapshot
+	go func() {
+		wg.Add(1)
+		takeSnapshot(&snapshots, exitChan)
+		wg.Done()
+	}()
+
+	// print statistic
+	go func() {
+		wg.Add(1)
+		printStati(&snapshots, exitChan)
+		wg.Done()
+	}()
+
+	signalChan := make(chan os.Signal, 1)
+	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
+	select {
+	case <-time.Tick(time.Minute * time.Duration(bp.testMinutes)):
+	case <-signalChan:
+	}
+
+	close(exitChan)
+	wg.Wait()
+	snapshots.takeSnapshot()
+	snapshots.printStati()
+	fmt.Println("TEST DONE")
+}
+
+// usage prints the usage message of the producer command's flag set.
+func (bp *producer) usage() {
+	fs := bp.flags
+	fs.Usage()
+}
+
+// buildMsg returns the message body: the first bodySize bytes of longText.
+func (bp *producer) buildMsg() string {
+	size := bp.bodySize
+	return longText[:size]
+}
diff --git a/core/api.go b/core/api.go
index 58c1465..1ed6c82 100644
--- a/core/api.go
+++ b/core/api.go
@@ -22,7 +22,7 @@ func Version() (version string) {
 	return GetVersion()
 }
 
-type clientConfig struct {
+type ClientConfig struct {
 	GroupID          string
 	NameServer       string
 	NameServerDomain string
@@ -32,7 +32,7 @@ type clientConfig struct {
 	LogC             *LogConfig
 }
 
-func (config *clientConfig) string() string {
+func (config *ClientConfig) string() string {
 	// For security, don't print Credentials.
 	str := ""
 	str = strJoin(str, "GroupId", config.GroupID)
@@ -55,14 +55,14 @@ func NewProducer(config *ProducerConfig) (Producer, error) {
 
 // ProducerConfig define a producer
 type ProducerConfig struct {
-	clientConfig
+	ClientConfig
 	SendMsgTimeout int
 	CompressLevel  int
 	MaxMessageSize int
 }
 
 func (config *ProducerConfig) String() string {
-	str := "ProducerConfig=[" + config.clientConfig.string()
+	str := "ProducerConfig=[" + config.ClientConfig.string()
 
 	if config.SendMsgTimeout > 0 {
 		str = strJoin(str, "SendMsgTimeout", config.SendMsgTimeout)
@@ -116,7 +116,7 @@ func (mode MessageModel) String() string {
 
 // PushConsumerConfig define a new consumer.
 type PushConsumerConfig struct {
-	clientConfig
+	ClientConfig
 	ThreadCount         int
 	MessageBatchMaxSize int
 	Model               MessageModel
@@ -124,7 +124,7 @@ type PushConsumerConfig struct {
 
 func (config *PushConsumerConfig) String() string {
 	// For security, don't print Credentials.
-	str := "PushConsumerConfig=[" + config.clientConfig.string()
+	str := "PushConsumerConfig=[" + config.ClientConfig.string()
 
 	if config.ThreadCount > 0 {
 		str = strJoin(str, "ThreadCount", config.ThreadCount)
diff --git a/core/pull_consumer.go b/core/pull_consumer.go
index 6dcac43..fdf8d76 100644
--- a/core/pull_consumer.go
+++ b/core/pull_consumer.go
@@ -65,7 +65,7 @@ func (ps PullStatus) String() string {
 
 // PullConsumerConfig the configuration for the pull consumer
 type PullConsumerConfig struct {
-	clientConfig
+	ClientConfig
 }
 
 // DefaultPullConsumer default consumer pulling the message


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services