You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/12/27 15:03:26 UTC

[GitHub] ShannonDing closed pull request #19: Optimizing Codebase Style

ShannonDing closed pull request #19: Optimizing Codebase Style
URL: https://github.com/apache/rocketmq-client-go/pull/19
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (forked) pull request
once it has been merged, the diff is reproduced below for the sake of
provenance:

diff --git a/cmd/producer.go b/cmd/producer.go
deleted file mode 100644
index b67f50d..0000000
--- a/cmd/producer.go
+++ /dev/null
@@ -1,58 +0,0 @@
-package main
-
-import (
-	"flag"
-	"fmt"
-	"github.com/apache/rocketmq-client-go/core"
-)
-
-var (
-	namesrvAddrs string
-	topic        string
-	body         string
-	groupID      string
-	keys         string
-)
-
-func init() {
-	flag.StringVar(&namesrvAddrs, "addr", "", "name server address")
-	flag.StringVar(&topic, "t", "", "topic name")
-	flag.StringVar(&groupID, "group", "", "producer group")
-	flag.StringVar(&body, "body", "", "message body")
-	flag.StringVar(&keys, "keys", "", "message keys")
-
-}
-
-func main() {
-	flag.Parse()
-	if namesrvAddrs == "" {
-		println("empty nameServer address")
-		return
-	}
-
-	if topic == "" {
-		println("empty topic")
-		return
-	}
-
-	if body == "" {
-		println("empty body")
-		return
-	}
-
-	if groupID == "" {
-		println("empty groupID")
-		return
-	}
-
-	cfg := &rocketmq.ProducerConfig{}
-	cfg.GroupID = groupID
-	cfg.NameServer = namesrvAddrs
-
-	producer, _ := rocketmq.NewProducer(cfg)
-	producer.Start()
-	defer producer.Shutdown()
-
-	result := producer.SendMessageSync(&rocketmq.Message{Topic: topic, Body: body, Keys: keys})
-	println(fmt.Sprintf("send message result: %s", result))
-}
diff --git a/core/api.go b/core/api.go
index 1ed6c82..8a6fdd3 100644
--- a/core/api.go
+++ b/core/api.go
@@ -32,7 +32,7 @@ type ClientConfig struct {
 	LogC             *LogConfig
 }
 
-func (config *ClientConfig) string() string {
+func (config *ClientConfig) String() string {
 	// For security, don't print Credentials.
 	str := ""
 	str = strJoin(str, "GroupId", config.GroupID)
@@ -62,7 +62,7 @@ type ProducerConfig struct {
 }
 
 func (config *ProducerConfig) String() string {
-	str := "ProducerConfig=[" + config.ClientConfig.string()
+	str := "ProducerConfig=[" + config.ClientConfig.String()
 
 	if config.SendMsgTimeout > 0 {
 		str = strJoin(str, "SendMsgTimeout", config.SendMsgTimeout)
@@ -82,13 +82,17 @@ func (config *ProducerConfig) String() string {
 type Producer interface {
 	baseAPI
 	// SendMessageSync send a message with sync
-	SendMessageSync(msg *Message) SendResult
+	SendMessageSync(msg *Message) (*SendResult, error)
 
 	// SendMessageOrderly send the message orderly
-	SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) SendResult
+	SendMessageOrderly(
+		msg *Message,
+		selector MessageQueueSelector,
+		arg interface{},
+		autoRetryTimes int) (*SendResult, error)
 
 	// SendMessageOneway send a message with oneway
-	SendMessageOneway(msg *Message)
+	SendMessageOneway(msg *Message) error
 }
 
 // NewPushConsumer create a new consumer with config.
@@ -124,7 +128,7 @@ type PushConsumerConfig struct {
 
 func (config *PushConsumerConfig) String() string {
 	// For security, don't print Credentials.
-	str := "PushConsumerConfig=[" + config.ClientConfig.string()
+	str := "PushConsumerConfig=[" + config.ClientConfig.String()
 
 	if config.ThreadCount > 0 {
 		str = strJoin(str, "ThreadCount", config.ThreadCount)
@@ -148,6 +152,15 @@ type PushConsumer interface {
 	Subscribe(topic, expression string, consumeFunc func(msg *MessageExt) ConsumeStatus) error
 }
 
+// PullConsumerConfig the configuration for the pull consumer
+type PullConsumerConfig struct {
+	ClientConfig
+}
+
+func (config *PullConsumerConfig) String() string {
+	return "PushConsumerConfig=[" + config.ClientConfig.String() + "]"
+}
+
 // PullConsumer consumer pulling the message
 type PullConsumer interface {
 	baseAPI
@@ -176,7 +189,7 @@ type SendResult struct {
 	Offset int64
 }
 
-func (result SendResult) String() string {
+func (result *SendResult) String() string {
 	return fmt.Sprintf("[status: %s, messageId: %s, offset: %d]", result.Status, result.MsgId, result.Offset)
 }
 
diff --git a/core/api_test.go b/core/api_test.go
index fc507f0..01f360b 100644
--- a/core/api_test.go
+++ b/core/api_test.go
@@ -37,8 +37,8 @@ func TestProducerConfig_String(t *testing.T) {
 	pConfig.CompressLevel = 4
 	pConfig.MaxMessageSize = 1024
 
-	expect := "ProducerConfig=[GroupId: testGroup, NameServer: localhost:9876, NameServerDomain: NameServerDomain, " +
-		"GroupId: testGroup, InstanceName: testProducer, " +
+	expect := "ProducerConfig=[GroupId: testGroup, NameServer: localhost:9876, NameServerDomain: domain1, " +
+		"GroupName: producerGroupName, InstanceName: testProducer, " +
 		"LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576 Level:Debug}, S" +
 		"endMsgTimeout: 30, CompressLevel: 4, MaxMessageSize: 1024, ]"
 	assert.Equal(t, expect, pConfig.String())
diff --git a/core/error.go b/core/error.go
new file mode 100644
index 0000000..b50b4b2
--- /dev/null
+++ b/core/error.go
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package rocketmq
+
+/*
+#include "rocketmq/CCommon.h"
+*/
+import "C"
+import "fmt"
+
+type rmqError int
+
+const (
+	NIL                        = rmqError(C.OK)
+	ErrNullPoint               = rmqError(C.NULL_POINTER)
+	ErrMallocFailed            = rmqError(C.MALLOC_FAILED)
+	ErrProducerStartFailed     = rmqError(C.PRODUCER_START_FAILED)
+	ErrSendSyncFailed          = rmqError(C.PRODUCER_SEND_SYNC_FAILED)
+	ErrSendOnewayFailed        = rmqError(C.PRODUCER_SEND_ONEWAY_FAILED)
+	ErrSendOrderlyFailed       = rmqError(C.PRODUCER_SEND_ORDERLY_FAILED)
+	ErrPushConsumerStartFailed = rmqError(C.PUSHCONSUMER_ERROR_CODE_START)
+	ErrPullConsumerStartFailed = rmqError(C.PULLCONSUMER_START_FAILED)
+	ErrFetchMQFailed           = rmqError(C.PULLCONSUMER_FETCH_MQ_FAILED)
+	ErrFetchMessageFailed      = rmqError(C.PULLCONSUMER_FETCH_MESSAGE_FAILED)
+)
+
+func (e rmqError) Error() string {
+	switch e {
+	case ErrNullPoint:
+		return "null point"
+	case ErrMallocFailed:
+		return "malloc memory failed"
+	case ErrProducerStartFailed:
+		return "start producer failed"
+	case ErrSendSyncFailed:
+		return "send message with sync failed"
+	case ErrSendOrderlyFailed:
+		return "send message with orderly failed"
+	case ErrSendOnewayFailed:
+		return "send message with oneway failed"
+	case ErrPushConsumerStartFailed:
+		return "start push-consumer failed"
+	case ErrPullConsumerStartFailed:
+		return "start pull-consumer failed"
+	case ErrFetchMQFailed:
+		return "fetch MessageQueue failed"
+	case ErrFetchMessageFailed:
+		return "fetch Message failed"
+	default:
+		return fmt.Sprintf("unknow error: %v", int(e))
+	}
+}
diff --git a/core/producer.go b/core/producer.go
index f402589..8ad3ea4 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -33,7 +33,6 @@ int queueSelectorCallback_cgo(int size, CMessage *msg, void *selectorKey) {
 import "C"
 import (
 	"errors"
-	"fmt"
 	log "github.com/sirupsen/logrus"
 	"unsafe"
 )
@@ -81,34 +80,34 @@ func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
 	C.free(unsafe.Pointer(cs))
 
 	if cproduer == nil {
-		return nil, errors.New("create Producer failed, please check cpp logs for details")
+		return nil, errors.New("create Producer failed")
 	}
 
-	var code int
+	var err rmqError
 	if config.NameServer != "" {
 		cs = C.CString(config.NameServer)
-		code = int(C.SetProducerNameServerAddress(cproduer, cs))
+		err = rmqError(C.SetProducerNameServerAddress(cproduer, cs))
 		C.free(unsafe.Pointer(cs))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set NameServerAddress error, code is: %d", code)
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.NameServerDomain != "" {
 		cs = C.CString(config.NameServerDomain)
-		code = int(C.SetProducerNameServerDomain(cproduer, cs))
+		err = rmqError(C.SetProducerNameServerDomain(cproduer, cs))
 		C.free(unsafe.Pointer(cs))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set NameServerDomain error, code is: %d", code)
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.InstanceName != "" {
 		cs = C.CString(config.InstanceName)
-		code = int(C.SetProducerInstanceName(cproduer, cs))
+		err = rmqError(C.SetProducerInstanceName(cproduer, cs))
 		C.free(unsafe.Pointer(cs))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set InstanceName error, code is: %d", code)
+		if err != NIL {
+			return nil, err
 		}
 	}
 
@@ -116,53 +115,53 @@ func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
 		ak := C.CString(config.Credentials.AccessKey)
 		sk := C.CString(config.Credentials.SecretKey)
 		ch := C.CString(config.Credentials.Channel)
-		code = int(C.SetProducerSessionCredentials(cproduer, ak, sk, ch))
+		err = rmqError(C.SetProducerSessionCredentials(cproduer, ak, sk, ch))
 
 		C.free(unsafe.Pointer(ak))
 		C.free(unsafe.Pointer(sk))
 		C.free(unsafe.Pointer(ch))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set Credentials error, code is: %d", code)
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.LogC != nil {
 		cs = C.CString(config.LogC.Path)
-		code = int(C.SetProducerLogPath(cproduer, cs))
+		err = rmqError(C.SetProducerLogPath(cproduer, cs))
 		C.free(unsafe.Pointer(cs))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set LogPath error, code is: %d", code)
+		if err != NIL {
+			return nil, err
 		}
 
-		code = int(C.SetProducerLogFileNumAndSize(cproduer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set FileNumAndSize error, code is: %d", code)
+		err = rmqError(C.SetProducerLogFileNumAndSize(cproduer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+		if err != NIL {
+			return nil, err
 		}
 
-		code = int(C.SetProducerLogLevel(cproduer, C.CLogLevel(config.LogC.Level)))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set LogLevel error, code is: %d", code)
+		err = rmqError(C.SetProducerLogLevel(cproduer, C.CLogLevel(config.LogC.Level)))
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.SendMsgTimeout > 0 {
-		code = int(C.SetProducerSendMsgTimeout(cproduer, C.int(config.SendMsgTimeout)))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set SendMsgTimeout error, code is: %d", code)
+		err = rmqError(C.SetProducerSendMsgTimeout(cproduer, C.int(config.SendMsgTimeout)))
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.CompressLevel > 0 {
-		code = int(C.SetProducerCompressLevel(cproduer, C.int(config.CompressLevel)))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set CompressLevel error, code is: %d", code)
+		err = rmqError(C.SetProducerCompressLevel(cproduer, C.int(config.CompressLevel)))
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.MaxMessageSize > 0 {
-		code = int(C.SetProducerMaxMessageSize(cproduer, C.int(config.MaxMessageSize)))
-		if code != 0 {
-			return nil, fmt.Errorf("producer Set MaxMessageSize error, code is: %d", code)
+		err = rmqError(C.SetProducerMaxMessageSize(cproduer, C.int(config.MaxMessageSize)))
+		if err != NIL {
+			return nil, err
 		}
 	}
 
@@ -181,77 +180,84 @@ func (p *defaultProducer) String() string {
 
 // Start the producer.
 func (p *defaultProducer) Start() error {
-	code := int(C.StartProducer(p.cproduer))
-	if code != 0 {
-		return fmt.Errorf("start producer error, error code is: %d", code)
+	err := rmqError(C.StartProducer(p.cproduer))
+	if err != NIL {
+		return err
 	}
 	return nil
 }
 
 // Shutdown the producer.
 func (p *defaultProducer) Shutdown() error {
-	code := int(C.ShutdownProducer(p.cproduer))
+	err := rmqError(C.ShutdownProducer(p.cproduer))
 
-	if code != 0 {
-		log.Warnf("shutdown producer error, error code is: %d", code)
+	if err != NIL {
+		return err
 	}
 
-	code = int(int(C.DestroyProducer(p.cproduer)))
-	if code != 0 {
-		log.Warnf("destroy producer error, error code is: %d", code)
+	err = rmqError(int(C.DestroyProducer(p.cproduer)))
+	if err != NIL {
+		return err
 	}
 
-	return nil
+	return err
 }
 
-func (p *defaultProducer) SendMessageSync(msg *Message) SendResult {
+func (p *defaultProducer) SendMessageSync(msg *Message) (*SendResult, error) {
 	cmsg := goMsgToC(msg)
 	defer C.DestroyMessage(cmsg)
 
 	var sr C.struct__SendResult_
-	code := int(C.SendMessageSync(p.cproduer, cmsg, &sr))
+	err := rmqError(C.SendMessageSync(p.cproduer, cmsg, &sr))
 
-	if code != 0 {
-		log.Warnf("send message error, error code is: %d", code)
+	if err != NIL {
+		log.Warnf("send message error, error is: %s", err.Error())
+		return nil, err
 	}
 
-	result := SendResult{}
+	result := &SendResult{}
 	result.Status = SendStatus(sr.sendStatus)
 	result.MsgId = C.GoString(&sr.msgId[0])
 	result.Offset = int64(sr.offset)
-	return result
+	return result, nil
 }
 
-func (p *defaultProducer) SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) SendResult {
+func (p *defaultProducer) SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) (*SendResult, error) {
 	cmsg := goMsgToC(msg)
+	defer C.DestroyMessage(cmsg)
 	key := selectors.put(&messageQueueSelectorWrapper{selector: selector, m: msg, arg: arg})
 
 	var sr C.struct__SendResult_
-	C.SendMessageOrderly(
+	err := rmqError(C.SendMessageOrderly(
 		p.cproduer,
 		cmsg,
 		(C.QueueSelectorCallback)(unsafe.Pointer(C.queueSelectorCallback_cgo)),
 		unsafe.Pointer(&key),
 		C.int(autoRetryTimes),
-		&sr,
-	)
-	C.DestroyMessage(cmsg)
-
-	return SendResult{
+		&sr))
+	
+	if err != NIL {
+		log.Warnf("send message orderly error, error is: %s", err.Error())
+		return nil, err
+	}
+	
+	return &SendResult{
 		Status: SendStatus(sr.sendStatus),
 		MsgId:  C.GoString(&sr.msgId[0]),
 		Offset: int64(sr.offset),
-	}
+	}, nil
 }
 
-func (p *defaultProducer) SendMessageOneway(msg *Message) {
+func (p *defaultProducer) SendMessageOneway(msg *Message) error {
 	cmsg := goMsgToC(msg)
 	defer C.DestroyMessage(cmsg)
 
-	code := int(C.SendMessageOneway(p.cproduer, cmsg))
-	if code != 0 {
-		log.Warnf("send message with oneway error, error code is: %d", code)
-	} else {
-		log.Debugf("Send Message: %s with oneway success.", msg.String())
+	err := rmqError(C.SendMessageOneway(p.cproduer, cmsg))
+	if err != NIL {
+		log.Warnf("send message with oneway error, error is: %s", err.Error())
+		return err
 	}
+	
+	log.Debugf("Send Message: %s with oneway success.", msg.String())
+	return nil
 }
diff --git a/core/pull_consumer.go b/core/pull_consumer.go
index fdf8d76..74e4a0b 100644
--- a/core/pull_consumer.go
+++ b/core/pull_consumer.go
@@ -63,19 +63,14 @@ func (ps PullStatus) String() string {
 	}
 }
 
-// PullConsumerConfig the configuration for the pull consumer
-type PullConsumerConfig struct {
-	ClientConfig
-}
-
-// DefaultPullConsumer default consumer pulling the message
-type DefaultPullConsumer struct {
+// defaultPullConsumer default consumer pulling the message
+type defaultPullConsumer struct {
 	PullConsumerConfig
 	cconsumer *C.struct_CPullConsumer
 	funcsMap  sync.Map
 }
 
-func (c *DefaultPullConsumer) String() string {
+func (c *defaultPullConsumer) String() string {
 	topics := ""
 	c.funcsMap.Range(func(key, value interface{}) bool {
 		topics += key.(string) + ", "
@@ -84,75 +79,105 @@ func (c *DefaultPullConsumer) String() string {
 	return fmt.Sprintf("[%+v, subcribed topics: [%s]]", c.PullConsumerConfig, topics)
 }
 
-// NewPullConsumer creates one pull consumer
-func NewPullConsumer(conf *PullConsumerConfig) (*DefaultPullConsumer, error) {
-	if conf == nil {
+// NewPullConsumer creates a pull consumer
+func NewPullConsumer(config *PullConsumerConfig) (PullConsumer, error) {
+	if config == nil {
 		return nil, errors.New("config is nil")
 	}
+	if config.GroupID == "" {
+		return nil, errors.New("GroupId is empty")
+	}
 
-	cs := C.CString(conf.GroupID)
-	cconsumer := C.CreatePullConsumer(cs)
-	C.free(unsafe.Pointer(cs))
+	if config.NameServer == "" && config.NameServerDomain == "" {
+		return nil, errors.New("NameServer and NameServerDomain is empty")
+	}
 
-	cs = C.CString(conf.NameServer)
-	C.SetPullConsumerNameServerAddress(cconsumer, cs)
+	cs := C.CString(config.GroupID)
+	cconsumer := C.CreatePullConsumer(cs)
 	C.free(unsafe.Pointer(cs))
+	if cconsumer == nil {
+		return nil, errors.New("create PullConsumer failed")
+	}
 
-	log := conf.LogC
-	if log != nil {
-		cs = C.CString(log.Path)
-		if C.SetPullConsumerLogPath(cconsumer, cs) != 0 {
-			return nil, errors.New("new pull consumer error:set log path failed")
-		}
+	var err rmqError
+	if config.NameServer != "" {
+		cs = C.CString(config.NameServer)
+		err = rmqError(C.SetPullConsumerNameServerAddress(cconsumer, cs))
 		C.free(unsafe.Pointer(cs))
-
-		if C.SetPullConsumerLogFileNumAndSize(cconsumer, C.int(log.FileNum), C.long(log.FileSize)) != 0 {
-			return nil, errors.New("new pull consumer error:set log file num and size failed")
-		}
-		if C.SetPullConsumerLogLevel(cconsumer, C.CLogLevel(log.Level)) != 0 {
-			return nil, errors.New("new pull consumer error:set log level failed")
+		if err != NIL {
+			return nil, err
 		}
 	}
 
-	if conf.Credentials != nil {
-		ak := C.CString(conf.Credentials.AccessKey)
-		sk := C.CString(conf.Credentials.SecretKey)
-		ch := C.CString(conf.Credentials.Channel)
-		C.SetPullConsumerSessionCredentials(cconsumer, ak, sk, ch)
+	if config.NameServerDomain != "" {
+		cs = C.CString(config.NameServerDomain)
+		err = rmqError(C.SetPullConsumerNameServerDomain(cconsumer, cs))
+		C.free(unsafe.Pointer(cs))
+		if err != NIL {
+			return nil, err
+		}
+	}
 
+	if config.Credentials != nil {
+		ak := C.CString(config.Credentials.AccessKey)
+		sk := C.CString(config.Credentials.SecretKey)
+		ch := C.CString(config.Credentials.Channel)
+		err = rmqError(C.SetPullConsumerSessionCredentials(cconsumer, ak, sk, ch))
 		C.free(unsafe.Pointer(ak))
 		C.free(unsafe.Pointer(sk))
 		C.free(unsafe.Pointer(ch))
+		if err != NIL {
+			return nil, err
+		}
+	}
+
+	if config.LogC != nil {
+		cs = C.CString(config.LogC.Path)
+		err = rmqError(C.SetPullConsumerLogPath(cconsumer, cs))
+		C.free(unsafe.Pointer(cs))
+		if err != NIL {
+			return nil, err
+		}
+
+		err = rmqError(C.SetPullConsumerLogFileNumAndSize(cconsumer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+		if err != NIL {
+			return nil, err
+		}
+
+		err = rmqError(C.SetPullConsumerLogLevel(cconsumer, C.CLogLevel(config.LogC.Level)))
+		if err != NIL {
+			return nil, err
+		}
 	}
 
-	return &DefaultPullConsumer{PullConsumerConfig: *conf, cconsumer: cconsumer}, nil
+	return &defaultPullConsumer{PullConsumerConfig: *config, cconsumer: cconsumer}, nil
 }
 
-// Start starts the pulling conumser
-func (c *DefaultPullConsumer) Start() error {
-	r := C.StartPullConsumer(c.cconsumer)
-	if r != 0 {
-		return fmt.Errorf("start failed, code:%d", int(r))
+// Start starts the pulling consumer
+func (c *defaultPullConsumer) Start() error {
+	err := rmqError(C.StartPullConsumer(c.cconsumer))
+	if err != NIL {
+		return err
 	}
 	return nil
 }
 
 // Shutdown shutdown the pulling consumer
-func (c *DefaultPullConsumer) Shutdown() error {
-	r := C.ShutdownPullConsumer(c.cconsumer)
-	if r != 0 {
-		return fmt.Errorf("shutdown failed, code:%d", int(r))
+func (c *defaultPullConsumer) Shutdown() error {
+	err := rmqError(C.ShutdownPullConsumer(c.cconsumer))
+	if err != NIL {
+		return err
 	}
 
-	r = C.DestroyPullConsumer(c.cconsumer)
-	if r != 0 {
-		return fmt.Errorf("destory failed, code:%d", int(r))
+	err = rmqError(C.DestroyPullConsumer(c.cconsumer))
+	if err != NIL {
+		return err
 	}
 	return nil
 }
 
 // FetchSubscriptionMessageQueues returns the topic's consume queue
-func (c *DefaultPullConsumer) FetchSubscriptionMessageQueues(topic string) []MessageQueue {
+func (c *defaultPullConsumer) FetchSubscriptionMessageQueues(topic string) []MessageQueue {
 	var (
 		q    *C.struct__CMessageQueue_
 		size C.int
@@ -191,7 +216,7 @@ func (pr *PullResult) String() string {
 }
 
 // Pull pulling the message from the specified message queue
-func (c *DefaultPullConsumer) Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult {
+func (c *defaultPullConsumer) Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult {
 	cmq := C.struct__CMessageQueue_{
 		queueId: C.int(mq.ID),
 	}
diff --git a/core/push_consumer.go b/core/push_consumer.go
index c39dc09..e7ee1d7 100644
--- a/core/push_consumer.go
+++ b/core/push_consumer.go
@@ -92,32 +92,31 @@ func newPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
 		return nil, errors.New("Create PushConsumer failed")
 	}
 
-	var code int
+	var err rmqError
 	if config.NameServer != "" {
 		cs = C.CString(config.NameServer)
-		code = int(C.SetPushConsumerNameServerAddress(cconsumer, cs))
+		err = rmqError(C.SetPushConsumerNameServerAddress(cconsumer, cs))
 		C.free(unsafe.Pointer(cs))
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set NameServerAddress error, code is: %d", code)
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.NameServerDomain != "" {
 		cs = C.CString(config.NameServerDomain)
-		code = int(C.SetPushConsumerNameServerDomain(cconsumer, cs))
+		err = rmqError(C.SetPushConsumerNameServerDomain(cconsumer, cs))
 		C.free(unsafe.Pointer(cs))
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set NameServerDomain error, code is: %d", code)
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.InstanceName != "" {
 		cs = C.CString(config.InstanceName)
-		code = int(C.SetPushConsumerInstanceName(cconsumer, cs))
+		err = rmqError(C.SetPushConsumerInstanceName(cconsumer, cs))
 		C.free(unsafe.Pointer(cs))
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set InstanceName error, code is: %d, "+
-				"please check cpp logs for details", code)
+		if err != NIL {
+			return nil, err
 		}
 	}
 
@@ -125,45 +124,45 @@ func newPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
 		ak := C.CString(config.Credentials.AccessKey)
 		sk := C.CString(config.Credentials.SecretKey)
 		ch := C.CString(config.Credentials.Channel)
-		code = int(C.SetPushConsumerSessionCredentials(cconsumer, ak, sk, ch))
+		err = rmqError(C.SetPushConsumerSessionCredentials(cconsumer, ak, sk, ch))
 		C.free(unsafe.Pointer(ak))
 		C.free(unsafe.Pointer(sk))
 		C.free(unsafe.Pointer(ch))
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set Credentials error, code is: %d", int(code))
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.LogC != nil {
 		cs = C.CString(config.LogC.Path)
-		code = int(C.SetPushConsumerLogPath(cconsumer, cs))
+		err = rmqError(C.SetPushConsumerLogPath(cconsumer, cs))
 		C.free(unsafe.Pointer(cs))
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set LogPath error, code is: %d", code)
+		if err != NIL {
+			return nil, err
 		}
 
-		code = int(C.SetPushConsumerLogFileNumAndSize(cconsumer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set FileNumAndSize error, code is: %d", code)
+		err = rmqError(C.SetPushConsumerLogFileNumAndSize(cconsumer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+		if err != NIL {
+			return nil, err
 		}
 
-		code = int(C.SetPushConsumerLogLevel(cconsumer, C.CLogLevel(config.LogC.Level)))
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set LogLevel error, code is: %d", code)
+		err = rmqError(C.SetPushConsumerLogLevel(cconsumer, C.CLogLevel(config.LogC.Level)))
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.ThreadCount > 0 {
-		code = int(C.SetPushConsumerThreadCount(cconsumer, C.int(config.ThreadCount)))
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set ThreadCount error, code is: %d", int(code))
+		err = rmqError(C.SetPushConsumerThreadCount(cconsumer, C.int(config.ThreadCount)))
+		if err != NIL {
+			return nil, err
 		}
 	}
 
 	if config.MessageBatchMaxSize > 0 {
-		code = int(C.SetPushConsumerMessageBatchMaxSize(cconsumer, C.int(config.MessageBatchMaxSize)))
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set MessageBatchMaxSize error, code is: %d", int(code))
+		err = rmqError(C.SetPushConsumerMessageBatchMaxSize(cconsumer, C.int(config.MessageBatchMaxSize)))
+		if err != NIL {
+			return nil, err
 		}
 	}
 
@@ -175,18 +174,18 @@ func newPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
 		case Clustering:
 			mode = C.CLUSTERING
 		}
-		code = int(C.SetPushConsumerMessageModel(cconsumer, mode))
+		err = rmqError(C.SetPushConsumerMessageModel(cconsumer, mode))
 
-		if code != 0 {
-			return nil, fmt.Errorf("PushConsumer Set ConsumerMessageModel error, code is: %d", int(code))
+		if err != NIL {
+			return nil, err
 		}
 
 	}
 
-	code = int(C.RegisterMessageCallback(cconsumer, (C.MessageCallBack)(unsafe.Pointer(C.callback_cgo))))
+	err = rmqError(C.RegisterMessageCallback(cconsumer, (C.MessageCallBack)(unsafe.Pointer(C.callback_cgo))))
 
-	if code != 0 {
-		return nil, fmt.Errorf("PushConsumer RegisterMessageCallback error, code is: %d", int(code))
+	if err != NIL {
+		return nil, err
 	}
 
 	consumer.cconsumer = cconsumer
@@ -195,23 +194,23 @@ func newPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
 }
 
 func (c *defaultPushConsumer) Start() error {
-	code := C.StartPushConsumer(c.cconsumer)
-	if code != 0 {
-		return fmt.Errorf("start PushConsumer error, code is: %d", int(code))
+	err := rmqError(C.StartPushConsumer(c.cconsumer))
+	if err != NIL {
+		return err
 	}
 	return nil
 }
 
 func (c *defaultPushConsumer) Shutdown() error {
-	code := C.ShutdownPushConsumer(c.cconsumer)
+	err := rmqError(C.ShutdownPushConsumer(c.cconsumer))
 
-	if code != 0 {
-		log.Warnf("Shutdown PushConsumer error, code is: %d, please check cpp logs for details", code)
+	if err != NIL {
+		return err
 	}
 
-	C.DestroyPushConsumer(c.cconsumer)
-	if code != 0 {
-		log.Warnf("Destroy PushConsumer error, code is: %d, please check cpp logs for details", code)
+	err = rmqError(C.DestroyPushConsumer(c.cconsumer))
+	if err != NIL {
+		return err
 	}
 	return nil
 }
@@ -221,9 +220,9 @@ func (c *defaultPushConsumer) Subscribe(topic, expression string, consumeFunc fu
 	if exist {
 		return nil
 	}
-	code := C.Subscribe(c.cconsumer, C.CString(topic), C.CString(expression))
-	if code != 0 {
-		return fmt.Errorf("subscribe topic: %s failed, error code is: %d", topic, int(code))
+	err := rmqError(C.Subscribe(c.cconsumer, C.CString(topic), C.CString(expression)))
+	if err != NIL {
+		return err
 	}
 	c.funcsMap.Store(topic, consumeFunc)
 	log.Infof("subscribe topic[%s] with expression[%s] successfully.", topic, expression)
diff --git a/examples/main.go b/examples/main.go
new file mode 100644
index 0000000..72a2a68
--- /dev/null
+++ b/examples/main.go
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package main
+
+import (
+	"github.com/apache/rocketmq-client-go/core"
+	"gopkg.in/alecthomas/kingpin.v2"
+	"os"
+)
+
+var (
+	rmq     = kingpin.New("rocketmq", "RocketMQ cmd tools")
+	namesrv = rmq.Flag("namesrv", "NameServer address.").Default("localhost:9876").Short('n').String()
+	topic   = rmq.Flag("topic", "topic name.").Short('t').Required().String()
+	gid     = rmq.Flag("groupId", "group Id").Short('g').Default("testGroup").String()
+	amount  = rmq.Flag("amount", "how many message to produce or consume").Default("64").Short('a').Int()
+
+	produce     = rmq.Command("produce", "send messages to RocketMQ")
+	body        = produce.Flag("body", "message body").Short('b').Required().String()
+	workerCount = produce.Flag("workerCount", "works of send message with orderly").Default("1").Short('w').Int()
+	orderly     = produce.Flag("orderly", "send msg orderly").Short('o').Bool()
+
+	consume = rmq.Command("consume", "consumes message from RocketMQ")
+)
+
+func main() {
+	switch kingpin.MustParse(rmq.Parse(os.Args[1:])) {
+	case produce.FullCommand():
+		pConfig := &rocketmq.ProducerConfig{ClientConfig: rocketmq.ClientConfig{
+			GroupID:    *gid,
+			NameServer: *namesrv,
+			LogC: &rocketmq.LogConfig{
+				Path:     "example",
+				FileSize: 64 * 1 << 10,
+				FileNum:  1,
+				Level:    rocketmq.LogLevelDebug,
+			},
+		}}
+		if *orderly {
+			sendMessageOrderly(pConfig)
+		} else {
+			sendMessage(pConfig)
+		}
+	case consume.FullCommand():
+		cConfig := &rocketmq.PushConsumerConfig{ClientConfig: rocketmq.ClientConfig{
+			GroupID:    *gid,
+			NameServer: *namesrv,
+			LogC: &rocketmq.LogConfig{
+				Path:     "example",
+				FileSize: 64 * 1 << 10,
+				FileNum:  1,
+				Level:    rocketmq.LogLevelInfo,
+			},
+		}}
+
+		ConsumeWithPush(cConfig)
+	}
+}
diff --git a/examples/producer.go b/examples/producer.go
index 33eab07..01e8105 100644
--- a/examples/producer.go
+++ b/examples/producer.go
@@ -22,24 +22,29 @@ import (
 	"github.com/apache/rocketmq-client-go/core"
 )
 
-func main() {
-	cfg := &rocketmq.ProducerConfig{}
-	cfg.GroupID = "testGroup"
-	cfg.NameServer = "localhost:9876"
-	producer, err := rocketmq.NewProducer(cfg)
+func sendMessage(config *rocketmq.ProducerConfig) {
+	producer, err := rocketmq.NewProducer(config)
+
 	if err != nil {
 		fmt.Println("create Producer failed, error:", err)
 		return
 	}
 
-	producer.Start()
+	err = producer.Start()
+	if err != nil {
+		fmt.Println("start producer error", err)
+		return
+	}
 	defer producer.Shutdown()
 
 	fmt.Printf("Producer: %s started... \n", producer)
-	for i := 0; i < 100; i++ {
-		msg := fmt.Sprintf("Hello RocketMQ-%d", i)
-		result := producer.SendMessageSync(&rocketmq.Message{Topic: "test", Body: msg})
-		fmt.Println(fmt.Sprintf("send message: %s result: %s", msg, result))
+	for i := 0; i < *amount; i++ {
+		msg := fmt.Sprintf("%s-%d", *body, i)
+		result, err := producer.SendMessageSync(&rocketmq.Message{Topic: "test", Body: msg})
+		if err != nil {
+			fmt.Println("Error:", err)
+		}
+		fmt.Printf("send message: %s result: %s\n", msg, result)
 	}
 	fmt.Println("shutdown producer.")
 }
diff --git a/examples/orderproducer/producer.go b/examples/producer_orderly.go
similarity index 50%
rename from examples/orderproducer/producer.go
rename to examples/producer_orderly.go
index f3d70c7..9943f5b 100644
--- a/examples/orderproducer/producer.go
+++ b/examples/producer_orderly.go
@@ -17,12 +17,11 @@
 package main
 
 import (
-	"flag"
 	"fmt"
 	"sync"
 	"sync/atomic"
 
-	rocketmq "github.com/apache/rocketmq-client-go/core"
+	"github.com/apache/rocketmq-client-go/core"
 )
 
 type queueSelectorByOrderID struct{}
@@ -31,79 +30,26 @@ func (s queueSelectorByOrderID) Select(size int, m *rocketmq.Message, arg interf
 	return arg.(int) % size
 }
 
-var (
-	namesrvAddrs string
-	topic        string
-	body         string
-	groupID      string
-	msgCount     int64
-	workerCount  int
-)
-
-func init() {
-	flag.StringVar(&namesrvAddrs, "n", "", "name server address")
-	flag.StringVar(&topic, "t", "", "topic")
-	flag.StringVar(&groupID, "g", "", "group")
-	flag.StringVar(&body, "d", "", "body")
-	flag.Int64Var(&msgCount, "m", 0, "message count")
-	flag.IntVar(&workerCount, "w", 0, "worker count")
-}
-
 type worker struct {
 	p            rocketmq.Producer
-	leftMsgCount *int64
+	leftMsgCount int64
 }
 
 func (w *worker) run() {
 	selector := queueSelectorByOrderID{}
-	for atomic.AddInt64(w.leftMsgCount, -1) >= 0 {
-		r := w.p.SendMessageOrderly(
-			&rocketmq.Message{Topic: topic, Body: body}, selector, 7 /*orderID*/, 3,
+	for atomic.AddInt64(&w.leftMsgCount, -1) >= 0 {
+		r, err := w.p.SendMessageOrderly(
+			&rocketmq.Message{Topic: *topic, Body: *body}, selector, 7 /*orderID*/, 3,
 		)
-		fmt.Printf("send result:%+v\n", r)
+		if err != nil {
+			println("Send Orderly Error:", err)
+		}
+		fmt.Printf("send orderly result:%+v\n", r)
 	}
 }
 
-// example:
-// ./producer -n "localhost:9876" -t local_test -g local_test -d data -m 100 -w 10
-func main() {
-	flag.Parse()
-
-	if namesrvAddrs == "" {
-		println("empty namesrv address")
-		return
-	}
-
-	if topic == "" {
-		println("empty topic")
-		return
-	}
-
-	if body == "" {
-		println("empty body")
-		return
-	}
-
-	if groupID == "" {
-		println("empty groupID")
-		return
-	}
-
-	if msgCount == 0 {
-		println("zero message count")
-		return
-	}
-
-	if workerCount == 0 {
-		println("zero worker count")
-		return
-	}
-
-	cfg := &rocketmq.ProducerConfig{}
-	cfg.GroupID = groupID
-	cfg.NameServer = namesrvAddrs
-
-	producer, err := rocketmq.NewProducer(cfg)
+func sendMessageOrderly(config *rocketmq.ProducerConfig) {
+	producer, err := rocketmq.NewProducer(config)
 	if err != nil {
 		fmt.Println("create Producer failed, error:", err)
 		return
@@ -113,12 +59,12 @@ func main() {
 	defer producer.Shutdown()
 
 	wg := sync.WaitGroup{}
-	wg.Add(workerCount)
+	wg.Add(*workerCount)
 
-	workers := make([]worker, workerCount)
+	workers := make([]worker, *workerCount)
 	for i := range workers {
 		workers[i].p = producer
-		workers[i].leftMsgCount = &msgCount
+		workers[i].leftMsgCount = (int64)(*amount)
 	}
 
 	for i := range workers {
diff --git a/examples/pullconsumer/consumer.go b/examples/pull_consumer.go
similarity index 69%
rename from examples/pullconsumer/consumer.go
rename to examples/pull_consumer.go
index d63729e..1b209c0 100644
--- a/examples/pullconsumer/consumer.go
+++ b/examples/pull_consumer.go
@@ -18,55 +18,15 @@
 package main
 
 import (
-	"flag"
 	"fmt"
 	"time"
 
-	rocketmq "github.com/apache/rocketmq-client-go/core"
+	"github.com/apache/rocketmq-client-go/core"
 )
 
-var (
-	namesrvAddrs string
-	topic        string
-	body         string
-	groupID      string
-	msgCount     int64
-	workerCount  int
-)
-
-func init() {
-	flag.StringVar(&namesrvAddrs, "n", "", "name server address")
-	flag.StringVar(&topic, "t", "", "topic")
-	flag.StringVar(&groupID, "g", "", "group")
-}
-
-// ./consumer -n "localhost:9876" -t test -g local_test
-func main() {
-	flag.Parse()
-
-	if namesrvAddrs == "" {
-		fmt.Println("empty namesrv")
-		return
-	}
-
-	if topic == "" {
-		fmt.Println("empty topic")
-		return
-	}
-
-	if groupID == "" {
-		fmt.Println("empty groupID")
-		return
-	}
-
-	cfg := &rocketmq.PullConsumerConfig{}
-	cfg.GroupID = groupID
-	cfg.NameServer = namesrvAddrs
-	cfg.LogC = &rocketmq.LogConfig{
-		Path: "example",
-	}
+func ConsumeWithPull(config *rocketmq.PullConsumerConfig, topic string) {
 
-	consumer, err := rocketmq.NewPullConsumer(cfg)
+	consumer, err := rocketmq.NewPullConsumer(config)
 	if err != nil {
 		fmt.Printf("new pull consumer error:%s\n", err)
 		return
diff --git a/examples/push_consumer.go b/examples/push_consumer.go
index d17559c..723e32f 100644
--- a/examples/push_consumer.go
+++ b/examples/push_consumer.go
@@ -20,31 +20,40 @@ package main
 import (
 	"fmt"
 	"github.com/apache/rocketmq-client-go/core"
-	"time"
+	"sync/atomic"
 )
 
-func main() {
-	fmt.Println("Start Receiving Messages...")
-	cfg := &rocketmq.PushConsumerConfig{
-		ThreadCount:         2,
-		MessageBatchMaxSize: 16,
-	}
-	cfg.GroupID = "testGroupId"
-	cfg.NameServer = "localhost:9876"
-	consumer, err := rocketmq.NewPushConsumer(cfg)
+func ConsumeWithPush(config *rocketmq.PushConsumerConfig) {
+
+	consumer, err := rocketmq.NewPushConsumer(config)
 	if err != nil {
-		fmt.Println("create Consumer failed, error:", err)
+		println("create Consumer failed, error:", err)
 		return
 	}
 
+	ch := make(chan interface{})
+	var count = (int64)(*amount)
 	// MUST subscribe topic before consumer started.
 	consumer.Subscribe("test", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
 		fmt.Printf("A message received: \"%s\" \n", msg.Body)
+		if atomic.AddInt64(&count, -1) <= 0 {
+			ch <- "quit"
+		}
 		return rocketmq.ConsumeSuccess
 	})
 
-	consumer.Start()
-	defer consumer.Shutdown()
+	err = consumer.Start()
+	if err != nil {
+		println("consumer start failed,", err)
+		return
+	}
+
 	fmt.Printf("consumer: %s started...\n", consumer)
-	time.Sleep(10 * time.Minute)
+	<-ch
+	err = consumer.Shutdown()
+	if err != nil {
+		println("consumer shutdown failed")
+		return
+	}
+	println("consumer has shutdown.")
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services