You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/12/28 13:53:01 UTC

[GitHub] ShannonDing closed pull request #20: Stable test

ShannonDing closed pull request #20: Stable test
URL: https://github.com/apache/rocketmq-client-go/pull/20
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/benchmark/consumer.go b/benchmark/consumer.go
index 9e1cd77..1a893c9 100644
--- a/benchmark/consumer.go
+++ b/benchmark/consumer.go
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package main
 
 import (
diff --git a/benchmark/main.go b/benchmark/main.go
index d268d72..080a948 100644
--- a/benchmark/main.go
+++ b/benchmark/main.go
@@ -1,9 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package main
 
 import (
 	"fmt"
 	"os"
-	"strings"
 )
 
 type command interface {
@@ -12,16 +28,9 @@ type command interface {
 }
 
 var (
-	cmds        = map[string]command{}
-	longText    = ""
-	longTextLen = 0
+	cmds = map[string]command{}
 )
 
-func init() {
-	longText = strings.Repeat("0123456789", 100)
-	longTextLen = len(longText)
-}
-
 func registerCommand(name string, cmd command) {
 	if cmd == nil {
 		panic("empty command")
diff --git a/benchmark/message.go b/benchmark/message.go
new file mode 100644
index 0000000..d5690fe
--- /dev/null
+++ b/benchmark/message.go
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package main
+
+import "strings"
+
+var (
+	longText    = ""
+	longTextLen = 0
+)
+
+func init() {
+	longText = strings.Repeat("0123456789", 100)
+	longTextLen = len(longText)
+}
+
+func buildMsg(size int) string {
+	return longText[:size]
+}
diff --git a/benchmark/producer.go b/benchmark/producer.go
index 7b21356..e183269 100644
--- a/benchmark/producer.go
+++ b/benchmark/producer.go
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package main
 
 import (
@@ -126,10 +143,15 @@ AGAIN:
 	}
 
 	now := time.Now()
-	r := p.SendMessageSync(&rocketmq.Message{
-		Topic: bp.topic, Body: longText[:bp.bodySize],
+	r, err := p.SendMessageSync(&rocketmq.Message{
+		Topic: bp.topic, Body: buildMsg(bp.bodySize),
 	})
 
+	if err != nil {
+		fmt.Printf("send message sync error:%s", err)
+		goto AGAIN
+	}
+
 	if r.Status == rocketmq.SendOK {
 		atomic.AddInt64(&stati.receiveResponseSuccessCount, 1)
 		atomic.AddInt64(&stati.sendRequestSuccessCount, 1)
@@ -249,7 +271,3 @@ func (bp *producer) run(args []string) {
 func (bp *producer) usage() {
 	bp.flags.Usage()
 }
-
-func (bp *producer) buildMsg() string {
-	return longText[:bp.bodySize]
-}
diff --git a/benchmark/stable.go b/benchmark/stable.go
new file mode 100644
index 0000000..6c7a12c
--- /dev/null
+++ b/benchmark/stable.go
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package main
+
+import (
+	"errors"
+	"flag"
+	"fmt"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+
+	rocketmq "github.com/apache/rocketmq-client-go/core"
+)
+
+type stableTest struct {
+	nameSrv       string
+	topic         string
+	groupID       string
+	opIntervalSec int
+	testMin       int
+
+	op func()
+
+	flags *flag.FlagSet
+}
+
+func (st *stableTest) buildFlags(name string) {
+	flags := flag.NewFlagSet(name, flag.ExitOnError)
+	flags.StringVar(&st.topic, "t", "stable-test", "topic name")
+	flags.StringVar(&st.nameSrv, "n", "", "nameserver address")
+	flags.StringVar(&st.groupID, "g", "stable-test", "group id")
+	flags.IntVar(&st.testMin, "m", 10, "test minutes")
+	flags.IntVar(&st.opIntervalSec, "s", 1, "operation interval[produce/consume]")
+
+	st.flags = flags
+}
+
+func (st *stableTest) checkFlag() error {
+	if st.topic == "" {
+		return errors.New("empty topic")
+	}
+
+	if st.nameSrv == "" {
+		return errors.New("empty namesrv")
+	}
+
+	if st.groupID == "" {
+		return errors.New("empty group id")
+	}
+
+	if st.testMin <= 0 {
+		return errors.New("test miniutes must be positive integer")
+	}
+
+	if st.opIntervalSec <= 0 {
+		return errors.New("operation interval must be positive integer")
+	}
+
+	return nil
+}
+
+func (st *stableTest) run() {
+	opTicker := time.NewTicker(time.Duration(st.opIntervalSec) * time.Second)
+	closeChan := time.Tick(time.Duration(st.testMin) * time.Minute)
+
+	signalChan := make(chan os.Signal, 1)
+	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
+	for {
+		select {
+		case <-signalChan:
+			opTicker.Stop()
+			fmt.Println("test over")
+			return
+		case <-closeChan:
+			opTicker.Stop()
+			fmt.Println("test over")
+			return
+		case <-opTicker.C:
+			st.op()
+		}
+	}
+}
+
+type stableTestProducer struct {
+	*stableTest
+	bodySize int
+
+	p rocketmq.Producer
+}
+
+func (stp *stableTestProducer) buildFlags(name string) {
+	stp.stableTest.buildFlags(name)
+	stp.flags.IntVar(&stp.bodySize, "b", 32, "body size")
+}
+
+func (stp *stableTestProducer) checkFlag() error {
+	err := stp.stableTest.checkFlag()
+	if err != nil {
+		return err
+	}
+	if stp.bodySize <= 0 {
+		return errors.New("message body size must be positive integer")
+	}
+
+	return nil
+}
+
+func (stp *stableTestProducer) usage() {
+	stp.flags.Usage()
+}
+
+func (stp *stableTestProducer) run(args []string) {
+	err := stp.flags.Parse(args)
+	if err != nil {
+		fmt.Printf("parse args:%v, error:%s\n", args, err)
+		stp.usage()
+		return
+	}
+
+	err = stp.checkFlag()
+	if err != nil {
+		fmt.Println(err)
+		stp.usage()
+		return
+	}
+
+	p, err := rocketmq.NewProducer(&rocketmq.ProducerConfig{
+		ClientConfig: rocketmq.ClientConfig{GroupID: stp.groupID, NameServer: stp.nameSrv},
+	})
+	if err != nil {
+		fmt.Printf("new producer error:%s\n", err)
+		return
+	}
+
+	err = p.Start()
+	if err != nil {
+		fmt.Printf("start producer error:%s\n", err)
+		return
+	}
+	defer p.Shutdown()
+
+	stp.p = p
+	stp.stableTest.run()
+}
+
+func (stp *stableTestProducer) sendMessage() {
+	r, err := stp.p.SendMessageSync(&rocketmq.Message{Topic: stp.topic, Body: buildMsg(stp.bodySize)})
+	if err == nil {
+		fmt.Printf("send result:%+v\n", r)
+		return
+	}
+	fmt.Printf("send message error:%s", err)
+}
+
+type stableTestConsumer struct {
+	*stableTest
+	expression string
+
+	c       rocketmq.PullConsumer
+	offsets map[int]int64
+}
+
+func (stc *stableTestConsumer) buildFlags(name string) {
+	stc.stableTest.buildFlags(name)
+	stc.flags.StringVar(&stc.expression, "e", "*", "expression")
+}
+
+func (stc *stableTestConsumer) checkFlag() error {
+	err := stc.stableTest.checkFlag()
+	if err != nil {
+		return err
+	}
+
+	if stc.expression == "" {
+		return errors.New("empty expression")
+	}
+	return nil
+}
+
+func (stc *stableTestConsumer) usage() {
+	stc.flags.Usage()
+}
+
+func (stc *stableTestConsumer) run(args []string) {
+	err := stc.flags.Parse(args)
+	if err != nil {
+		fmt.Printf("parse args:%v, error:%s\n", args, err)
+		stc.usage()
+		return
+	}
+
+	err = stc.checkFlag()
+	if err != nil {
+		stc.usage()
+		fmt.Printf("%s\n", err)
+		return
+	}
+
+	c, err := rocketmq.NewPullConsumer(&rocketmq.PullConsumerConfig{
+		ClientConfig: rocketmq.ClientConfig{GroupID: stc.groupID, NameServer: stc.nameSrv},
+	})
+	if err != nil {
+		fmt.Printf("new pull consumer error:%s\n", err)
+		return
+	}
+
+	err = c.Start()
+	if err != nil {
+		fmt.Printf("start consumer error:%s\n", err)
+		return
+	}
+	defer c.Shutdown()
+
+	stc.c = c
+	stc.stableTest.run()
+}
+
+func (stc *stableTestConsumer) pullMessage() {
+	mqs := stc.c.FetchSubscriptionMessageQueues(stc.topic)
+
+	for _, mq := range mqs {
+		offset := stc.offsets[mq.ID]
+		pr := stc.c.Pull(mq, stc.expression, offset, 32)
+		fmt.Printf("pull from %s, offset:%d, count:%+v\n", mq.String(), offset, len(pr.Messages))
+
+		switch pr.Status {
+		case rocketmq.PullNoNewMsg:
+			stc.offsets[mq.ID] = 0 // pull from the begin
+		case rocketmq.PullFound:
+			fallthrough
+		case rocketmq.PullNoMatchedMsg:
+			fallthrough
+		case rocketmq.PullOffsetIllegal:
+			stc.offsets[mq.ID] = pr.NextBeginOffset
+		case rocketmq.PullBrokerTimeout:
+			fmt.Println("broker timeout occur")
+		}
+	}
+}
+
+func init() {
+	// producer
+	name := "stableTestProducer"
+	p := &stableTestProducer{stableTest: &stableTest{}}
+	p.buildFlags(name)
+	p.op = p.sendMessage
+	registerCommand(name, p)
+
+	// consumer
+	name = "stableTestConsumer"
+	c := &stableTestConsumer{stableTest: &stableTest{}, offsets: map[int]int64{}}
+	c.buildFlags(name)
+	c.op = c.pullMessage
+	registerCommand(name, c)
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services