You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/11/22 07:08:56 UTC

[GitHub] vongosling closed pull request #12: Add C-Style API RegisterMessageCallbackOrderly and UnregisterMessageCallbackOrderly

vongosling closed pull request #12: Add C-Style API RegisterMessageCallbackOrderly and UnregisterMessageCallbackOrderly
URL: https://github.com/apache/rocketmq-client-cpp/pull/12
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this pull request comes from a fork, GitHub will not show its diff
once it has been merged, so the diff is supplied below:

diff --git a/include/CPushConsumer.h b/include/CPushConsumer.h
index 17d82096..b63a9f5a 100644
--- a/include/CPushConsumer.h
+++ b/include/CPushConsumer.h
@@ -35,7 +35,6 @@ typedef enum E_CConsumeStatus{
 
 typedef int(*MessageCallBack)(CPushConsumer *, CMessageExt *);
 
-
 CPushConsumer *CreatePushConsumer(const char *groupId);
 int DestroyPushConsumer(CPushConsumer *consumer);
 int StartPushConsumer(CPushConsumer *consumer);
@@ -45,6 +44,9 @@ const char *GetPushConsumerGroupID(CPushConsumer *consumer);
 int SetPushConsumerNameServerAddress(CPushConsumer *consumer, const char *namesrv);
 int Subscribe(CPushConsumer *consumer, const char *topic, const char *expression);
 int RegisterMessageCallback(CPushConsumer *consumer, MessageCallBack pCallback);
+int RegisterMessageCallbackOrderly(CPushConsumer *consumer, MessageCallBack pCallback);
+int UnregisterMessageCallbackOrderly(CPushConsumer *consumer);
+int UnregisterMessageCallback(CPushConsumer *consumer);
 int SetPushConsumerThreadCount(CPushConsumer *consumer, int threadCount);
 int SetPushConsumerMessageBatchMaxSize(CPushConsumer *consumer, int batchSize);
 int SetPushConsumerInstanceName(CPushConsumer *consumer, const char *instanceName);
diff --git a/src/extern/CPushConsumer.cpp b/src/extern/CPushConsumer.cpp
index 5c9d5da1..1738b5f2 100644
--- a/src/extern/CPushConsumer.cpp
+++ b/src/extern/CPushConsumer.cpp
@@ -54,8 +54,31 @@ class MessageListenerInner : public MessageListenerConcurrently {
     CPushConsumer *m_pconsumer;
 };
 
-map<CPushConsumer *, MessageListenerInner *> g_ListenerMap;
+// Orderly-consumption listener that forwards each received message to a
+// C-style MessageCallBack on behalf of a CPushConsumer handle.
+class MessageListenerOrderlyInner : public MessageListenerOrderly {
+public:
+    // Remembers the consumer handle and the callback to invoke per message.
+    MessageListenerOrderlyInner(CPushConsumer *consumer, MessageCallBack pCallback)
+        : m_pMsgReceiveCallback(pCallback), m_pconsumer(consumer) {}
+
+    // Hands every message in msgs to the callback, in vector order.
+    // Returns RECONSUME_LATER when no callback is set, or as soon as the
+    // callback returns anything other than E_CONSUME_SUCCESS (the remaining
+    // messages of the batch are then not delivered); CONSUME_SUCCESS otherwise.
+    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
+        if (!m_pMsgReceiveCallback) {
+            return RECONSUME_LATER;
+        }
+        typedef std::vector<MQMessageExt>::const_iterator MsgIter;
+        for (MsgIter it = msgs.begin(); it != msgs.end(); ++it) {
+            // The callback receives the message through the CMessageExt handle type.
+            CMessageExt *handle = (CMessageExt *) const_cast<MQMessageExt *>(&(*it));
+            const int status = m_pMsgReceiveCallback(m_pconsumer, handle);
+            if (status != E_CONSUME_SUCCESS) {
+                return RECONSUME_LATER;
+            }
+        }
+        return CONSUME_SUCCESS;
+    }
+
+private:
+    MessageCallBack m_pMsgReceiveCallback;
+    CPushConsumer *m_pconsumer;
+};
 
+// MessageListenerInner objects stored by RegisterMessageCallback, keyed by consumer handle.
+map<CPushConsumer *, MessageListenerInner *> g_ListenerMap;
+// MessageListenerOrderlyInner objects stored by RegisterMessageCallbackOrderly, keyed by
+// consumer handle; UnregisterMessageCallbackOrderly deletes and erases the matching entry.
+map<CPushConsumer *, MessageListenerOrderlyInner *> g_OrderListenerMap;
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -117,6 +140,7 @@ int Subscribe(CPushConsumer *consumer, const char *topic, const char *expression
     ((DefaultMQPushConsumer *) consumer)->subscribe(topic, expression);
     return OK;
 }
+
 int RegisterMessageCallback(CPushConsumer *consumer, MessageCallBack pCallback) {
     if (consumer == NULL || pCallback == NULL) {
         return NULL_POINTER;
@@ -126,6 +150,33 @@ int RegisterMessageCallback(CPushConsumer *consumer, MessageCallBack pCallback)
     g_ListenerMap[consumer] = listenerInner;
     return OK;
 }
+
+// Registers pCallback as the orderly message callback of the given consumer.
+// A MessageListenerOrderlyInner wrapping the callback is allocated, handed to the
+// underlying DefaultMQPushConsumer and recorded in g_OrderListenerMap.
+// Returns NULL_POINTER if consumer or pCallback is NULL, OK otherwise.
+int RegisterMessageCallbackOrderly(CPushConsumer *consumer, MessageCallBack pCallback) {
+    if (consumer == NULL || pCallback == NULL) {
+        return NULL_POINTER;
+    }
+    MessageListenerOrderlyInner *messageListenerOrderlyInner = new MessageListenerOrderlyInner(consumer, pCallback);
+    ((DefaultMQPushConsumer *) consumer)->registerMessageListener(messageListenerOrderlyInner);
+    // Keep the listener so UnregisterMessageCallbackOrderly can delete it later.
+    // NOTE(review): an entry already stored for this consumer is overwritten here
+    // without being deleted.
+    g_OrderListenerMap[consumer] = messageListenerOrderlyInner;
+    // Report success explicitly: flowing off the end of a non-void function is
+    // undefined behavior, and RegisterMessageCallback returns OK the same way.
+    return OK;
+}
+
+
+// Releases the orderly listener recorded for consumer, if there is one.
+// Returns NULL_POINTER when consumer is NULL, OK otherwise (including the case
+// where no orderly listener was recorded for it).
+// NOTE(review): only the listener object and its map entry are released here;
+// nothing is called on the consumer itself.
+int UnregisterMessageCallbackOrderly(CPushConsumer *consumer) {
+    if (!consumer) {
+        return NULL_POINTER;
+    }
+    typedef map<CPushConsumer *, MessageListenerOrderlyInner *> OrderListenerMap;
+    OrderListenerMap::iterator entry = g_OrderListenerMap.find(consumer);
+    if (entry == g_OrderListenerMap.end()) {
+        return OK;
+    }
+    // delete on a null pointer is a no-op, so no separate NULL test is needed.
+    delete entry->second;
+    g_OrderListenerMap.erase(entry);
+    return OK;
+}
+
 int UnregisterMessageCallback(CPushConsumer *consumer) {
     if (consumer == NULL) {
         return NULL_POINTER;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services