You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2019/01/03 08:09:22 UTC

[GitHub] ShannonDing closed pull request #20: Fixed deadlock, and add args field for python callback.

ShannonDing closed pull request #20: Fixed deadlock, and add args field for python callback.
URL: https://github.com/apache/rocketmq-client-python/pull/20
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/.gitignore b/.gitignore
index 990936c..b232131 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,8 @@
 .idea/
-cmake-build-debug/
+cmake-build-*/
+
+*.pyc
+*.so
+
+bin/
+
diff --git a/doc/Introduction.md b/doc/Introduction.md
index 371ad97..628079e 100644
--- a/doc/Introduction.md
+++ b/doc/Introduction.md
@@ -73,8 +73,10 @@
 ----------
 ## How to use
 - set LD_LIBRARY_PATH
-  ``````
+  ```
   export LD_LIBRARY_PATH=/usr/local/lib
+  ```
+  
 - import module
   ```
   from librocketmqclientpython import *
@@ -90,28 +92,28 @@
 - producer must invoke following interface:
   ```
   - producer = CreateProducer("please_rename_unique_group_name");
-  - SetProducerNameServerAddress(producer,"please_rename_unique_name_server")
+  - SetProducerNameServerAddress(producer, "please_rename_unique_name_server")
   - StartProducer(producer)
-  - SendMessageSync(producer,msg)
+  - SendMessageSync(producer, msg)
   - ShutdownProducer(producer)
   - DestroyProducer(producer)
   ```
 - how to consumer messages
   ```
-  - def consumerMessage(msg):
-  - topic = GetMessageTopic(msg)
-  - body = GetMessageBody(msg)
-  - tags = GetMessageTags(msg)
-  - msgid = GetMessageId(msg)
-  - handle message
-  - return 0
+  - def consumerMessage(msg, args):
+  -     topic = GetMessageTopic(msg)
+  -     body = GetMessageBody(msg)
+  -     tags = GetMessageTags(msg)
+  -     msgid = GetMessageId(msg)
+  -     # handle message...
+  -     return 0
   ```
 - pushconsumer must invoke following interface:
   ```
   - consumer = CreatePushConsumer("please_rename_unique_group_name_1");
-  - SetPushConsumerNameServerAddress(consumer,"please_rename_unique_name_server")
+  - SetPushConsumerNameServerAddress(consumer, "please_rename_unique_name_server")
   - Subscribe(consumer, "your_topic", "*")
-  - RegisterMessageCallback(consumer,consumerMessage)
+  - RegisterMessageCallback(consumer, consumerMessage, args)
   - StartPushConsumer(consumer)
   - ShutdownPushConsumer(consumer)
   - DestroyPushConsumer(consumer)
@@ -122,3 +124,4 @@
   - python testProducer.py
 - push consumer
   - python testConsumer.py
+
diff --git a/doc/api-doc/consumer-push.md b/doc/api-doc/consumer-push.md
index a783e47..6ee498e 100644
--- a/doc/api-doc/consumer-push.md
+++ b/doc/api-doc/consumer-push.md
@@ -31,13 +31,14 @@
     topic: topic name
     tag: topic tag
 
-* RegisterMessageCallback(consumer, pyCallBack) <br />
+* RegisterMessageCallback(consumer, pyCallBack, pyArgs) <br />
   - function description<br />
     set callback for push consumer instance <br />
 
   - input <br />
     consumer: consumer intance<br />
-    pyCallBack: py callback method. when message pulled, they would be send to a pyCallback method
+    pyCallBack: py callback method. when message pulled, they would be send to a pyCallback method<br />
+    pyArgs: the arguments will be passed to pyCallBack
 
 * SetPushConsumerThreadCount(consumer, threadCount)
   - function description<br />
diff --git a/sample/testConsumer.py b/sample/testConsumer.py
index 93665ed..03ca587 100644
--- a/sample/testConsumer.py
+++ b/sample/testConsumer.py
@@ -18,8 +18,10 @@
 import base
 import time
 from librocketmqclientpython import *
+
 totalMsg = 0
-def consumerMessage(msg):
+
+def consumerMessage(msg, args):
      global totalMsg
      totalMsg += 1
      print(">>ConsumerMessage Called:",totalMsg)
@@ -33,11 +35,12 @@ def consumerMessage(msg):
 
 consumer = CreatePushConsumer("awtTest_Producer_Python_Test")
 print(consumer)
-SetPushConsumerNameServerAddress(consumer,"172.17.0.2:9876")
-SetPushConsumerThreadCount(consumer,1)
+SetPushConsumerNameServerAddress(consumer, "172.17.0.2:9876")
+SetPushConsumerThreadCount(consumer, 1)
 Subscribe(consumer, "T_TestTopic", "*")
-RegisterMessageCallback(consumer,consumerMessage)
+RegisterMessageCallback(consumer, consumerMessage, None)
 StartPushConsumer(consumer)
+
 i = 1
 while i <= 60:
     print(i)
diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp
index 8c7c1e2..919ba5b 100644
--- a/src/PythonWrapper.cpp
+++ b/src/PythonWrapper.cpp
@@ -30,24 +30,37 @@ using namespace std;
 const char *VERSION =
         "PYTHON_CLIENT_VERSION: " PYTHON_CLIENT_VERSION ", BUILD DATE: " PYCLI_BUILD_DATE " ";
 
-map<CPushConsumer *, PyObject *> g_CallBackMap;
+map<CPushConsumer *, pair<PyObject *, object>> g_CallBackMap;
 
 class PyThreadStateLock {
 public:
-    PyThreadStateLock(void) {
+    PyThreadStateLock() {
         state = PyGILState_Ensure();
     }
 
-    ~PyThreadStateLock(void) {
-        if (state == PyGILState_LOCKED) {
-            PyGILState_Release(state);
-        }
+    ~PyThreadStateLock() {
+        // NOTE: must paired with PyGILState_Ensure, otherwise it will cause deadlock!!!
+        PyGILState_Release(state);
     }
 
 private:
     PyGILState_STATE state;
 };
 
+class PyThreadStateUnlock {
+public:
+    PyThreadStateUnlock() : _save(NULL) {
+        Py_UNBLOCK_THREADS
+    }
+
+    ~PyThreadStateUnlock() {
+        Py_BLOCK_THREADS
+    }
+
+private:
+    PyThreadState *_save;
+};
+
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -146,22 +159,25 @@ const char *PyGetSendResultMsgID(CSendResult &sendResult) {
 }
 //consumer
 void *PyCreatePushConsumer(const char *groupId) {
-    //Py_Initialize();
-    PyEval_InitThreads();
-//    PyEval_ReleaseThread(PyThreadState_Get());
+    PyEval_InitThreads();  // ensure create GIL, for call Python callback from C.
     return (void *) CreatePushConsumer(groupId);
 }
 int PyDestroyPushConsumer(void *consumer) {
-    return DestroyPushConsumer((CPushConsumer *) consumer);
+    CPushConsumer *consumerInner = (CPushConsumer *) consumer;
+    map<CPushConsumer *, pair<PyObject *, object>>::iterator iter;
+    iter = g_CallBackMap.find(consumerInner);
+    if (iter != g_CallBackMap.end()) {
+        UnregisterMessageCallback(consumerInner);
+        g_CallBackMap.erase(iter);
+    }
+    return DestroyPushConsumer(consumerInner);
 }
 int PyStartPushConsumer(void *consumer) {
     return StartPushConsumer((CPushConsumer *) consumer);
 }
 int PyShutdownPushConsumer(void *consumer) {
-    int ret = ShutdownPushConsumer((CPushConsumer *) consumer);
-    //PyGILState_Ensure();
-    //Py_Finalize();
-    return ret;
+    PyThreadStateUnlock PyThreadUnlock;  // ShutdownPushConsumer is a block call, ensure thread don't hold GIL.
+    return ShutdownPushConsumer((CPushConsumer *) consumer);
 }
 int PySetPushConsumerNameServerAddress(void *consumer, const char *namesrv) {
     return SetPushConsumerNameServerAddress((CPushConsumer *) consumer, namesrv);
@@ -172,29 +188,27 @@ int PySetPushConsumerNameServerDomain(void *consumer, const char *domain){
 int PySubscribe(void *consumer, const char *topic, const char *expression) {
     return Subscribe((CPushConsumer *) consumer, topic, expression);
 }
-int PyRegisterMessageCallback(void *consumer, PyObject *pCallback) {
+int PyRegisterMessageCallback(void *consumer, PyObject *pCallback, object args) {
     CPushConsumer *consumerInner = (CPushConsumer *) consumer;
-    g_CallBackMap[consumerInner] = pCallback;
+    g_CallBackMap[consumerInner] = make_pair(pCallback, std::move(args));
     return RegisterMessageCallback(consumerInner, &PythonMessageCallBackInner);
 }
 
 int PythonMessageCallBackInner(CPushConsumer *consumer, CMessageExt *msg) {
-
-    class PyThreadStateLock PyThreadLock;
-    PyMessageExt message;
-    message.pMessageExt = msg;
-    map<CPushConsumer *, PyObject *>::iterator iter;
+    PyThreadStateLock PyThreadLock;  // ensure hold GIL, before call python callback
+    PyMessageExt message = { .pMessageExt = msg };
+    map<CPushConsumer *, pair<PyObject *, object>>::iterator iter;
     iter = g_CallBackMap.find(consumer);
     if (iter != g_CallBackMap.end()) {
-        PyObject * pCallback = iter->second;
+        pair<PyObject *, object> callback = iter->second;
+        PyObject * pCallback = callback.first;
+        object& args = callback.second;
         if (pCallback != NULL) {
-            int status =
-                    boost::python::call<int>(pCallback, message);
+            int status = boost::python::call<int>(pCallback, message, args);
             return status;
         }
     }
     return 1;
-
 }
 
 int PySetPushConsumerThreadCount(void *consumer, int threadCount) {
@@ -212,7 +226,7 @@ int PySetPushConsumerSessionCredentials(void *consumer, const char *accessKey, c
 }
 
 //push consumer
-int PySetPullConsumerNameServerDomain(void *consumer, const char *domain){
+int PySetPullConsumerNameServerDomain(void *consumer, const char *domain) {
     return SetPullConsumerNameServerDomain((CPullConsumer *) consumer, domain);
 }
 //version
diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h
index 324676b..2ad8255 100644
--- a/src/PythonWrapper.h
+++ b/src/PythonWrapper.h
@@ -89,7 +89,7 @@ int PyShutdownPushConsumer(void *consumer);
 int PySetPushConsumerNameServerAddress(void *consumer, const char *namesrv);
 int PySetPushConsumerNameServerDomain(void *consumer, const char *domain);
 int PySubscribe(void *consumer, const char *topic, const char *expression);
-int PyRegisterMessageCallback(void *consumer, PyObject *pCallback);
+int PyRegisterMessageCallback(void *consumer, PyObject *pCallback, object args);
 int PythonMessageCallBackInner(CPushConsumer *consumer, CMessageExt *msg);
 int PySetPushConsumerThreadCount(void *consumer, int threadCount);
 int PySetPushConsumerMessageBatchMaxSize(void *consumer, int batchSize);
diff --git a/test/TestConsumeMessages.py b/test/TestConsumeMessages.py
index 6b9f6d2..da4b0b6 100644
--- a/test/TestConsumeMessages.py
+++ b/test/TestConsumeMessages.py
@@ -34,7 +34,7 @@ def sigint_handler(signum, frame):
     sys.exit(0)
 
 
-def consumer_message(msg):
+def consumer_message(msg, args):
     global totalMsg
     totalMsg += 1
     print 'total count %d' % totalMsg
@@ -55,7 +55,7 @@ def init_producer(_group, _topic, _tag):
     SetPushConsumerNameServerAddress(consumer, name_srv)
     SetPushConsumerThreadCount(consumer, 1)
     Subscribe(consumer, _topic, _tag)
-    RegisterMessageCallback(consumer, consumerMessage)
+    RegisterMessageCallback(consumer, consumer_message, None)
     StartPushConsumer(consumer)
     print 'consumer is ready...'
     return consumer


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services