You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2020/10/19 01:27:38 UTC
[rocketmq-client-python] branch master updated: Add a function
which shows how to use rocketmq in multi-threaded scenarios properly to
handle exceptions such as Name Server Cluster and Broker Cluster restarts
This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-python.git
The following commit(s) were added to refs/heads/master by this push:
new 89ad6bd Add a function which shows how to use rocketmq in multi-threaded scenarios properly to handle exceptions such as Name Server Cluster and Broker Cluster restarts
new 48f7cf9 Merge pull request #100 from tom0392/multi-threaded
89ad6bd is described below
commit 89ad6bde4351ecafa582770a3826ac9c660134b9
Author: tangzhongrui <ta...@cmss.chinamobile.com>
AuthorDate: Sat Oct 17 10:02:49 2020 +0800
Add a function which shows how to use rocketmq in multi-threaded scenarios properly to handle exceptions such as Name Server Cluster and Broker Cluster restarts
---
samples/producer.py | 35 ++++++++++++++++++++++++++++++++++-
1 file changed, 34 insertions(+), 1 deletion(-)
diff --git a/samples/producer.py b/samples/producer.py
index fb90b7b..f69534c 100644
--- a/samples/producer.py
+++ b/samples/producer.py
@@ -19,11 +19,12 @@
from rocketmq.client import Producer, Message, TransactionMQProducer, TransactionStatus
import time
+import threading
topic = 'TopicTest'
gid = 'test'
name_srv = '127.0.0.1:9876'
-
+MUTEX = threading.Lock()
def create_message():
msg = Message(topic)
@@ -46,6 +47,38 @@ def send_message_sync(count):
producer.shutdown()
+def send_message_multi_threaded(retry_time):
+ producer = Producer(gid)
+ producer.set_name_server_address(name_srv)
+ msg = create_message()
+
+ global MUTEX
+ MUTEX.acquire()
+ try:
+ producer.start()
+ except Exception as e:
+ print('ProducerStartFailed:', e)
+ MUTEX.release()
+ return
+
+ try:
+ for i in range(retry_time):
+ ret = producer.send_sync(msg)
+ if ret.status == 0:
+ print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
+ break
+ else:
+ print('send message to MQ failed.')
+ if i == (retry_time - 1):
+ print('send message to MQ failed after retries.')
+ except Exception as e:
+ print('ProducerSendSyncFailed:', e)
+ finally:
+ producer.shutdown()
+ MUTEX.release()
+ return
+
+
def send_orderly_with_sharding_key(count):
producer = Producer(gid, True)
producer.set_name_server_address(name_srv)