You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/03/26 02:38:21 UTC

[rocketmq-client-go] branch master updated: fix typo + add GetLatestErrorMessage(for easy debugging) (#461)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new d9ce248  fix typo + add GetLatestErrorMessage(for easy debugging) (#461)
d9ce248 is described below

commit d9ce2486470b58c406110dba87b6b50ac6889b0d
Author: morysky <sy...@gmail.com>
AuthorDate: Thu Mar 26 10:38:12 2020 +0800

    fix typo + add GetLatestErrorMessage(for easy debugging) (#461)
---
 core/error.go                |  6 ++++++
 core/producer.go             | 46 ++++++++++++++++++++++----------------------
 core/transaction_producer.go | 42 ++++++++++++++++++++--------------------
 3 files changed, 50 insertions(+), 44 deletions(-)

diff --git a/core/error.go b/core/error.go
index d847af4..0f15a1e 100644
--- a/core/error.go
+++ b/core/error.go
@@ -19,6 +19,7 @@ package rocketmq
 
 /*
 #include <rocketmq/CCommon.h>
+#include <rocketmq/CErrorMessage.h>
 */
 import "C"
 import "fmt"
@@ -72,3 +73,8 @@ func (e rmqError) Error() string {
 		return fmt.Sprintf("unknow error: %v", int(e))
 	}
 }
+
+// GetLatestErrorMessage returns the latest detailed error message from the CPP-SDK.
+func GetLatestErrorMessage() string {
+	return C.GoString(C.GetLatestErrorMessage())
+}
\ No newline at end of file
diff --git a/core/producer.go b/core/producer.go
index 2e1279a..e28b652 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -82,25 +82,25 @@ func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
 
 	producer := &defaultProducer{config: config}
 	cs := C.CString(config.GroupID)
-	var cproduer *C.struct_CProducer
+	var cproducer *C.struct_CProducer
 	if config.ProducerModel == OrderlyProducer {
-		cproduer = C.CreateOrderlyProducer(cs)
+		cproducer = C.CreateOrderlyProducer(cs)
 	} else if config.ProducerModel == CommonProducer {
-		cproduer = C.CreateProducer(cs)
+		cproducer = C.CreateProducer(cs)
 	} else {
 		C.free(unsafe.Pointer(cs))
 		return nil, errors.New("ProducerModel is invalid or empty")
 	}
 	C.free(unsafe.Pointer(cs))
 
-	if cproduer == nil {
+	if cproducer == nil {
 		return nil, errors.New("create Producer failed")
 	}
 
 	var err rmqError
 	if config.NameServer != "" {
 		cs = C.CString(config.NameServer)
-		err = rmqError(C.SetProducerNameServerAddress(cproduer, cs))
+		err = rmqError(C.SetProducerNameServerAddress(cproducer, cs))
 		C.free(unsafe.Pointer(cs))
 		if err != NIL {
 			return nil, err
@@ -109,7 +109,7 @@ func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
 
 	if config.NameServerDomain != "" {
 		cs = C.CString(config.NameServerDomain)
-		err = rmqError(C.SetProducerNameServerDomain(cproduer, cs))
+		err = rmqError(C.SetProducerNameServerDomain(cproducer, cs))
 		C.free(unsafe.Pointer(cs))
 		if err != NIL {
 			return nil, err
@@ -118,7 +118,7 @@ func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
 
 	if config.InstanceName != "" {
 		cs = C.CString(config.InstanceName)
-		err = rmqError(C.SetProducerInstanceName(cproduer, cs))
+		err = rmqError(C.SetProducerInstanceName(cproducer, cs))
 		C.free(unsafe.Pointer(cs))
 		if err != NIL {
 			return nil, err
@@ -129,7 +129,7 @@ func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
 		ak := C.CString(config.Credentials.AccessKey)
 		sk := C.CString(config.Credentials.SecretKey)
 		ch := C.CString(config.Credentials.Channel)
-		err = rmqError(C.SetProducerSessionCredentials(cproduer, ak, sk, ch))
+		err = rmqError(C.SetProducerSessionCredentials(cproducer, ak, sk, ch))
 
 		C.free(unsafe.Pointer(ak))
 		C.free(unsafe.Pointer(sk))
@@ -141,51 +141,51 @@ func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
 
 	if config.LogC != nil {
 		cs = C.CString(config.LogC.Path)
-		err = rmqError(C.SetProducerLogPath(cproduer, cs))
+		err = rmqError(C.SetProducerLogPath(cproducer, cs))
 		C.free(unsafe.Pointer(cs))
 		if err != NIL {
 			return nil, err
 		}
 
-		err = rmqError(C.SetProducerLogFileNumAndSize(cproduer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+		err = rmqError(C.SetProducerLogFileNumAndSize(cproducer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
 		if err != NIL {
 			return nil, err
 		}
 
-		err = rmqError(C.SetProducerLogLevel(cproduer, C.CLogLevel(config.LogC.Level)))
+		err = rmqError(C.SetProducerLogLevel(cproducer, C.CLogLevel(config.LogC.Level)))
 		if err != NIL {
 			return nil, err
 		}
 	}
 
 	if config.SendMsgTimeout > 0 {
-		err = rmqError(C.SetProducerSendMsgTimeout(cproduer, C.int(config.SendMsgTimeout)))
+		err = rmqError(C.SetProducerSendMsgTimeout(cproducer, C.int(config.SendMsgTimeout)))
 		if err != NIL {
 			return nil, err
 		}
 	}
 
 	if config.CompressLevel > 0 {
-		err = rmqError(C.SetProducerCompressLevel(cproduer, C.int(config.CompressLevel)))
+		err = rmqError(C.SetProducerCompressLevel(cproducer, C.int(config.CompressLevel)))
 		if err != NIL {
 			return nil, err
 		}
 	}
 
 	if config.MaxMessageSize > 0 {
-		err = rmqError(C.SetProducerMaxMessageSize(cproduer, C.int(config.MaxMessageSize)))
+		err = rmqError(C.SetProducerMaxMessageSize(cproducer, C.int(config.MaxMessageSize)))
 		if err != NIL {
 			return nil, err
 		}
 	}
 
-	producer.cproduer = cproduer
+	producer.cproducer = cproducer
 	return producer, nil
 }
 
 type defaultProducer struct {
 	config   *ProducerConfig
-	cproduer *C.struct_CProducer
+	cproducer *C.struct_CProducer
 }
 
 func (p *defaultProducer) String() string {
@@ -194,7 +194,7 @@ func (p *defaultProducer) String() string {
 
 // Start the producer.
 func (p *defaultProducer) Start() error {
-	err := rmqError(C.StartProducer(p.cproduer))
+	err := rmqError(C.StartProducer(p.cproducer))
 	if err != NIL {
 		return err
 	}
@@ -203,13 +203,13 @@ func (p *defaultProducer) Start() error {
 
 // Shutdown the producer.
 func (p *defaultProducer) Shutdown() error {
-	err := rmqError(C.ShutdownProducer(p.cproduer))
+	err := rmqError(C.ShutdownProducer(p.cproducer))
 
 	if err != NIL {
 		return err
 	}
 
-	err = rmqError(int(C.DestroyProducer(p.cproduer)))
+	err = rmqError(int(C.DestroyProducer(p.cproducer)))
 	if err != NIL {
 		return err
 	}
@@ -222,7 +222,7 @@ func (p *defaultProducer) SendMessageSync(msg *Message) (*SendResult, error) {
 	defer C.DestroyMessage(cmsg)
 
 	var sr C.struct__SendResult_
-	err := rmqError(C.SendMessageSync(p.cproduer, cmsg, &sr))
+	err := rmqError(C.SendMessageSync(p.cproducer, cmsg, &sr))
 
 	if err != NIL {
 		log.Warnf("send message error, error is: %s", err.Error())
@@ -247,7 +247,7 @@ func (p *defaultProducer) SendMessageOrderly(msg *Message, selector MessageQueue
 
 	var sr C.struct__SendResult_
 	err := rmqError(C.SendMessageOrderly(
-		p.cproduer,
+		p.cproducer,
 		cmsg,
 		(C.QueueSelectorCallback)(unsafe.Pointer(C.queueSelectorCallback_cgo)),
 		unsafe.Pointer(&key),
@@ -270,7 +270,7 @@ func (p *defaultProducer) SendMessageOneway(msg *Message) error {
 	cmsg := goMsgToC(msg)
 	defer C.DestroyMessage(cmsg)
 
-	err := rmqError(C.SendMessageOneway(p.cproduer, cmsg))
+	err := rmqError(C.SendMessageOneway(p.cproducer, cmsg))
 	if err != NIL {
 		log.Warnf("send message with oneway error, error is: %s", err.Error())
 		return err
@@ -291,7 +291,7 @@ func (p *defaultProducer) SendMessageOrderlyByShardingKey(msg *Message, sharding
 	defer C.free(unsafe.Pointer(cshardingkey))
 	var sr C.struct__SendResult_
 	err := rmqError(C.SendMessageOrderlyByShardingKey(
-		p.cproduer,
+		p.cproducer,
 		cmsg,
 		cshardingkey,
 		&sr))
diff --git a/core/transaction_producer.go b/core/transaction_producer.go
index 09ae52f..e79f2c2 100644
--- a/core/transaction_producer.go
+++ b/core/transaction_producer.go
@@ -82,20 +82,20 @@ func newDefaultTransactionProducer(config *ProducerConfig, listener TransactionL
 
 	producer := &defaultTransactionProducer{config: config}
 	cs := C.CString(config.GroupID)
-	var cproduer *C.struct_CProducer
+	var cproducer *C.struct_CProducer
 
-	cproduer = C.CreateTransactionProducer(cs, (C.CLocalTransactionCheckerCallback)(unsafe.Pointer(C.transactionChecker_cgo)), unsafe.Pointer(&arg))
+	cproducer = C.CreateTransactionProducer(cs, (C.CLocalTransactionCheckerCallback)(unsafe.Pointer(C.transactionChecker_cgo)), unsafe.Pointer(&arg))
 
 	C.free(unsafe.Pointer(cs))
 
-	if cproduer == nil {
+	if cproducer == nil {
 		return nil, errors.New("create transaction Producer failed")
 	}
 
 	var err rmqError
 	if config.NameServer != "" {
 		cs = C.CString(config.NameServer)
-		err = rmqError(C.SetProducerNameServerAddress(cproduer, cs))
+		err = rmqError(C.SetProducerNameServerAddress(cproducer, cs))
 		C.free(unsafe.Pointer(cs))
 		if err != NIL {
 			return nil, err
@@ -104,7 +104,7 @@ func newDefaultTransactionProducer(config *ProducerConfig, listener TransactionL
 
 	if config.NameServerDomain != "" {
 		cs = C.CString(config.NameServerDomain)
-		err = rmqError(C.SetProducerNameServerDomain(cproduer, cs))
+		err = rmqError(C.SetProducerNameServerDomain(cproducer, cs))
 		C.free(unsafe.Pointer(cs))
 		if err != NIL {
 			return nil, err
@@ -113,7 +113,7 @@ func newDefaultTransactionProducer(config *ProducerConfig, listener TransactionL
 
 	if config.InstanceName != "" {
 		cs = C.CString(config.InstanceName)
-		err = rmqError(C.SetProducerInstanceName(cproduer, cs))
+		err = rmqError(C.SetProducerInstanceName(cproducer, cs))
 		C.free(unsafe.Pointer(cs))
 		if err != NIL {
 			return nil, err
@@ -124,7 +124,7 @@ func newDefaultTransactionProducer(config *ProducerConfig, listener TransactionL
 		ak := C.CString(config.Credentials.AccessKey)
 		sk := C.CString(config.Credentials.SecretKey)
 		ch := C.CString(config.Credentials.Channel)
-		err = rmqError(C.SetProducerSessionCredentials(cproduer, ak, sk, ch))
+		err = rmqError(C.SetProducerSessionCredentials(cproducer, ak, sk, ch))
 
 		C.free(unsafe.Pointer(ak))
 		C.free(unsafe.Pointer(sk))
@@ -136,53 +136,53 @@ func newDefaultTransactionProducer(config *ProducerConfig, listener TransactionL
 
 	if config.LogC != nil {
 		cs = C.CString(config.LogC.Path)
-		err = rmqError(C.SetProducerLogPath(cproduer, cs))
+		err = rmqError(C.SetProducerLogPath(cproducer, cs))
 		C.free(unsafe.Pointer(cs))
 		if err != NIL {
 			return nil, err
 		}
 
-		err = rmqError(C.SetProducerLogFileNumAndSize(cproduer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+		err = rmqError(C.SetProducerLogFileNumAndSize(cproducer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
 		if err != NIL {
 			return nil, err
 		}
 
-		err = rmqError(C.SetProducerLogLevel(cproduer, C.CLogLevel(config.LogC.Level)))
+		err = rmqError(C.SetProducerLogLevel(cproducer, C.CLogLevel(config.LogC.Level)))
 		if err != NIL {
 			return nil, err
 		}
 	}
 
 	if config.SendMsgTimeout > 0 {
-		err = rmqError(C.SetProducerSendMsgTimeout(cproduer, C.int(config.SendMsgTimeout)))
+		err = rmqError(C.SetProducerSendMsgTimeout(cproducer, C.int(config.SendMsgTimeout)))
 		if err != NIL {
 			return nil, err
 		}
 	}
 
 	if config.CompressLevel > 0 {
-		err = rmqError(C.SetProducerCompressLevel(cproduer, C.int(config.CompressLevel)))
+		err = rmqError(C.SetProducerCompressLevel(cproducer, C.int(config.CompressLevel)))
 		if err != NIL {
 			return nil, err
 		}
 	}
 
 	if config.MaxMessageSize > 0 {
-		err = rmqError(C.SetProducerMaxMessageSize(cproduer, C.int(config.MaxMessageSize)))
+		err = rmqError(C.SetProducerMaxMessageSize(cproducer, C.int(config.MaxMessageSize)))
 		if err != NIL {
 			return nil, err
 		}
 	}
 
-	producer.cproduer = cproduer
-	transactionProducerMap.Store(cproduer, producer)
-	producer.listenerFuncsMap.Store(cproduer, listener)
+	producer.cproducer = cproducer
+	transactionProducerMap.Store(cproducer, producer)
+	producer.listenerFuncsMap.Store(cproducer, listener)
 	return producer, nil
 }
 
 type defaultTransactionProducer struct {
 	config           *ProducerConfig
-	cproduer         *C.struct_CProducer
+	cproducer         *C.struct_CProducer
 	listenerFuncsMap sync.Map
 }
 
@@ -192,7 +192,7 @@ func (p *defaultTransactionProducer) String() string {
 
 // Start the producer.
 func (p *defaultTransactionProducer) Start() error {
-	err := rmqError(C.StartProducer(p.cproduer))
+	err := rmqError(C.StartProducer(p.cproducer))
 	if err != NIL {
 		return err
 	}
@@ -201,13 +201,13 @@ func (p *defaultTransactionProducer) Start() error {
 
 // Shutdown the producer.
 func (p *defaultTransactionProducer) Shutdown() error {
-	err := rmqError(C.ShutdownProducer(p.cproduer))
+	err := rmqError(C.ShutdownProducer(p.cproducer))
 
 	if err != NIL {
 		return err
 	}
 
-	err = rmqError(int(C.DestroyProducer(p.cproduer)))
+	err = rmqError(int(C.DestroyProducer(p.cproducer)))
 	if err != NIL {
 		return err
 	}
@@ -220,7 +220,7 @@ func (p *defaultTransactionProducer) SendMessageTransaction(msg *Message, arg in
 	defer C.DestroyMessage(cmsg)
 
 	var sr C.struct__SendResult_
-	err := rmqError(C.SendMessageTransaction(p.cproduer, cmsg, (C.CLocalTransactionExecutorCallback)(unsafe.Pointer(C.transactionExecutor_cgo)), unsafe.Pointer(&arg), &sr))
+	err := rmqError(C.SendMessageTransaction(p.cproducer, cmsg, (C.CLocalTransactionExecutorCallback)(unsafe.Pointer(C.transactionExecutor_cgo)), unsafe.Pointer(&arg), &sr))
 	if err != NIL {
 		log.Warnf("send message error, error is: %s", err.Error())
 		return nil, err