You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/03/18 09:47:51 UTC

[rocketmq-client-cpp] branch master updated: feat(trace): add message trace shift for C style apis. (#281)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new f7c0a1a  feat(trace): add message trace shift for C style apis. (#281)
f7c0a1a is described below

commit f7c0a1a3ffb457f75b234fc2248b13683bcdffee
Author: dinglei <li...@163.com>
AuthorDate: Wed Mar 18 17:47:41 2020 +0800

    feat(trace): add message trace shift for C style apis. (#281)
---
 include/CCommon.h                      |  1 +
 include/CProducer.h                    |  1 +
 include/CPushConsumer.h                |  1 +
 include/TransactionMQProducer.h        |  3 ++-
 src/extern/CProducer.cpp               | 18 ++++++++++++++++++
 src/extern/CPushConsumer.cpp           |  9 ++++++++-
 src/producer/TransactionMQProducer.cpp |  7 ++++++-
 test/src/extern/CProducerTest.cpp      |  4 ++++
 test/src/extern/CPushConsumerTest.cpp  |  2 ++
 9 files changed, 43 insertions(+), 3 deletions(-)

diff --git a/include/CCommon.h b/include/CCommon.h
index fa6edb9..ae8a9e1 100644
--- a/include/CCommon.h
+++ b/include/CCommon.h
@@ -83,6 +83,7 @@ typedef enum _CLogLevel_ {
 #endif
 
 typedef enum _CMessageModel_ { BROADCASTING, CLUSTERING } CMessageModel;
+typedef enum _CTraceModel_ { OPEN, CLOSE } CTraceModel;  // trace switch: the Set*MessageTrace setters treat OPEN as "on", any other value as "off"
 
 #ifdef __cplusplus
 }
diff --git a/include/CProducer.h b/include/CProducer.h
index fa6a730..296b13f 100644
--- a/include/CProducer.h
+++ b/include/CProducer.h
@@ -63,6 +63,7 @@ ROCKETMQCLIENT_API int SetProducerLogLevel(CProducer* producer, CLogLevel level)
 ROCKETMQCLIENT_API int SetProducerSendMsgTimeout(CProducer* producer, int timeout);
 ROCKETMQCLIENT_API int SetProducerCompressLevel(CProducer* producer, int level);
 ROCKETMQCLIENT_API int SetProducerMaxMessageSize(CProducer* producer, int size);
+ROCKETMQCLIENT_API int SetProducerMessageTrace(CProducer* producer, CTraceModel openTrace);  // OPEN enables tracing, otherwise disabled
 
 ROCKETMQCLIENT_API int SendMessageSync(CProducer* producer, CMessage* msg, CSendResult* result);
 ROCKETMQCLIENT_API int SendBatchMessage(CProducer* producer, CBatchMessage* msg, CSendResult* result);
diff --git a/include/CPushConsumer.h b/include/CPushConsumer.h
index e2bce18..5ed83f3 100644
--- a/include/CPushConsumer.h
+++ b/include/CPushConsumer.h
@@ -59,6 +59,7 @@ ROCKETMQCLIENT_API int SetPushConsumerLogLevel(CPushConsumer* consumer, CLogLeve
 ROCKETMQCLIENT_API int SetPushConsumerMessageModel(CPushConsumer* consumer, CMessageModel messageModel);
 ROCKETMQCLIENT_API int SetPushConsumerMaxCacheMessageSize(CPushConsumer* consumer, int maxCacheSize);
 ROCKETMQCLIENT_API int SetPushConsumerMaxCacheMessageSizeInMb(CPushConsumer* consumer, int maxCacheSizeInMb);
+ROCKETMQCLIENT_API int SetPushConsumerMessageTrace(CPushConsumer* consumer, CTraceModel openTrace);  // OPEN enables tracing, otherwise disabled
 
 #ifdef __cplusplus
 }
diff --git a/include/TransactionMQProducer.h b/include/TransactionMQProducer.h
index f4c0281..784de6c 100644
--- a/include/TransactionMQProducer.h
+++ b/include/TransactionMQProducer.h
@@ -79,7 +79,8 @@ class ROCKETMQCLIENT_API TransactionMQProducer {
   void setLogLevel(elogLevel inputLevel);
   elogLevel getLogLevel();
   void setLogFileSizeAndNum(int fileNum, long perFileSize);  // perFileSize is MB unit
-
+  void setMessageTrace(bool messageTrace);  // flag is forwarded to the producer impl
+  bool getMessageTrace() const;  // returns the flag as reported by the producer impl
   std::shared_ptr<TransactionListener> getTransactionListener();
   void setTransactionListener(TransactionListener* listener);
   TransactionSendResult sendMessageInTransaction(MQMessage& msg, void* arg);
diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp
index 00a9b05..0b139e2 100644
--- a/src/extern/CProducer.cpp
+++ b/src/extern/CProducer.cpp
@@ -797,6 +797,24 @@ int SetProducerMaxMessageSize(CProducer* producer, int size) {
   }
   return OK;
 }
+int SetProducerMessageTrace(CProducer* producer, CTraceModel openTrace) {
+  if (NULL == producer) {
+    return NULL_POINTER;  // no handle, nothing to configure
+  }
+  const bool traceEnabled = (openTrace == OPEN);  // only OPEN turns tracing on
+  DefaultProducer* wrapper = (DefaultProducer*)producer;
+  try {
+    if (wrapper->producerType != CAPI_C_PRODUCER_TYPE_TRANSACTION) {
+      wrapper->innerProducer->setMessageTrace(traceEnabled);
+    } else {
+      wrapper->innerTransactionProducer->setMessageTrace(traceEnabled);
+    }
+  } catch (const exception& e) {
+    MQClientErrorContainer::setErr(string(e.what()));  // keep the exception text for the caller
+    return PRODUCER_START_FAILED;
+  }
+  return OK;
+}
 #ifdef __cplusplus
 };
 #endif
diff --git a/src/extern/CPushConsumer.cpp b/src/extern/CPushConsumer.cpp
index 5ee89ca..77da3af 100644
--- a/src/extern/CPushConsumer.cpp
+++ b/src/extern/CPushConsumer.cpp
@@ -296,7 +296,14 @@ int SetPushConsumerLogLevel(CPushConsumer* consumer, CLogLevel level) {
   ((DefaultMQPushConsumer*)consumer)->setLogLevel((elogLevel)level);
   return OK;
 }
-
+int SetPushConsumerMessageTrace(CPushConsumer* consumer, CTraceModel openTrace) {
+  if (NULL == consumer) {
+    return NULL_POINTER;  // no handle, nothing to configure
+  }
+  DefaultMQPushConsumer* pushConsumer = (DefaultMQPushConsumer*)consumer;
+  pushConsumer->setMessageTrace(openTrace == OPEN);  // only OPEN turns tracing on
+  return OK;
+}
 #ifdef __cplusplus
 };
 #endif
diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp
index c246a80..97ed1b3 100644
--- a/src/producer/TransactionMQProducer.cpp
+++ b/src/producer/TransactionMQProducer.cpp
@@ -160,7 +160,12 @@ void TransactionMQProducer::setUnitName(std::string unitName) {
 const std::string& TransactionMQProducer::getUnitName() const {
   return impl->getUnitName();
 }
-
+void TransactionMQProducer::setMessageTrace(bool messageTrace) {
+  impl->setMessageTrace(messageTrace);  // delegate to the implementation object
+}
+bool TransactionMQProducer::getMessageTrace() const {
+  return impl->getMessageTrace();  // delegate to the implementation object
+}
 std::shared_ptr<TransactionListener> TransactionMQProducer::getTransactionListener() {
   return impl->getTransactionListener();
 }
diff --git a/test/src/extern/CProducerTest.cpp b/test/src/extern/CProducerTest.cpp
index 7798fe6..9a5156f 100644
--- a/test/src/extern/CProducerTest.cpp
+++ b/test/src/extern/CProducerTest.cpp
@@ -228,6 +228,10 @@ TEST(cProducer, info) {
   EXPECT_EQ(SetProducerSessionCredentials(cProducer, "accessKey", "secretKey", "channel"), OK);
   SessionCredentials sessionCredentials = defaultMQProducer->getSessionCredentials();
   EXPECT_EQ(sessionCredentials.getAccessKey(), "accessKey");
+
+  EXPECT_EQ(SetProducerMessageTrace(cProducer, OPEN), OK);
+  EXPECT_EQ(defaultMQProducer->getMessageTrace(), true);
+
   Mock::AllowLeak(defaultMQProducer);
 }
 
diff --git a/test/src/extern/CPushConsumerTest.cpp b/test/src/extern/CPushConsumerTest.cpp
index 2eff2e7..6b516ab 100644
--- a/test/src/extern/CPushConsumerTest.cpp
+++ b/test/src/extern/CPushConsumerTest.cpp
@@ -118,6 +118,8 @@ TEST(cPushComsumer, info) {
   EXPECT_EQ(SetPushConsumerMessageModel(cpushConsumer, BROADCASTING), OK);
   EXPECT_EQ(mqPushConsumer->getMessageModel(), MessageModel::BROADCASTING);
 
+  EXPECT_EQ(SetPushConsumerMessageTrace(cpushConsumer, CLOSE), OK);
+  EXPECT_EQ(mqPushConsumer->getMessageTrace(), false);
   Mock::AllowLeak(mqPushConsumer);
 }