You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/10/30 07:32:20 UTC

[GitHub] duhengforever closed pull request #2: [FEATURE REQUEST #488]Export C-style APIs for RocketMQ client

duhengforever closed pull request #2: [FEATURE REQUEST #488]Export C-style APIs for RocketMQ client
URL: https://github.com/apache/rocketmq-client-cpp/pull/2
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/include/CCommon.h b/include/CCommon.h
index 8e511b5..c97c719 100644
--- a/include/CCommon.h
+++ b/include/CCommon.h
@@ -31,7 +31,15 @@ typedef enum {
     // Failed, null pointer value
     NULL_POINTER = 1,
 } CStatus;
-
+typedef enum {  // Log level selector taken by the Set*LogLevel C APIs in this patch.
+    E_LOG_LEVEL_FATAL = 1,
+    E_LOG_LEVEL_ERROR = 2,
+    E_LOG_LEVEL_WARN = 3,
+    E_LOG_LEVEL_INFO = 4,
+    E_LOG_LEVEL_DEBUG = 5,
+    E_LOG_LEVEL_TRACE = 6,
+    E_LOG_LEVEL_LEVEL_NUM = 7  // NOTE(review): presumably a count/sentinel rather than a real level - confirm
+} CLogLevel;  // The implementations cast this straight to elogLevel, so the numeric values matter.
 #ifdef __cplusplus
 };
 #endif
diff --git a/include/CMessage.h b/include/CMessage.h
index ba22196..c14e47e 100644
--- a/include/CMessage.h
+++ b/include/CMessage.h
@@ -34,6 +34,7 @@ int SetMessageKeys(CMessage *msg, const char *keys);
 int SetMessageBody(CMessage *msg, const char *body);
 int SetByteMessageBody(CMessage *msg, const char *body, int len);
 int SetMessageProperty(CMessage *msg, const char *key, const char *value);
+int SetDelayTimeLevel(CMessage *msg, int level);
 
 #ifdef __cplusplus
 };
diff --git a/include/CMessageExt.h b/include/CMessageExt.h
index ec96470..79df10a 100644
--- a/include/CMessageExt.h
+++ b/include/CMessageExt.h
@@ -32,6 +32,15 @@ const char *GetMessageKeys(CMessageExt *msgExt);
 const char *GetMessageBody(CMessageExt *msgExt);
 const char *GetMessageProperty(CMessageExt *msgExt, const char *key);
 const char *GetMessageId(CMessageExt *msgExt);
+int GetMessageDelayTimeLevel(CMessageExt *msgExt);
+int GetMessageQueueId(CMessageExt *msgExt);
+int GetMessageReconsumeTimes(CMessageExt *msgExt);
+int GetMessageStoreSize(CMessageExt *msgExt);
+long long GetMessageBornTimestamp(CMessageExt *msgExt);
+long long GetMessageStoreTimestamp(CMessageExt *msgExt);
+long long GetMessageQueueOffset(CMessageExt *msgExt);
+long long GetMessageCommitLogOffset(CMessageExt *msgExt);
+long long GetMessagePreparedTransactionOffset(CMessageExt *msgExt);
 
 #ifdef __cplusplus
 };
diff --git a/include/CProducer.h b/include/CProducer.h
index 442e729..ae8e16e 100644
--- a/include/CProducer.h
+++ b/include/CProducer.h
@@ -35,6 +35,16 @@ int StartProducer(CProducer *producer);
 int ShutdownProducer(CProducer *producer);
 
 int SetProducerNameServerAddress(CProducer *producer, const char *namesrv);
+int SetProducerGroupName(CProducer *producer, const char *groupName);
+int SetProducerInstanceName(CProducer *producer, const char *instanceName);
+int SetProducerSessionCredentials(CProducer *producer, const char *accessKey, const char *secretKey,
+                                  const char *onsChannel);
+int SetProducerLogPath(CProducer *producer, const char *logPath);
+int SetProducerLogFileNumAndSize(CProducer *producer, int fileNum, long fileSize);
+int SetProducerLogLevel(CProducer *producer, CLogLevel level);
+int SetProducerSendMsgTimeout(CProducer *producer, int timeout);
+int SetProducerCompressLevel(CProducer *producer, int level);
+int SetProducerMaxMessageSize(CProducer *producer, int size);
 
 int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result);
 
diff --git a/include/CPullConsumer.h b/include/CPullConsumer.h
index 99df8ae..8413e3f 100644
--- a/include/CPullConsumer.h
+++ b/include/CPullConsumer.h
@@ -36,7 +36,13 @@ int ShutdownPullConsumer(CPullConsumer *consumer);
 int SetPullConsumerGroupID(CPullConsumer *consumer, const char *groupId);
 const char *GetPullConsumerGroupID(CPullConsumer *consumer);
 int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char *namesrv);
-int fetchSubscribeMessageQueues(CPullConsumer *consumer, const char *topic, vector<CMessageQueue> &mqs);
+int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char *accessKey, const char *secretKey,
+                                     const char *channel);
+int SetPullConsumerLogPath(CPullConsumer *consumer, const char *logPath);
+int SetPullConsumerLogFileNumAndSize(CPullConsumer *consumer, int fileNum, long fileSize);
+int SetPullConsumerLogLevel(CPullConsumer *consumer, CLogLevel level);
+
+int fetchSubscribeMessageQueues(CPullConsumer *consumer, const char *topic, CMessageQueue *mqs , int size);
 CPullResult pull(const CMessageQueue *mq, const char *subExpression, long long offset, int maxNums);
 
 #ifdef __cplusplus
diff --git a/include/CPullResult.h b/include/CPullResult.h
index 3ab3ca7..7d234c3 100644
--- a/include/CPullResult.h
+++ b/include/CPullResult.h
@@ -20,7 +20,6 @@
 
 #include "CCommon.h"
 #include "CMessageExt.h"
-using namespace std;
 
 #ifdef __cplusplus
 extern "C" {
@@ -39,7 +38,8 @@ typedef struct _CPullResult_ {
     long long  nextBeginOffset;
     long long  minOffset;
     long long  maxOffset;
-    vector<CMessageExt> msgFoundList;
+    CMessageExt* msgFoundList;
+    int size;
 } CPullResult;
 
 #ifdef __cplusplus
diff --git a/include/CPushConsumer.h b/include/CPushConsumer.h
index 771e6d4..b7e7efb 100644
--- a/include/CPushConsumer.h
+++ b/include/CPushConsumer.h
@@ -19,6 +19,7 @@
 #define __C_PUSH_CONSUMER_H__
 
 #include "CMessageExt.h"
+#include "CCommon.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -44,9 +45,14 @@ const char *GetPushConsumerGroupID(CPushConsumer *consumer);
 int SetPushConsumerNameServerAddress(CPushConsumer *consumer, const char *namesrv);
 int Subscribe(CPushConsumer *consumer, const char *topic, const char *expression);
 int RegisterMessageCallback(CPushConsumer *consumer, MessageCallBack pCallback);
-int SetPushConsumeThreadCount(CPushConsumer *consumer, int threadCount);
-int SetPushConsumeMessageBatchMaxSize(CPushConsumer *consumer, int batchSize);
-
+int SetPushConsumerThreadCount(CPushConsumer *consumer, int threadCount);
+int SetPushConsumerMessageBatchMaxSize(CPushConsumer *consumer, int batchSize);
+int SetPushConsumerInstanceName(CPushConsumer *consumer, const char *instanceName);
+int SetPushConsumerSessionCredentials(CPushConsumer *consumer, const char *accessKey, const char *secretKey,
+                                  const char *channel);
+int SetPushConsumerLogPath(CPushConsumer *consumer, const char *logPath);
+int SetPushConsumerLogFileNumAndSize(CPushConsumer *consumer, int fileNum, long fileSize);
+int SetPushConsumerLogLevel(CPushConsumer *consumer, CLogLevel level);
 #ifdef __cplusplus
 };
 #endif
diff --git a/src/extern/CMessage.cpp b/src/extern/CMessage.cpp
index 0a14c8c..4106c27 100644
--- a/src/extern/CMessage.cpp
+++ b/src/extern/CMessage.cpp
@@ -82,6 +82,13 @@ int SetMessageProperty(CMessage *msg, const char *key, const char *value) {
     ((MQMessage *)msg)->setProperty(key,value);
     return OK;
 }
+// Applies a delay time level to the message. Returns NULL_POINTER if msg is NULL, else OK.
+int SetDelayTimeLevel(CMessage *msg, int level) {
+    MQMessage *message = (MQMessage *) msg;
+    if (message == NULL) return NULL_POINTER;
+    message->setDelayTimeLevel(level);
+    return OK;
+}
 
 #ifdef __cplusplus
 };
diff --git a/src/extern/CMessageExt.cpp b/src/extern/CMessageExt.cpp
index 7fca447..e854ce4 100644
--- a/src/extern/CMessageExt.cpp
+++ b/src/extern/CMessageExt.cpp
@@ -17,6 +17,7 @@
 
 #include "MQMessageExt.h"
 #include "CMessageExt.h"
+#include "CCommon.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -58,6 +59,69 @@ const char *GetMessageId(CMessageExt *msg) {
     }
     return ((MQMessageExt *) msg)->getMsgId().c_str();
 }
+
+// Delay time level of the message, or NULL_POINTER (1) when msg is NULL.
+int GetMessageDelayTimeLevel(CMessageExt *msg) {
+    MQMessageExt *ext = (MQMessageExt *) msg;
+    if (ext == NULL) return NULL_POINTER;
+    return ext->getDelayTimeLevel();
+}
+
+// Queue id of the message.
+// A NULL msg yields NULL_POINTER (1), which callers cannot tell apart from a real id of 1.
+int GetMessageQueueId(CMessageExt *msg) {
+    if (msg != NULL) return ((MQMessageExt *) msg)->getQueueId();
+    return NULL_POINTER;
+}
+
+// Reconsume count of the message, or NULL_POINTER (1) when msg is NULL.
+int GetMessageReconsumeTimes(CMessageExt *msg) {
+    MQMessageExt *ext = (MQMessageExt *) msg;
+    if (ext == NULL) return NULL_POINTER;
+    return ext->getReconsumeTimes();
+}
+
+// Store size of the message.
+// A NULL msg yields NULL_POINTER (1) instead.
+int GetMessageStoreSize(CMessageExt *msg) {
+    if (msg != NULL) return ((MQMessageExt *) msg)->getStoreSize();
+    return NULL_POINTER;
+}
+
+// Born timestamp of the message, or NULL_POINTER (1) when msg is NULL.
+long long GetMessageBornTimestamp(CMessageExt *msg) {
+    MQMessageExt *ext = (MQMessageExt *) msg;
+    if (ext == NULL) return NULL_POINTER;
+    return ext->getBornTimestamp();
+}
+
+// Store timestamp of the message, or NULL_POINTER (1) when msg is NULL.
+long long GetMessageStoreTimestamp(CMessageExt *msg) {
+    if (msg == NULL) return NULL_POINTER;
+    // Read the store timestamp here; the born timestamp is served by GetMessageBornTimestamp.
+    return ((MQMessageExt *) msg)->getStoreTimestamp();
+}
+
+// Queue offset of the message.
+// A NULL msg yields NULL_POINTER (1), which is indistinguishable from a real offset of 1.
+long long GetMessageQueueOffset(CMessageExt *msg) {
+    if (msg != NULL) return ((MQMessageExt *) msg)->getQueueOffset();
+    return NULL_POINTER;
+}
+
+// Commit log offset of the message, or NULL_POINTER (1) when msg is NULL.
+long long GetMessageCommitLogOffset(CMessageExt *msg) {
+    MQMessageExt *ext = (MQMessageExt *) msg;
+    if (ext == NULL) return NULL_POINTER;
+    return ext->getCommitLogOffset();
+}
+
+// Prepared transaction offset of the message.
+// A NULL msg yields NULL_POINTER (1) instead.
+long long GetMessagePreparedTransactionOffset(CMessageExt *msg) {
+    if (msg != NULL) return ((MQMessageExt *) msg)->getPreparedTransactionOffset();
+    return NULL_POINTER;
+}
 #ifdef __cplusplus
 };
 #endif
diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp
index 088b63d..4ac683b 100644
--- a/src/extern/CProducer.cpp
+++ b/src/extern/CProducer.cpp
@@ -39,21 +39,21 @@ int DestroyProducer(CProducer *pProducer) {
     delete reinterpret_cast<DefaultMQProducer * >(pProducer);
     return OK;
 }
-int StartProducer( CProducer *producer) {
+int StartProducer(CProducer *producer) {
     if (producer == NULL) {
         return NULL_POINTER;
     }
     ((DefaultMQProducer *) producer)->start();
     return OK;
 }
-int ShutdownProducer( CProducer *producer) {
+int ShutdownProducer(CProducer *producer) {
     if (producer == NULL) {
         return NULL_POINTER;
     }
     ((DefaultMQProducer *) producer)->shutdown();
     return OK;
 }
-int SetProducerNameServerAddress( CProducer *producer, const char *namesrv) {
+int SetProducerNameServerAddress(CProducer *producer, const char *namesrv) {
     if (producer == NULL) {
         return NULL_POINTER;
     }
@@ -61,7 +61,7 @@ int SetProducerNameServerAddress( CProducer *producer, const char *namesrv) {
     return OK;
 }
 
-int SendMessageSync( CProducer *producer, CMessage *msg, CSendResult *result) {
+int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result) {
     //CSendResult sendResult;
     if (producer == NULL || msg == NULL || result == NULL) {
         return NULL_POINTER;
@@ -93,6 +93,84 @@ int SendMessageSync( CProducer *producer, CMessage *msg, CSendResult *result) {
     return OK;
 }
 
+// Sets the producer's group name. Returns NULL_POINTER for a NULL producer, else OK.
+int SetProducerGroupName(CProducer *producer, const char *groupName) {
+    DefaultMQProducer *impl = (DefaultMQProducer *) producer;
+    if (impl == NULL) return NULL_POINTER;
+    impl->setGroupName(groupName);
+    return OK;
+}
+int SetProducerInstanceName(CProducer *producer, const char *instanceName) {
+    // Returns NULL_POINTER for a NULL producer handle, OK otherwise.
+    if (producer == NULL) return NULL_POINTER;
+    // Must be setInstanceName: calling setGroupName here would overwrite the producer's group.
+    ((DefaultMQProducer *) producer)->setInstanceName(instanceName);
+    return OK;
+}
+// Forwards accessKey/secretKey/onsChannel to the producer; NULL_POINTER if producer is NULL, else OK.
+int SetProducerSessionCredentials(CProducer *producer, const char *accessKey, const char *secretKey,
+                                  const char *onsChannel) {
+    DefaultMQProducer *impl = (DefaultMQProducer *) producer;
+    if (impl == NULL) return NULL_POINTER;
+    impl->setSessionCredentials(accessKey, secretKey, onsChannel);
+    return OK;
+}
+// Placeholder: only validates the handle; logPath is currently ignored.
+// Returns NULL_POINTER for a NULL producer, else OK.
+int SetProducerLogPath(CProducer *producer, const char *logPath) {
+    // TODO: wire logPath through once the core API offers a log-path setter.
+    // (A disabled draft called setLogFileSizeAndNum(3, 102400000) here.)
+    if (producer != NULL) return OK;
+    return NULL_POINTER;
+}
+
+// Passes the log file count and per-call size value to the producer; NULL_POINTER if producer is NULL.
+int SetProducerLogFileNumAndSize(CProducer *producer, int fileNum, long fileSize) {
+    DefaultMQProducer *impl = (DefaultMQProducer *) producer;
+    if (impl == NULL) return NULL_POINTER;
+    impl->setLogFileSizeAndNum(fileNum, fileSize);
+    return OK;
+}
+
+// Sets the producer log level (CLogLevel is cast to elogLevel). NULL_POINTER if producer is NULL.
+int SetProducerLogLevel(CProducer *producer, CLogLevel level) {
+    DefaultMQProducer *impl = (DefaultMQProducer *) producer;
+    if (impl == NULL) return NULL_POINTER;
+    impl->setLogLevel((elogLevel) level);
+    return OK;
+}
+
+// Sets the producer's send-message timeout. Returns NULL_POINTER for a NULL producer, else OK.
+int SetProducerSendMsgTimeout(CProducer *producer, int timeout) {
+    DefaultMQProducer *impl = (DefaultMQProducer *) producer;
+    if (impl == NULL) return NULL_POINTER;
+    impl->setSendMsgTimeout(timeout);
+    return OK;
+}
+
+// Sets the body-size threshold for compression. NOTE(review): no prototype in the CProducer.h hunk - confirm.
+int SetProducerCompressMsgBodyOverHowmuch(CProducer *producer, int howmuch) {
+    DefaultMQProducer *impl = (DefaultMQProducer *) producer;
+    if (impl == NULL) return NULL_POINTER;
+    impl->setCompressMsgBodyOverHowmuch(howmuch);
+    return OK;
+}
+
+// Sets the producer's compression level. Returns NULL_POINTER for a NULL producer, else OK.
+int SetProducerCompressLevel(CProducer *producer, int level) {
+    DefaultMQProducer *impl = (DefaultMQProducer *) producer;
+    if (impl == NULL) return NULL_POINTER;
+    impl->setCompressLevel(level);
+    return OK;
+}
+
+// Sets the producer's maximum message size. Returns NULL_POINTER for a NULL producer, else OK.
+int SetProducerMaxMessageSize(CProducer *producer, int size) {
+    DefaultMQProducer *impl = (DefaultMQProducer *) producer;
+    if (impl == NULL) return NULL_POINTER;
+    impl->setMaxMessageSize(size);
+    return OK;
+}
 #ifdef __cplusplus
 };
 #endif
diff --git a/src/extern/CPullConsumer.cpp b/src/extern/CPullConsumer.cpp
index 73583b3..aff591f 100644
--- a/src/extern/CPullConsumer.cpp
+++ b/src/extern/CPullConsumer.cpp
@@ -19,7 +19,6 @@
 #include "CMessageExt.h"
 #include "CPullConsumer.h"
 #include "CCommon.h"
-#include <map>
 
 using namespace rocketmq;
 
@@ -76,6 +75,53 @@ int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char *namesr
     ((DefaultMQPullConsumer *) consumer)->setNamesrvAddr(namesrv);
     return OK;
 }
+// Forwards accessKey/secretKey/channel to the pull consumer; NULL_POINTER if consumer is NULL, else OK.
+int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char *accessKey, const char *secretKey,
+                                     const char *channel) {
+    DefaultMQPullConsumer *impl = (DefaultMQPullConsumer *) consumer;
+    if (impl == NULL) return NULL_POINTER;
+    impl->setSessionCredentials(accessKey, secretKey, channel);
+    return OK;
+}
+
+// Placeholder: only validates the handle; logPath is currently ignored.
+// Returns NULL_POINTER for a NULL consumer, else OK.
+int SetPullConsumerLogPath(CPullConsumer *consumer, const char *logPath) {
+    // TODO: wire logPath through once the core API offers a log-path setter.
+    // (The disabled draft here called setInstanceName, which is unrelated to log paths.)
+    if (consumer != NULL) return OK;
+    return NULL_POINTER;
+}
+
+// Passes the log file count and size value to the pull consumer; NULL_POINTER if consumer is NULL.
+int SetPullConsumerLogFileNumAndSize(CPullConsumer *consumer, int fileNum, long fileSize) {
+    DefaultMQPullConsumer *impl = (DefaultMQPullConsumer *) consumer;
+    if (impl == NULL) return NULL_POINTER;
+    impl->setLogFileSizeAndNum(fileNum, fileSize);
+    return OK;
+}
+
+// Sets the pull consumer log level (CLogLevel is cast to elogLevel). NULL_POINTER if consumer is NULL.
+int SetPullConsumerLogLevel(CPullConsumer *consumer, CLogLevel level) {
+    DefaultMQPullConsumer *impl = (DefaultMQPullConsumer *) consumer;
+    if (impl == NULL) return NULL_POINTER;
+    impl->setLogLevel((elogLevel) level);
+    return OK;
+}
+
+// Stub: checks the consumer handle only; topic, mqs and size are not used yet.
+// Returns NULL_POINTER for a NULL consumer, else OK.
+int fetchSubscribeMessageQueues(CPullConsumer *consumer, const char *topic, CMessageQueue *mqs, int size) {
+    // TODO: add the implementation.
+    if (consumer != NULL) return OK;
+    return NULL_POINTER;
+}
+// Stub: ignores every argument and returns a CPullResult whose bytes are all zero.
+CPullResult pull(const CMessageQueue *mq, const char *subExpression, long long offset, int maxNums) {
+    CPullResult emptyResult;
+    memset(&emptyResult, 0, sizeof emptyResult);  // TODO: add the implementation.
+    return emptyResult;
+}
 
 #ifdef __cplusplus
 };
diff --git a/src/extern/CPushConsumer.cpp b/src/extern/CPushConsumer.cpp
index e378122..5c9d5da 100644
--- a/src/extern/CPushConsumer.cpp
+++ b/src/extern/CPushConsumer.cpp
@@ -23,6 +23,7 @@
 
 using namespace rocketmq;
 using namespace std;
+
 class MessageListenerInner : public MessageListenerConcurrently {
 public:
     MessageListenerInner() {}
@@ -141,20 +142,63 @@ int UnregisterMessageCallback(CPushConsumer *consumer) {
     }
     return OK;
 }
-int SetPushConsumeThreadCount(CPushConsumer *consumer, int threadCount) {
+int SetPushConsumerThreadCount(CPushConsumer *consumer, int threadCount) {
     if (consumer == NULL || threadCount == 0) {
         return NULL_POINTER;
     }
     ((DefaultMQPushConsumer *) consumer)->setConsumeThreadCount(threadCount);
     return OK;
 }
-int SetPushConsumeMessageBatchMaxSize(CPushConsumer *consumer, int batchSize) {
+int SetPushConsumerMessageBatchMaxSize(CPushConsumer *consumer, int batchSize) {
     if (consumer == NULL || batchSize == 0) {
         return NULL_POINTER;
     }
     ((DefaultMQPushConsumer *) consumer)->setConsumeMessageBatchMaxSize(batchSize);
     return OK;
 }
+
+// Sets the push consumer's instance name. Returns NULL_POINTER for a NULL consumer, else OK.
+int SetPushConsumerInstanceName(CPushConsumer *consumer, const char *instanceName) {
+    DefaultMQPushConsumer *impl = (DefaultMQPushConsumer *) consumer;
+    if (impl == NULL) return NULL_POINTER;
+    impl->setInstanceName(instanceName);
+    return OK;
+}
+
+// Forwards accessKey/secretKey/channel to the push consumer; NULL_POINTER if consumer is NULL, else OK.
+int SetPushConsumerSessionCredentials(CPushConsumer *consumer, const char *accessKey, const char *secretKey,
+                                     const char *channel) {
+    DefaultMQPushConsumer *impl = (DefaultMQPushConsumer *) consumer;
+    if (impl == NULL) return NULL_POINTER;
+    impl->setSessionCredentials(accessKey, secretKey, channel);
+    return OK;
+}
+
+// Placeholder: only validates the handle; logPath is currently ignored.
+// Returns NULL_POINTER for a NULL consumer, else OK.
+int SetPushConsumerLogPath(CPushConsumer *consumer, const char *logPath) {
+    // TODO: wire logPath through once the core API offers a log-path setter.
+    // (The disabled draft here called setInstanceName, which is unrelated to log paths.)
+    if (consumer != NULL) return OK;
+    return NULL_POINTER;
+}
+
+// Passes the log file count and size value to the push consumer; NULL_POINTER if consumer is NULL.
+int SetPushConsumerLogFileNumAndSize(CPushConsumer *consumer, int fileNum, long fileSize) {
+    DefaultMQPushConsumer *impl = (DefaultMQPushConsumer *) consumer;
+    if (impl == NULL) return NULL_POINTER;
+    impl->setLogFileSizeAndNum(fileNum, fileSize);
+    return OK;
+}
+
+// Sets the push consumer log level (CLogLevel is cast to elogLevel). NULL_POINTER if consumer is NULL.
+int SetPushConsumerLogLevel(CPushConsumer *consumer, CLogLevel level) {
+    DefaultMQPushConsumer *impl = (DefaultMQPushConsumer *) consumer;
+    if (impl == NULL) return NULL_POINTER;
+    impl->setLogLevel((elogLevel) level);
+    return OK;
+}
+
 #ifdef __cplusplus
 };
 #endif


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services