You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/03/26 02:38:21 UTC
[rocketmq-client-go] branch master updated: fix typo + add
GetLatestErrorMessage(for easy debugging) (#461)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new d9ce248 fix typo + add GetLatestErrorMessage(for easy debugging) (#461)
d9ce248 is described below
commit d9ce2486470b58c406110dba87b6b50ac6889b0d
Author: morysky <sy...@gmail.com>
AuthorDate: Thu Mar 26 10:38:12 2020 +0800
fix typo + add GetLatestErrorMessage(for easy debugging) (#461)
---
core/error.go | 6 ++++++
core/producer.go | 46 ++++++++++++++++++++++----------------------
core/transaction_producer.go | 42 ++++++++++++++++++++--------------------
3 files changed, 50 insertions(+), 44 deletions(-)
diff --git a/core/error.go b/core/error.go
index d847af4..0f15a1e 100644
--- a/core/error.go
+++ b/core/error.go
@@ -19,6 +19,7 @@ package rocketmq
/*
#include <rocketmq/CCommon.h>
+#include <rocketmq/CErrorMessage.h>
*/
import "C"
import "fmt"
@@ -72,3 +73,8 @@ func (e rmqError) Error() string {
return fmt.Sprintf("unknow error: %v", int(e))
}
}
+
+// GetLatestErrorMessage Get latest detailed error message from CPP-SDK
+func GetLatestErrorMessage() string {
+ return C.GoString(C.GetLatestErrorMessage())
+}
\ No newline at end of file
diff --git a/core/producer.go b/core/producer.go
index 2e1279a..e28b652 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -82,25 +82,25 @@ func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
producer := &defaultProducer{config: config}
cs := C.CString(config.GroupID)
- var cproduer *C.struct_CProducer
+ var cproducer *C.struct_CProducer
if config.ProducerModel == OrderlyProducer {
- cproduer = C.CreateOrderlyProducer(cs)
+ cproducer = C.CreateOrderlyProducer(cs)
} else if config.ProducerModel == CommonProducer {
- cproduer = C.CreateProducer(cs)
+ cproducer = C.CreateProducer(cs)
} else {
C.free(unsafe.Pointer(cs))
return nil, errors.New("ProducerModel is invalid or empty")
}
C.free(unsafe.Pointer(cs))
- if cproduer == nil {
+ if cproducer == nil {
return nil, errors.New("create Producer failed")
}
var err rmqError
if config.NameServer != "" {
cs = C.CString(config.NameServer)
- err = rmqError(C.SetProducerNameServerAddress(cproduer, cs))
+ err = rmqError(C.SetProducerNameServerAddress(cproducer, cs))
C.free(unsafe.Pointer(cs))
if err != NIL {
return nil, err
@@ -109,7 +109,7 @@ func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
if config.NameServerDomain != "" {
cs = C.CString(config.NameServerDomain)
- err = rmqError(C.SetProducerNameServerDomain(cproduer, cs))
+ err = rmqError(C.SetProducerNameServerDomain(cproducer, cs))
C.free(unsafe.Pointer(cs))
if err != NIL {
return nil, err
@@ -118,7 +118,7 @@ func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
if config.InstanceName != "" {
cs = C.CString(config.InstanceName)
- err = rmqError(C.SetProducerInstanceName(cproduer, cs))
+ err = rmqError(C.SetProducerInstanceName(cproducer, cs))
C.free(unsafe.Pointer(cs))
if err != NIL {
return nil, err
@@ -129,7 +129,7 @@ func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
ak := C.CString(config.Credentials.AccessKey)
sk := C.CString(config.Credentials.SecretKey)
ch := C.CString(config.Credentials.Channel)
- err = rmqError(C.SetProducerSessionCredentials(cproduer, ak, sk, ch))
+ err = rmqError(C.SetProducerSessionCredentials(cproducer, ak, sk, ch))
C.free(unsafe.Pointer(ak))
C.free(unsafe.Pointer(sk))
@@ -141,51 +141,51 @@ func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
if config.LogC != nil {
cs = C.CString(config.LogC.Path)
- err = rmqError(C.SetProducerLogPath(cproduer, cs))
+ err = rmqError(C.SetProducerLogPath(cproducer, cs))
C.free(unsafe.Pointer(cs))
if err != NIL {
return nil, err
}
- err = rmqError(C.SetProducerLogFileNumAndSize(cproduer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+ err = rmqError(C.SetProducerLogFileNumAndSize(cproducer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
if err != NIL {
return nil, err
}
- err = rmqError(C.SetProducerLogLevel(cproduer, C.CLogLevel(config.LogC.Level)))
+ err = rmqError(C.SetProducerLogLevel(cproducer, C.CLogLevel(config.LogC.Level)))
if err != NIL {
return nil, err
}
}
if config.SendMsgTimeout > 0 {
- err = rmqError(C.SetProducerSendMsgTimeout(cproduer, C.int(config.SendMsgTimeout)))
+ err = rmqError(C.SetProducerSendMsgTimeout(cproducer, C.int(config.SendMsgTimeout)))
if err != NIL {
return nil, err
}
}
if config.CompressLevel > 0 {
- err = rmqError(C.SetProducerCompressLevel(cproduer, C.int(config.CompressLevel)))
+ err = rmqError(C.SetProducerCompressLevel(cproducer, C.int(config.CompressLevel)))
if err != NIL {
return nil, err
}
}
if config.MaxMessageSize > 0 {
- err = rmqError(C.SetProducerMaxMessageSize(cproduer, C.int(config.MaxMessageSize)))
+ err = rmqError(C.SetProducerMaxMessageSize(cproducer, C.int(config.MaxMessageSize)))
if err != NIL {
return nil, err
}
}
- producer.cproduer = cproduer
+ producer.cproducer = cproducer
return producer, nil
}
type defaultProducer struct {
config *ProducerConfig
- cproduer *C.struct_CProducer
+ cproducer *C.struct_CProducer
}
func (p *defaultProducer) String() string {
@@ -194,7 +194,7 @@ func (p *defaultProducer) String() string {
// Start the producer.
func (p *defaultProducer) Start() error {
- err := rmqError(C.StartProducer(p.cproduer))
+ err := rmqError(C.StartProducer(p.cproducer))
if err != NIL {
return err
}
@@ -203,13 +203,13 @@ func (p *defaultProducer) Start() error {
// Shutdown the producer.
func (p *defaultProducer) Shutdown() error {
- err := rmqError(C.ShutdownProducer(p.cproduer))
+ err := rmqError(C.ShutdownProducer(p.cproducer))
if err != NIL {
return err
}
- err = rmqError(int(C.DestroyProducer(p.cproduer)))
+ err = rmqError(int(C.DestroyProducer(p.cproducer)))
if err != NIL {
return err
}
@@ -222,7 +222,7 @@ func (p *defaultProducer) SendMessageSync(msg *Message) (*SendResult, error) {
defer C.DestroyMessage(cmsg)
var sr C.struct__SendResult_
- err := rmqError(C.SendMessageSync(p.cproduer, cmsg, &sr))
+ err := rmqError(C.SendMessageSync(p.cproducer, cmsg, &sr))
if err != NIL {
log.Warnf("send message error, error is: %s", err.Error())
@@ -247,7 +247,7 @@ func (p *defaultProducer) SendMessageOrderly(msg *Message, selector MessageQueue
var sr C.struct__SendResult_
err := rmqError(C.SendMessageOrderly(
- p.cproduer,
+ p.cproducer,
cmsg,
(C.QueueSelectorCallback)(unsafe.Pointer(C.queueSelectorCallback_cgo)),
unsafe.Pointer(&key),
@@ -270,7 +270,7 @@ func (p *defaultProducer) SendMessageOneway(msg *Message) error {
cmsg := goMsgToC(msg)
defer C.DestroyMessage(cmsg)
- err := rmqError(C.SendMessageOneway(p.cproduer, cmsg))
+ err := rmqError(C.SendMessageOneway(p.cproducer, cmsg))
if err != NIL {
log.Warnf("send message with oneway error, error is: %s", err.Error())
return err
@@ -291,7 +291,7 @@ func (p *defaultProducer) SendMessageOrderlyByShardingKey(msg *Message, sharding
defer C.free(unsafe.Pointer(cshardingkey))
var sr C.struct__SendResult_
err := rmqError(C.SendMessageOrderlyByShardingKey(
- p.cproduer,
+ p.cproducer,
cmsg,
cshardingkey,
&sr))
diff --git a/core/transaction_producer.go b/core/transaction_producer.go
index 09ae52f..e79f2c2 100644
--- a/core/transaction_producer.go
+++ b/core/transaction_producer.go
@@ -82,20 +82,20 @@ func newDefaultTransactionProducer(config *ProducerConfig, listener TransactionL
producer := &defaultTransactionProducer{config: config}
cs := C.CString(config.GroupID)
- var cproduer *C.struct_CProducer
+ var cproducer *C.struct_CProducer
- cproduer = C.CreateTransactionProducer(cs, (C.CLocalTransactionCheckerCallback)(unsafe.Pointer(C.transactionChecker_cgo)), unsafe.Pointer(&arg))
+ cproducer = C.CreateTransactionProducer(cs, (C.CLocalTransactionCheckerCallback)(unsafe.Pointer(C.transactionChecker_cgo)), unsafe.Pointer(&arg))
C.free(unsafe.Pointer(cs))
- if cproduer == nil {
+ if cproducer == nil {
return nil, errors.New("create transaction Producer failed")
}
var err rmqError
if config.NameServer != "" {
cs = C.CString(config.NameServer)
- err = rmqError(C.SetProducerNameServerAddress(cproduer, cs))
+ err = rmqError(C.SetProducerNameServerAddress(cproducer, cs))
C.free(unsafe.Pointer(cs))
if err != NIL {
return nil, err
@@ -104,7 +104,7 @@ func newDefaultTransactionProducer(config *ProducerConfig, listener TransactionL
if config.NameServerDomain != "" {
cs = C.CString(config.NameServerDomain)
- err = rmqError(C.SetProducerNameServerDomain(cproduer, cs))
+ err = rmqError(C.SetProducerNameServerDomain(cproducer, cs))
C.free(unsafe.Pointer(cs))
if err != NIL {
return nil, err
@@ -113,7 +113,7 @@ func newDefaultTransactionProducer(config *ProducerConfig, listener TransactionL
if config.InstanceName != "" {
cs = C.CString(config.InstanceName)
- err = rmqError(C.SetProducerInstanceName(cproduer, cs))
+ err = rmqError(C.SetProducerInstanceName(cproducer, cs))
C.free(unsafe.Pointer(cs))
if err != NIL {
return nil, err
@@ -124,7 +124,7 @@ func newDefaultTransactionProducer(config *ProducerConfig, listener TransactionL
ak := C.CString(config.Credentials.AccessKey)
sk := C.CString(config.Credentials.SecretKey)
ch := C.CString(config.Credentials.Channel)
- err = rmqError(C.SetProducerSessionCredentials(cproduer, ak, sk, ch))
+ err = rmqError(C.SetProducerSessionCredentials(cproducer, ak, sk, ch))
C.free(unsafe.Pointer(ak))
C.free(unsafe.Pointer(sk))
@@ -136,53 +136,53 @@ func newDefaultTransactionProducer(config *ProducerConfig, listener TransactionL
if config.LogC != nil {
cs = C.CString(config.LogC.Path)
- err = rmqError(C.SetProducerLogPath(cproduer, cs))
+ err = rmqError(C.SetProducerLogPath(cproducer, cs))
C.free(unsafe.Pointer(cs))
if err != NIL {
return nil, err
}
- err = rmqError(C.SetProducerLogFileNumAndSize(cproduer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+ err = rmqError(C.SetProducerLogFileNumAndSize(cproducer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
if err != NIL {
return nil, err
}
- err = rmqError(C.SetProducerLogLevel(cproduer, C.CLogLevel(config.LogC.Level)))
+ err = rmqError(C.SetProducerLogLevel(cproducer, C.CLogLevel(config.LogC.Level)))
if err != NIL {
return nil, err
}
}
if config.SendMsgTimeout > 0 {
- err = rmqError(C.SetProducerSendMsgTimeout(cproduer, C.int(config.SendMsgTimeout)))
+ err = rmqError(C.SetProducerSendMsgTimeout(cproducer, C.int(config.SendMsgTimeout)))
if err != NIL {
return nil, err
}
}
if config.CompressLevel > 0 {
- err = rmqError(C.SetProducerCompressLevel(cproduer, C.int(config.CompressLevel)))
+ err = rmqError(C.SetProducerCompressLevel(cproducer, C.int(config.CompressLevel)))
if err != NIL {
return nil, err
}
}
if config.MaxMessageSize > 0 {
- err = rmqError(C.SetProducerMaxMessageSize(cproduer, C.int(config.MaxMessageSize)))
+ err = rmqError(C.SetProducerMaxMessageSize(cproducer, C.int(config.MaxMessageSize)))
if err != NIL {
return nil, err
}
}
- producer.cproduer = cproduer
- transactionProducerMap.Store(cproduer, producer)
- producer.listenerFuncsMap.Store(cproduer, listener)
+ producer.cproducer = cproducer
+ transactionProducerMap.Store(cproducer, producer)
+ producer.listenerFuncsMap.Store(cproducer, listener)
return producer, nil
}
type defaultTransactionProducer struct {
config *ProducerConfig
- cproduer *C.struct_CProducer
+ cproducer *C.struct_CProducer
listenerFuncsMap sync.Map
}
@@ -192,7 +192,7 @@ func (p *defaultTransactionProducer) String() string {
// Start the producer.
func (p *defaultTransactionProducer) Start() error {
- err := rmqError(C.StartProducer(p.cproduer))
+ err := rmqError(C.StartProducer(p.cproducer))
if err != NIL {
return err
}
@@ -201,13 +201,13 @@ func (p *defaultTransactionProducer) Start() error {
// Shutdown the producer.
func (p *defaultTransactionProducer) Shutdown() error {
- err := rmqError(C.ShutdownProducer(p.cproduer))
+ err := rmqError(C.ShutdownProducer(p.cproducer))
if err != NIL {
return err
}
- err = rmqError(int(C.DestroyProducer(p.cproduer)))
+ err = rmqError(int(C.DestroyProducer(p.cproducer)))
if err != NIL {
return err
}
@@ -220,7 +220,7 @@ func (p *defaultTransactionProducer) SendMessageTransaction(msg *Message, arg in
defer C.DestroyMessage(cmsg)
var sr C.struct__SendResult_
- err := rmqError(C.SendMessageTransaction(p.cproduer, cmsg, (C.CLocalTransactionExecutorCallback)(unsafe.Pointer(C.transactionExecutor_cgo)), unsafe.Pointer(&arg), &sr))
+ err := rmqError(C.SendMessageTransaction(p.cproducer, cmsg, (C.CLocalTransactionExecutorCallback)(unsafe.Pointer(C.transactionExecutor_cgo)), unsafe.Pointer(&arg), &sr))
if err != NIL {
log.Warnf("send message error, error is: %s", err.Error())
return nil, err