You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/12/17 07:53:54 UTC

[GitHub] vongosling closed pull request #12: More Configurable fields and mores

vongosling closed pull request #12: More Configurable fields and mores
URL: https://github.com/apache/rocketmq-client-go/pull/12
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/cmd/producer.go b/cmd/producer.go
new file mode 100644
index 0000000..b67f50d
--- /dev/null
+++ b/cmd/producer.go
@@ -0,0 +1,58 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"github.com/apache/rocketmq-client-go/core"
+)
+
+var (
+	namesrvAddrs string
+	topic        string
+	body         string
+	groupID      string
+	keys         string
+)
+
+func init() {
+	flag.StringVar(&namesrvAddrs, "addr", "", "name server address")
+	flag.StringVar(&topic, "t", "", "topic name")
+	flag.StringVar(&groupID, "group", "", "producer group")
+	flag.StringVar(&body, "body", "", "message body")
+	flag.StringVar(&keys, "keys", "", "message keys")
+
+}
+
+func main() {
+	flag.Parse()
+	if namesrvAddrs == "" {
+		println("empty nameServer address")
+		return
+	}
+
+	if topic == "" {
+		println("empty topic")
+		return
+	}
+
+	if body == "" {
+		println("empty body")
+		return
+	}
+
+	if groupID == "" {
+		println("empty groupID")
+		return
+	}
+
+	cfg := &rocketmq.ProducerConfig{}
+	cfg.GroupID = groupID
+	cfg.NameServer = namesrvAddrs
+
+	producer, _ := rocketmq.NewProducer(cfg)
+	producer.Start()
+	defer producer.Shutdown()
+
+	result := producer.SendMessageSync(&rocketmq.Message{Topic: topic, Body: body, Keys: keys})
+	println(fmt.Sprintf("send message result: %s", result))
+}
diff --git a/core/api.go b/core/api.go
index bf38a78..8ad6af0 100644
--- a/core/api.go
+++ b/core/api.go
@@ -22,21 +22,35 @@ func Version() (version string) {
 	return GetVersion()
 }
 
+type clientConfig struct {
+	GroupID          string
+	NameServer       string
+	NameServerDomain string
+	GroupName        string
+	InstanceName     string
+	Credentials      *SessionCredentials
+	LogC             *LogConfig
+}
+
 // NewProducer create a new producer with config
-func NewProducer(config *ProducerConfig) Producer {
+func NewProducer(config *ProducerConfig) (Producer, error) {
 	return newDefaultProducer(config)
 }
 
 // ProducerConfig define a producer
 type ProducerConfig struct {
-	GroupID     string
-	NameServer  string
-	Credentials *SessionCredentials
+	clientConfig
+	SendMsgTimeout int
+	CompressLevel  int
+	MaxMessageSize int
 }
 
 func (config *ProducerConfig) String() string {
 	// For security, don't print Credentials default.
-	return fmt.Sprintf("[groupId: %s, nameServer: %s]", config.NameServer, config.GroupID)
+	return fmt.Sprintf("[GroupID: %s, NameServer: %s, NameServerDomain: %s, InstanceName: %s, NameServer: %s, "+
+		"SendMsgTimeout: %d, CompressLevel: %d, MaxMessageSize: %d, ]", config.NameServer, config.GroupID,
+		config.NameServerDomain, config.GroupName, config.InstanceName, config.SendMsgTimeout, config.CompressLevel,
+		config.MaxMessageSize)
 }
 
 type Producer interface {
@@ -52,27 +66,27 @@ type Producer interface {
 }
 
 // NewPushConsumer create a new consumer with config.
-func NewPushConsumer(config *ConsumerConfig) (PushConsumer, error) {
+func NewPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
 	return newPushConsumer(config)
 }
 
-// ConsumerConfig define a new consumer.
-type ConsumerConfig struct {
-	GroupID             string
-	NameServer          string
-	ConsumerThreadCount int
+// PushConsumerConfig define a new consumer.
+type PushConsumerConfig struct {
+	clientConfig
+	ThreadCount         int
 	MessageBatchMaxSize int
-	//ConsumerInstanceName int
-	Credentials *SessionCredentials
+	Model               MessageModel
 }
 
-func (config *ConsumerConfig) String() string {
-	return fmt.Sprintf("[groupId: %s, nameServer: %s, consumerThreadCount: %d, messageBatchMaxSize: %d]",
-		config.GroupID, config.NameServer, config.ConsumerThreadCount, config.MessageBatchMaxSize)
+func (config *PushConsumerConfig) String() string {
+	return fmt.Sprintf("[GroupID: %s, NameServer: %s, NameServerDomain: %s, InstanceName: %s, "+
+		"ThreadCount: %d, MessageBatchMaxSize: %d, Model: %v ]", config.NameServer, config.GroupID,
+		config.NameServerDomain, config.InstanceName, config.ThreadCount, config.MessageBatchMaxSize, config.Model)
 }
 
 type PushConsumer interface {
 	baseAPI
+
 	// Subscribe a new topic with specify filter expression and consume function.
 	Subscribe(topic, expression string, consumeFunc func(msg *MessageExt) ConsumeStatus) error
 }
@@ -80,8 +94,10 @@ type PushConsumer interface {
 // PullConsumer consumer pulling the message
 type PullConsumer interface {
 	baseAPI
+
 	// Pull returns the messages from the consume queue by specify the offset and the max number
 	Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult
+
 	// FetchSubscriptionMessageQueues returns the consume queue of the topic
 	FetchSubscriptionMessageQueues(topic string) []MessageQueue
 }
@@ -105,7 +121,6 @@ type SendResult struct {
 
 func (result SendResult) String() string {
 	return fmt.Sprintf("[status: %s, messageId: %s, offset: %d]", result.Status, result.MsgId, result.Offset)
-
 }
 
 type baseAPI interface {
diff --git a/core/cfuns.go b/core/cfuns.go
index 7fe4ffe..5b4cba9 100644
--- a/core/cfuns.go
+++ b/core/cfuns.go
@@ -36,6 +36,7 @@ func consumeMessageCallback(cconsumer *C.CPushConsumer, msg *C.CMessageExt) C.in
 	}
 
 	msgExt := cmsgExtToGo(msg)
+	//C.DestroyMessageExt(msg)
 	cfunc, exist := consumer.(*defaultPushConsumer).funcsMap.Load(msgExt.Topic)
 	if !exist {
 		return C.int(ReConsumeLater)
diff --git a/core/log.go b/core/log.go
index e081e4c..5ff244e 100644
--- a/core/log.go
+++ b/core/log.go
@@ -37,6 +37,27 @@ const (
 	LogLevelNum   = LogLevel(C.E_LOG_LEVEL_LEVEL_NUM)
 )
 
+func (l LogLevel) String() string {
+	switch l {
+	case LogLevelFatal:
+		return "Fatal"
+	case LogLevelError:
+		return "Error"
+	case LogLevelWarn:
+		return "Warn"
+	case LogLevelInfo:
+		return "Info"
+	case LogLevelDebug:
+		return "Debug"
+	case LogLevelTrace:
+		return "Trace"
+	case LogLevelNum:
+		return "Num"
+	default:
+		return "Unkonw"
+	}
+}
+
 // LogConfig the log configuration for the pull consumer
 type LogConfig struct {
 	Path     string
diff --git a/core/log_test.go b/core/log_test.go
new file mode 100644
index 0000000..8c4a449
--- /dev/null
+++ b/core/log_test.go
@@ -0,0 +1,22 @@
+package rocketmq
+
+import (
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+func TestLogConfig_String(t *testing.T) {
+	logc := LogConfig{Path: "/log/path1", FileNum: 3, FileSize: 1 << 20, Level: LogLevelDebug}
+	assert.Equal(t, "{Path:/log/path1 FileNum:3 FileSize:1048576 Level:Debug}", logc.String())
+	logc.Level = LogLevelFatal
+	assert.Equal(t, "{Path:/log/path1 FileNum:3 FileSize:1048576 Level:Fatal}", logc.String())
+	logc.Level = LogLevelError
+	assert.Equal(t, "{Path:/log/path1 FileNum:3 FileSize:1048576 Level:Error}", logc.String())
+	logc.Level = LogLevelWarn
+	assert.Equal(t, "{Path:/log/path1 FileNum:3 FileSize:1048576 Level:Warn}", logc.String())
+	logc.Level = LogLevelInfo
+	assert.Equal(t, "{Path:/log/path1 FileNum:3 FileSize:1048576 Level:Info}", logc.String())
+	logc.Level = LogLevelTrace
+	assert.Equal(t, "{Path:/log/path1 FileNum:3 FileSize:1048576 Level:Trace}", logc.String())
+	logc.Level = LogLevelError
+}
diff --git a/core/message.go b/core/message.go
index 98dc6cb..8e75d18 100644
--- a/core/message.go
+++ b/core/message.go
@@ -16,80 +16,105 @@
  */
 package rocketmq
 
-//#cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
-//#include "rocketmq/CMessage.h"
-//#include "rocketmq/CMessageExt.h"
+/*
+#cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
+#include "rocketmq/CMessage.h"
+#include "rocketmq/CMessageExt.h"
+#include <stdlib.h>
+*/
 import "C"
-import "fmt"
+import (
+	"fmt"
+	"unsafe"
+)
 
 type Message struct {
-	Topic string
-	Keys  string
-	// improve: maybe []byte is better.
-	Body string
+	Topic          string
+	Tags           string
+	Keys           string
+	Body           string
+	DelayTimeLevel int
+	Property       map[string]string
 }
 
 func (msg *Message) String() string {
-	return fmt.Sprintf("[topic: %s, keys: %s, body: %s]", msg.Topic, msg.Keys, msg.Body)
+	return fmt.Sprintf("[Topic: %s, Tags: %s, Keys: %s, Body: %s, DelayTimeLevel: %d, Property: %v]",
+		msg.Topic, msg.Tags, msg.Keys, msg.Body, msg.DelayTimeLevel, msg.Property)
+}
+
+func goMsgToC(gomsg *Message) *C.struct_CMessage {
+	cs := C.CString(gomsg.Topic)
+	var cmsg = C.CreateMessage(cs)
+	C.free(unsafe.Pointer(cs))
+
+	cs = C.CString(gomsg.Tags)
+	C.SetMessageTags(cmsg, cs)
+	C.free(unsafe.Pointer(cs))
+
+	cs = C.CString(gomsg.Keys)
+	C.SetMessageKeys(cmsg, cs)
+	C.free(unsafe.Pointer(cs))
+
+	cs = C.CString(gomsg.Body)
+	C.SetMessageBody(cmsg, cs)
+	C.free(unsafe.Pointer(cs))
+
+	C.SetDelayTimeLevel(cmsg, C.int(gomsg.DelayTimeLevel))
+
+	for k, v := range gomsg.Property {
+		key := C.CString(k)
+		value := C.CString(v)
+		C.SetMessageProperty(cmsg, key, value)
+		C.free(unsafe.Pointer(key))
+		C.free(unsafe.Pointer(value))
+	}
+	return cmsg
 }
 
 type MessageExt struct {
 	Message
-	MessageID string
-	Tags      string
+	MessageID                 string
+	QueueId                   int
+	ReconsumeTimes            int
+	StoreSize                 int
+	BornTimestamp             int64
+	StoreTimestamp            int64
+	QueueOffset               int64
+	CommitLogOffset           int64
+	PreparedTransactionOffset int64
+
 	// improve: is there is a method convert c++ map to go variable?
 	cmsgExt *C.struct_CMessageExt
-	//Properties  string
 }
 
 func (msgExt *MessageExt) String() string {
-	return fmt.Sprintf("[messageId: %s, %s, Tags: %s]", msgExt.MessageID, msgExt.Message, msgExt.Tags)
+	return fmt.Sprintf("[MessageId: %s, %s, QueueId: %d, ReconsumeTimes: %d, StoreSize: %d, BornTimestamp: %d, "+
+		"StoreTimestamp: %d, QueueOffset: %d, CommitLogOffset: %d, PreparedTransactionOffset: %d]", msgExt.MessageID,
+		msgExt.Message.String(), msgExt.QueueId, msgExt.ReconsumeTimes, msgExt.StoreSize, msgExt.BornTimestamp,
+		msgExt.StoreTimestamp, msgExt.QueueOffset, msgExt.CommitLogOffset, msgExt.PreparedTransactionOffset)
 }
 
 func (msgExt *MessageExt) GetProperty(key string) string {
 	return C.GoString(C.GetMessageProperty(msgExt.cmsgExt, C.CString(key)))
 }
 
-func cmsgToGo(cmsg *C.struct_CMessage) *Message {
-	defer C.DestroyMessage(cmsg)
-	gomsg := &Message{}
-
-	return gomsg
-}
-
-func goMsgToC(gomsg *Message) *C.struct_CMessage {
-	var cmsg = C.CreateMessage(C.CString(gomsg.Topic))
-
-	// int(C.SetMessageKeys(msg.(*C.struct_CMessage),C.CString(keys)))
-	C.SetMessageKeys(cmsg, C.CString(gomsg.Keys))
-
-	// int(C.SetMessageBody(msg.(*C.struct_CMessage),C.CString(body)))
-	C.SetMessageBody(cmsg, C.CString(gomsg.Body))
-	return cmsg
-}
-
-//
 func cmsgExtToGo(cmsg *C.struct_CMessageExt) *MessageExt {
-	//defer C.DestroyMessageExt(cmsg)
-	gomsg := &MessageExt{}
+	gomsg := &MessageExt{cmsgExt: cmsg}
 
 	gomsg.Topic = C.GoString(C.GetMessageTopic(cmsg))
-	gomsg.Body = C.GoString(C.GetMessageBody(cmsg))
-	gomsg.Keys = C.GoString(C.GetMessageKeys(cmsg))
 	gomsg.Tags = C.GoString(C.GetMessageTags(cmsg))
+	gomsg.Keys = C.GoString(C.GetMessageKeys(cmsg))
+	gomsg.Body = C.GoString(C.GetMessageBody(cmsg))
 	gomsg.MessageID = C.GoString(C.GetMessageId(cmsg))
+	gomsg.DelayTimeLevel = int(C.GetMessageDelayTimeLevel(cmsg))
+	gomsg.QueueId = int(C.GetMessageQueueId(cmsg))
+	gomsg.ReconsumeTimes = int(C.GetMessageReconsumeTimes(cmsg))
+	gomsg.StoreSize = int(C.GetMessageStoreSize(cmsg))
+	gomsg.BornTimestamp = int64(C.GetMessageBornTimestamp(cmsg))
+	gomsg.StoreTimestamp = int64(C.GetMessageStoreTimestamp(cmsg))
+	gomsg.QueueOffset = int64(C.GetMessageQueueOffset(cmsg))
+	gomsg.CommitLogOffset = int64(C.GetMessageCommitLogOffset(cmsg))
+	gomsg.PreparedTransactionOffset = int64(C.GetMessagePreparedTransactionOffset(cmsg))
 
 	return gomsg
 }
-
-//
-//func goMsgExtToC(gomsg *MessageExt) *C.struct_CMessageExt {
-//	var cmsg = C.CreateMessage(C.CString(gomsg.Topic))
-//
-//	// int(C.SetMessageKeys(msg.(*C.struct_CMessage),C.CString(keys)))
-//	C.SetMessageKeys(cmsg, C.CString(gomsg.Keys))
-//
-//	// int(C.SetMessageBody(msg.(*C.struct_CMessage),C.CString(body)))
-//	C.SetMessageBody(cmsg, C.CString(gomsg.Body))
-//	return cmsg
-//}
diff --git a/core/message_test.go b/core/message_test.go
index 1d1f2a8..1df6ade 100644
--- a/core/message_test.go
+++ b/core/message_test.go
@@ -17,12 +17,44 @@
 package rocketmq
 
 import (
+	"github.com/stretchr/testify/assert"
 	"testing"
 )
 
-func TestGetMessageTopic(test *testing.T) {
-	//fmt.Println("-----TestGetMessageTopic Start----")
-	//msg := rocketmq.CreateMessage("testTopic")
-	//rocketmq.DestroyMessage(msg)
-	//fmt.Println("-----TestGetMessageTopic Finish----")
+func TestMessage_String(t *testing.T) {
+	msg := Message{
+		Topic:          "testTopic",
+		Tags:           "TagA, TagB",
+		Keys:           "Key1, Key2",
+		Body:           "Body1234567890",
+		DelayTimeLevel: 8}
+	expect := "[Topic: testTopic, Tags: TagA, TagB, Keys: Key1, Key2, Body: Body1234567890, DelayTimeLevel: 8," +
+		" Property: map[]]"
+	assert.Equal(t, expect, msg.String())
+}
+
+func TestMessageExt_String(t *testing.T) {
+	msg := Message{
+		Topic:          "testTopic",
+		Tags:           "TagA, TagB",
+		Keys:           "Key1, Key2",
+		Body:           "Body1234567890",
+		DelayTimeLevel: 8}
+	msgExt := MessageExt{
+		Message:                   msg,
+		MessageID:                 "messageId",
+		QueueId:                   2,
+		ReconsumeTimes:            13,
+		StoreSize:                 1 << 10,
+		BornTimestamp:             int64(1234567890897),
+		StoreTimestamp:            int64(1234567890),
+		QueueOffset:               int64(1234567890),
+		CommitLogOffset:           int64(1234567890),
+		PreparedTransactionOffset: int64(1234567890),
+	}
+	expect := "[MessageId: messageId, [Topic: testTopic, Tags: TagA, TagB, Keys: Key1, Key2, " +
+		"Body: Body1234567890, DelayTimeLevel: 8, Property: map[]], QueueId: 2, ReconsumeTimes: " +
+		"13, StoreSize: 1024, BornTimestamp: 1234567890897, StoreTimestamp: 1234567890, QueueOffset: 1234567890," +
+		" CommitLogOffset: 1234567890, PreparedTransactionOffset: 1234567890]"
+	assert.Equal(t, expect, msgExt.String())
 }
diff --git a/core/producer.go b/core/producer.go
index 3cccf21..77ed63b 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -20,6 +20,7 @@ package rocketmq
 #cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
 
 #include <stdio.h>
+#include <stdlib.h>
 #include "rocketmq/CMessage.h"
 #include "rocketmq/CProducer.h"
 #include "rocketmq/CSendResult.h"
@@ -31,7 +32,9 @@ int queueSelectorCallback_cgo(int size, CMessage *msg, void *selectorKey) {
 */
 import "C"
 import (
+	"errors"
 	"fmt"
+	log "github.com/sirupsen/logrus"
 	"unsafe"
 )
 
@@ -59,21 +62,115 @@ func (status SendStatus) String() string {
 	}
 }
 
-func newDefaultProducer(config *ProducerConfig) *defaultProducer {
+func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
+	if config == nil {
+		return nil, errors.New("config is nil")
+	}
+
+	if config.GroupID == "" {
+		return nil, errors.New("GroupId is empty")
+	}
+
+	if config.NameServer == "" && config.NameServerDomain == "" {
+		return nil, errors.New("NameServer and NameServerDomain is empty")
+	}
+
 	producer := &defaultProducer{config: config}
-	producer.cproduer = C.CreateProducer(C.CString(config.GroupID))
-	code := int(C.SetProducerNameServerAddress(producer.cproduer, C.CString(producer.config.NameServer)))
+	cs := C.CString(config.GroupID)
+	cproduer := C.CreateProducer(cs)
+	C.free(unsafe.Pointer(cs))
+
+	if cproduer == nil {
+		return nil, errors.New("create Producer failed, please check cpp logs for details")
+	}
+
+	var code int
+	if config.NameServer != "" {
+		cs = C.CString(config.NameServer)
+		code = int(C.SetProducerNameServerAddress(cproduer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("Producer Set NameServerAddress error, code is: %d"+
+				"please check cpp logs for details", code))
+		}
+	}
+
+	if config.NameServerDomain != "" {
+		cs = C.CString(config.NameServerDomain)
+		code = int(C.SetProducerNameServerDomain(cproduer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("Producer Set NameServerDomain error, code is: %d"+
+				"please check cpp logs for details", code))
+		}
+	}
+
+	if config.InstanceName != "" {
+		cs = C.CString(config.InstanceName)
+		code = int(C.SetProducerInstanceName(cproduer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("Producer Set InstanceName error, code is: %d"+
+				"please check cpp logs for details", code))
+		}
+	}
+
 	if config.Credentials != nil {
-		ret := C.SetProducerSessionCredentials(producer.cproduer,
-			C.CString(config.Credentials.AccessKey),
-			C.CString(config.Credentials.SecretKey),
-			C.CString(config.Credentials.Channel))
-		code = int(ret)
+		ak := C.CString(config.Credentials.AccessKey)
+		sk := C.CString(config.Credentials.SecretKey)
+		ch := C.CString(config.Credentials.Channel)
+		code = int(C.SetProducerSessionCredentials(cproduer, ak, sk, ch))
+
+		C.free(unsafe.Pointer(ak))
+		C.free(unsafe.Pointer(sk))
+		C.free(unsafe.Pointer(ch))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("Producer Set Credentials error, code is: %d", code))
+		}
 	}
-	switch code {
 
+	if config.LogC != nil {
+		cs = C.CString(config.LogC.Path)
+		code = int(C.SetProducerLogPath(cproduer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("Producer Set LogPath error, code is: %d", code))
+		}
+
+		code = int(C.SetProducerLogFileNumAndSize(cproduer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("Producer Set FileNumAndSize error, code is: %d", code))
+		}
+
+		code = int(C.SetProducerLogLevel(cproduer, C.CLogLevel(config.LogC.Level)))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("Producer Set LogLevel error, code is: %d", code))
+		}
 	}
-	return producer
+
+	if config.SendMsgTimeout > 0 {
+		code = int(C.SetProducerSendMsgTimeout(cproduer, C.int(config.SendMsgTimeout)))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("Producer Set SendMsgTimeout error, code is: %d", code))
+		}
+	}
+
+	if config.CompressLevel > 0 {
+		code = int(C.SetProducerCompressLevel(cproduer, C.int(config.CompressLevel)))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("Producer Set CompressLevel error, code is: %d", code))
+		}
+	}
+
+	if config.MaxMessageSize > 0 {
+		code = int(C.SetProducerMaxMessageSize(cproduer, C.int(config.MaxMessageSize)))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("Producer Set MaxMessageSize error, code is: %d", code))
+		}
+	}
+
+	producer.cproduer = cproduer
+	return producer, nil
 }
 
 type defaultProducer struct {
@@ -87,19 +184,26 @@ func (p *defaultProducer) String() string {
 
 // Start the producer.
 func (p *defaultProducer) Start() error {
-	err := int(C.StartProducer(p.cproduer))
-	// TODO How to process err code.
-	fmt.Printf("producer start result: %v \n", err)
+	code := int(C.StartProducer(p.cproduer))
+	if code != 0 {
+		return errors.New(fmt.Sprintf("start producer error, error code is: %d", code))
+	}
 	return nil
 }
 
 // Shutdown the producer.
 func (p *defaultProducer) Shutdown() error {
-	defer C.DestroyProducer(p.cproduer)
-	err := C.ShutdownProducer(p.cproduer)
+	code := int(C.ShutdownProducer(p.cproduer))
+
+	if code != 0 {
+		log.Warnf("shutdown producer error, error code is: %d", code)
+	}
+
+	code = int(int(C.DestroyProducer(p.cproduer)))
+	if code != 0 {
+		log.Warnf("destroy producer error, error code is: %d", code)
+	}
 
-	// TODO How to process err code.
-	fmt.Printf("shutdown result: %v \n", err)
 	return nil
 }
 
@@ -108,7 +212,11 @@ func (p *defaultProducer) SendMessageSync(msg *Message) SendResult {
 	defer C.DestroyMessage(cmsg)
 
 	var sr C.struct__SendResult_
-	C.SendMessageSync(p.cproduer, cmsg, &sr)
+	code := int(C.SendMessageSync(p.cproduer, cmsg, &sr))
+
+	if code != 0 {
+		log.Warnf("send message error, error code is: %d", code)
+	}
 
 	result := SendResult{}
 	result.Status = SendStatus(sr.sendStatus)
diff --git a/core/pull_consumer.go b/core/pull_consumer.go
index 19a033c..6dcac43 100644
--- a/core/pull_consumer.go
+++ b/core/pull_consumer.go
@@ -65,10 +65,7 @@ func (ps PullStatus) String() string {
 
 // PullConsumerConfig the configuration for the pull consumer
 type PullConsumerConfig struct {
-	GroupID     string
-	NameServer  string
-	Credentials *SessionCredentials
-	Log         *LogConfig
+	clientConfig
 }
 
 // DefaultPullConsumer default consumer pulling the message
@@ -89,6 +86,10 @@ func (c *DefaultPullConsumer) String() string {
 
 // NewPullConsumer creates one pull consumer
 func NewPullConsumer(conf *PullConsumerConfig) (*DefaultPullConsumer, error) {
+	if conf == nil {
+		return nil, errors.New("config is nil")
+	}
+
 	cs := C.CString(conf.GroupID)
 	cconsumer := C.CreatePullConsumer(cs)
 	C.free(unsafe.Pointer(cs))
@@ -97,7 +98,7 @@ func NewPullConsumer(conf *PullConsumerConfig) (*DefaultPullConsumer, error) {
 	C.SetPullConsumerNameServerAddress(cconsumer, cs)
 	C.free(unsafe.Pointer(cs))
 
-	log := conf.Log
+	log := conf.LogC
 	if log != nil {
 		cs = C.CString(log.Path)
 		if C.SetPullConsumerLogPath(cconsumer, cs) != 0 {
@@ -131,21 +132,21 @@ func NewPullConsumer(conf *PullConsumerConfig) (*DefaultPullConsumer, error) {
 func (c *DefaultPullConsumer) Start() error {
 	r := C.StartPullConsumer(c.cconsumer)
 	if r != 0 {
-		return fmt.Errorf("start failed, code:%d", r)
+		return fmt.Errorf("start failed, code:%d", int(r))
 	}
 	return nil
 }
 
-// Shutdown shutdown the pulling conumser
+// Shutdown shutdown the pulling consumer
 func (c *DefaultPullConsumer) Shutdown() error {
 	r := C.ShutdownPullConsumer(c.cconsumer)
 	if r != 0 {
-		return fmt.Errorf("shutdown failed, code:%d", r)
+		return fmt.Errorf("shutdown failed, code:%d", int(r))
 	}
 
 	r = C.DestroyPullConsumer(c.cconsumer)
 	if r != 0 {
-		return fmt.Errorf("destory failed, code:%d", r)
+		return fmt.Errorf("destory failed, code:%d", int(r))
 	}
 	return nil
 }
diff --git a/core/push_consumer.go b/core/push_consumer.go
index e431d95..5f63b16 100644
--- a/core/push_consumer.go
+++ b/core/push_consumer.go
@@ -18,9 +18,9 @@ package rocketmq
 
 /*
 #cgo LDFLAGS: -L/usr/local/lib -lrocketmq
+#include <stdlib.h>
 #include "rocketmq/CMessageExt.h"
 #include "rocketmq/CPushConsumer.h"
-#include "stdio.h"
 
 extern int consumeMessageCallback(CPushConsumer *consumer, CMessageExt *msg);
 
@@ -32,10 +32,14 @@ import "C"
 
 import (
 	"fmt"
+	"github.com/pkg/errors"
+	"github.com/prometheus/common/log"
 	"sync"
 	"unsafe"
 )
 
+type MessageModel C.CMessageModel
+
 type ConsumeStatus int
 
 const (
@@ -55,7 +59,7 @@ func (status ConsumeStatus) String() string {
 }
 
 type defaultPushConsumer struct {
-	config    *ConsumerConfig
+	config    *PushConsumerConfig
 	cconsumer *C.struct_CPushConsumer
 	funcsMap  sync.Map
 }
@@ -69,18 +73,112 @@ func (c *defaultPushConsumer) String() string {
 	return fmt.Sprintf("[%s, subcribed topics: [%s]]", c.config, topics)
 }
 
-func newPushConsumer(config *ConsumerConfig) (PushConsumer, error) {
+func newPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
+	if config == nil {
+		return nil, errors.New("config is nil")
+	}
+	if config.GroupID == "" {
+		return nil, errors.New("GroupId is empty.")
+	}
+
+	if config.NameServer == "" && config.NameServerDomain == "" {
+		return nil, errors.New("NameServer and NameServerDomain is empty.")
+	}
+
 	consumer := &defaultPushConsumer{config: config}
-	cconsumer := C.CreatePushConsumer(C.CString(config.GroupID))
-	C.SetPushConsumerNameServerAddress(cconsumer, C.CString(config.NameServer))
-	C.SetPushConsumerThreadCount(cconsumer, C.int(config.ConsumerThreadCount))
-	C.SetPushConsumerMessageBatchMaxSize(cconsumer, C.int(config.ConsumerThreadCount))
-	C.RegisterMessageCallback(cconsumer, (C.MessageCallBack)(unsafe.Pointer(C.callback_cgo)))
+	cs := C.CString(config.GroupID)
+	cconsumer := C.CreatePushConsumer(cs)
+	C.free(unsafe.Pointer(cs))
+
+	if cconsumer == nil {
+		return nil, errors.New("Create PushConsumer failed")
+	}
+
+	var code int
+	if config.NameServer != "" {
+		cs = C.CString(config.NameServer)
+		code = int(C.SetPushConsumerNameServerAddress(cconsumer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf(fmt.Sprintf("PushConsumer Set NameServerAddress error, code is: %d", code)))
+		}
+	}
+
+	if config.NameServerDomain != "" {
+		cs = C.CString(config.NameServerDomain)
+		code = int(C.SetPushConsumerNameServerDomain(cconsumer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("PushConsumer Set NameServerDomain error, code is: %d", code))
+		}
+	}
+
+	if config.InstanceName != "" {
+		cs = C.CString(config.InstanceName)
+		code = int(C.SetPushConsumerInstanceName(cconsumer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("PushConsumer Set InstanceName error, code is: %d, "+
+				"please check cpp logs for details", code))
+		}
+	}
+
 	if config.Credentials != nil {
-		C.SetPushConsumerSessionCredentials(cconsumer,
-			C.CString(config.Credentials.AccessKey),
-			C.CString(config.Credentials.SecretKey),
-			C.CString(config.Credentials.Channel))
+		ak := C.CString(config.Credentials.AccessKey)
+		sk := C.CString(config.Credentials.SecretKey)
+		ch := C.CString(config.Credentials.Channel)
+		code = int(C.SetPushConsumerSessionCredentials(cconsumer, ak, sk, ch))
+		C.free(unsafe.Pointer(ak))
+		C.free(unsafe.Pointer(sk))
+		C.free(unsafe.Pointer(ch))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("PushConsumer Set Credentials error, code is: %d", int(code)))
+		}
+	}
+
+	if config.LogC != nil {
+		cs = C.CString(config.LogC.Path)
+		code = int(C.SetProducerLogPath(cconsumer, cs))
+		C.free(unsafe.Pointer(cs))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("Producer Set LogPath error, code is: %d", code))
+		}
+
+		code = int(C.SetProducerLogFileNumAndSize(cconsumer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("Producer Set FileNumAndSize error, code is: %d", code))
+		}
+
+		code = int(C.SetProducerLogLevel(cconsumer, C.CLogLevel(config.LogC.Level)))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("Producer Set LogLevel error, code is: %d", code))
+		}
+	}
+
+	if config.ThreadCount > 0 {
+		code = int(C.SetPushConsumerThreadCount(cconsumer, C.int(config.ThreadCount)))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("PushConsumer Set ThreadCount error, code is: %d", int(code)))
+		}
+	}
+
+	if config.MessageBatchMaxSize > 0 {
+		code = int(C.SetPushConsumerMessageBatchMaxSize(cconsumer, C.int(config.MessageBatchMaxSize)))
+		if code != 0 {
+			return nil, errors.New(fmt.Sprintf("PushConsumer Set MessageBatchMaxSize error, code is: %d", int(code)))
+		}
+	}
+
+	code = int(C.SetPushConsumerMessageModel(cconsumer, (C.CMessageModel)(config.Model)))
+
+	if code != 0 {
+		return nil, errors.New(fmt.Sprintf("PushConsumer Set ConsumerMessageModel error, code is: %d", int(code)))
+	}
+
+	code = int(C.RegisterMessageCallback(cconsumer, (C.MessageCallBack)(unsafe.Pointer(C.callback_cgo))))
+
+	if code != 0 {
+		return nil, errors.New(fmt.Sprintf("PushConsumer RegisterMessageCallback error, code is: %d", int(code)))
 	}
 
 	consumer.cconsumer = cconsumer
@@ -89,13 +187,24 @@ func newPushConsumer(config *ConsumerConfig) (PushConsumer, error) {
 }
 
 func (c *defaultPushConsumer) Start() error {
-	C.StartPushConsumer(c.cconsumer)
+	code := C.StartPushConsumer(c.cconsumer)
+	if code != 0 {
+		return errors.New(fmt.Sprintf("start PushConsumer error, code is: %d", int(code)))
+	}
 	return nil
 }
 
 func (c *defaultPushConsumer) Shutdown() error {
-	C.ShutdownPushConsumer(c.cconsumer)
+	code := C.ShutdownPushConsumer(c.cconsumer)
+
+	if code != 0 {
+		log.Warnf("Shutdown PushConsumer error, code is: %d, please check cpp logs for details", code)
+	}
+
 	C.DestroyPushConsumer(c.cconsumer)
+	if code != 0 {
+		log.Warnf("Destroy PushConsumer error, code is: %d, please check cpp logs for details", code)
+	}
 	return nil
 }
 
@@ -104,9 +213,11 @@ func (c *defaultPushConsumer) Subscribe(topic, expression string, consumeFunc fu
 	if exist {
 		return nil
 	}
-	err := C.Subscribe(c.cconsumer, C.CString(topic), C.CString(expression))
-	fmt.Println("err:", err)
+	code := C.Subscribe(c.cconsumer, C.CString(topic), C.CString(expression))
+	if code != 0 {
+		return errors.New(fmt.Sprintf("subscribe topic: %s failed, error code is: %d", topic, int(code)))
+	}
 	c.funcsMap.Store(topic, consumeFunc)
-	fmt.Printf("subscribe topic[%s] with expression[%s] successfully. \n", topic, expression)
+	log.Infof("subscribe topic[%s] with expression[%s] successfully.", topic, expression)
 	return nil
 }
diff --git a/core/version.go b/core/version.go
index 5700714..4ac86d5 100644
--- a/core/version.go
+++ b/core/version.go
@@ -16,8 +16,8 @@
  */
 package rocketmq
 
-const GO_CLIENT_VERSION = "Go Client V1.0.0, BuildTime:2018.10.30"
+const GoClientVersion = "Go Client V1.0.0, BuildTime:2018.10.30"
 
 func GetVersion() (version string) {
-	return GO_CLIENT_VERSION
+	return GoClientVersion
 }
diff --git a/examples/orderproducer/producer.go b/examples/orderproducer/producer.go
index 11fb864..5559d87 100644
--- a/examples/orderproducer/producer.go
+++ b/examples/orderproducer/producer.go
@@ -83,11 +83,16 @@ func main() {
 		return
 	}
 
-	producer := rocketmq.NewProducer(&rocketmq.ProducerConfig{
-		GroupID:    groupID,
-		NameServer: namesrvAddrs,
-	})
-  
+	cfg := &rocketmq.ProducerConfig{}
+	cfg.GroupID = groupID
+	cfg.NameServer = namesrvAddrs
+
+	producer, err := rocketmq.NewProducer(cfg)
+	if err != nil {
+		fmt.Println("create Producer failed, error:", err)
+		return
+	}
+
 	producer.Start()
 	defer producer.Shutdown()
 
diff --git a/examples/producer.go b/examples/producer.go
index 98a45b0..56ecee2 100644
--- a/examples/producer.go
+++ b/examples/producer.go
@@ -24,20 +24,23 @@ import (
 )
 
 func main() {
-	SendMessage()
-}
+	cfg := &rocketmq.ProducerConfig{}
+	cfg.GroupID = "testGroup"
+	cfg.NameServer = "47.101.55.250:9876"
+	producer, err := rocketmq.NewProducer(cfg)
+	if err != nil {
+		fmt.Println("create Producer failed, error:", err)
+		return
+	}
 
-func SendMessage() {
-	producer := rocketmq.NewProducer(&rocketmq.ProducerConfig{GroupID: "testGroup", NameServer: "localhost:9876"})
 	producer.Start()
 	defer producer.Shutdown()
 
 	fmt.Printf("Producer: %s started... \n", producer)
 	for i := 0; i < 100; i++ {
 		msg := fmt.Sprintf("Hello RocketMQ-%d", i)
-		result := producer.SendMessageSync(&rocketmq.Message{Topic: "test", Body: msg})
+		result := producer.SendMessageSync(&rocketmq.Message{Topic: "wwf1", Body: msg})
 		fmt.Println(fmt.Sprintf("send message: %s result: %s", msg, result))
 	}
 	time.Sleep(10 * time.Second)
-	producer.Shutdown()
 }
diff --git a/examples/pullconsumer/consumer.go b/examples/pullconsumer/consumer.go
index 3643a22..d63729e 100644
--- a/examples/pullconsumer/consumer.go
+++ b/examples/pullconsumer/consumer.go
@@ -59,13 +59,14 @@ func main() {
 		return
 	}
 
-	consumer, err := rocketmq.NewPullConsumer(&rocketmq.PullConsumerConfig{
-		GroupID:    groupID,
-		NameServer: namesrvAddrs,
-		Log: &rocketmq.LogConfig{
-			Path: "example",
-		},
-	})
+	cfg := &rocketmq.PullConsumerConfig{}
+	cfg.GroupID = groupID
+	cfg.NameServer = namesrvAddrs
+	cfg.LogC = &rocketmq.LogConfig{
+		Path: "example",
+	}
+
+	consumer, err := rocketmq.NewPullConsumer(cfg)
 	if err != nil {
 		fmt.Printf("new pull consumer error:%s\n", err)
 		return
diff --git a/examples/push_consumer.go b/examples/push_consumer.go
index e65613b..d17559c 100644
--- a/examples/push_consumer.go
+++ b/examples/push_consumer.go
@@ -24,13 +24,18 @@ import (
 )
 
 func main() {
-	PushConsumeMessage()
-}
-
-func PushConsumeMessage() {
 	fmt.Println("Start Receiving Messages...")
-	consumer, _ := rocketmq.NewPushConsumer(&rocketmq.ConsumerConfig{GroupID: "testGroupId", NameServer: "localhost:9876",
-		ConsumerThreadCount: 2, MessageBatchMaxSize: 16})
+	cfg := &rocketmq.PushConsumerConfig{
+		ThreadCount:         2,
+		MessageBatchMaxSize: 16,
+	}
+	cfg.GroupID = "testGroupId"
+	cfg.NameServer = "localhost:9876"
+	consumer, err := rocketmq.NewPushConsumer(cfg)
+	if err != nil {
+		fmt.Println("create Consumer failed, error:", err)
+		return
+	}
 
 	// MUST subscribe topic before consumer started.
 	consumer.Subscribe("test", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services