You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/03/14 02:10:05 UTC
[rocketmq-client-cpp] branch master updated: [Issue#94] Support
send batch message to broker (#95)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new dcba721 [Issue#94] Support send batch message to broker (#95)
dcba721 is described below
commit dcba721fc710c03cffc95e18d4c95524e6b18f42
Author: donggang123 <jo...@163.com>
AuthorDate: Thu Mar 14 10:09:58 2019 +0800
[Issue#94] Support send batch message to broker (#95)
Support send batch message
---
example/BatchProducer.cpp | 130 ++++++++++++++++++++++
example/CMakeLists.txt | 16 +--
include/BatchMessage.h | 12 ++
include/DefaultMQProducer.h | 4 +
include/MQMessage.h | 209 +++++++++++++++++------------------
include/MQProducer.h | 2 +
include/SendResult.h | 2 +-
src/MQClientAPIImpl.cpp | 36 +++---
src/common/AsyncCallbackWrap.cpp | 66 +++++------
src/message/BatchMessage.cpp | 45 ++++++++
src/message/MQMessage.cpp | 2 +-
src/producer/DefaultMQProducer.cpp | 77 ++++++++++++-
src/producer/SendResult.cpp | 2 +-
src/producer/StringIdMaker.cpp | 34 +++---
src/protocol/CommandHeader.cpp | 2 +
src/protocol/CommandHeader.h | 4 +-
src/protocol/ConsumerRunningInfo.cpp | 2 +-
src/transport/ResponseFuture.cpp | 24 ++--
src/transport/TcpRemotingClient.cpp | 10 +-
test/CMakeLists.txt | 63 +++++------
test/src/BatchMessageTest.cpp | 72 ++++++++++++
test/src/MQDecoderTest.cpp | 44 ++++++++
test/src/StringIdMakerTest.cpp | 38 +++++++
23 files changed, 653 insertions(+), 243 deletions(-)
diff --git a/example/BatchProducer.cpp b/example/BatchProducer.cpp
new file mode 100644
index 0000000..3788422
--- /dev/null
+++ b/example/BatchProducer.cpp
@@ -0,0 +1,130 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <condition_variable>
+#include <iomanip>
+#include <iomanip>
+#include <iostream>
+#include <iostream>
+#include <mutex>
+#include <thread>
+#include <vector>
+#include "common.h"
+
+using namespace rocketmq;
+using namespace std;
+boost::atomic<bool> g_quit;
+std::mutex g_mtx;
+std::condition_variable g_finished;
+TpsReportService g_tps;
+
+void SyncProducerWorker(RocketmqSendAndConsumerArgs* info,
+ DefaultMQProducer* producer) {
+ while (!g_quit.load()) {
+ if (g_msgCount.load() <= 0) {
+ std::unique_lock<std::mutex> lck(g_mtx);
+ g_finished.notify_one();
+ break;
+ }
+
+ vector<MQMessage> msgs;
+ MQMessage msg1(info->topic, "*", info->body);
+ msg1.setProperty("property1", "value1");
+ MQMessage msg2(info->topic, "*", info->body);
+ msg2.setProperty("property1", "value1");
+ msg2.setProperty("property2", "value2");
+ MQMessage msg3(info->topic, "*", info->body);
+ msg3.setProperty("property1", "value1");
+ msg3.setProperty("property2", "value2");
+ msg3.setProperty("property3", "value3");
+ msgs.push_back(msg1);
+ msgs.push_back(msg2);
+ msgs.push_back(msg3);
+ try {
+ auto start = std::chrono::system_clock::now();
+ SendResult sendResult = producer->send(msgs);
+ g_tps.Increment();
+ --g_msgCount;
+ auto end = std::chrono::system_clock::now();
+ auto duration =
+ std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+ if (duration.count() >= 500) {
+ std::cout << "send RT more than: " << duration.count()
+ << " ms with msgid: " << sendResult.getMsgId() << endl;
+ }
+ } catch (const MQException& e) {
+ std::cout << "send failed: " << e.what() << std::endl;
+ std::unique_lock<std::mutex> lck(g_mtx);
+ g_finished.notify_one();
+ return;
+ }
+ }
+}
+
+int main(int argc, char* argv[]) {
+ RocketmqSendAndConsumerArgs info;
+ if (!ParseArgs(argc, argv, &info)) {
+ exit(-1);
+ }
+ PrintRocketmqSendAndConsumerArgs(info);
+ DefaultMQProducer producer("please_rename_unique_group_name");
+ producer.setNamesrvAddr(info.namesrv);
+ producer.setNamesrvDomain(info.namesrv_domain);
+ producer.setGroupName(info.groupname);
+ producer.setInstanceName(info.groupname);
+ producer.setSessionCredentials("mq acesskey", "mq secretkey", "ALIYUN");
+ producer.setSendMsgTimeout(500);
+ producer.setTcpTransportTryLockTimeout(1000);
+ producer.setTcpTransportConnectTimeout(400);
+
+ producer.start();
+ std::vector<std::shared_ptr<std::thread>> work_pool;
+ auto start = std::chrono::system_clock::now();
+ int msgcount = g_msgCount.load();
+ g_tps.start();
+
+ int threadCount = info.thread_count;
+ for (int j = 0; j < threadCount; j++) {
+ std::shared_ptr<std::thread> th =
+ std::make_shared<std::thread>(SyncProducerWorker, &info, &producer);
+ work_pool.push_back(th);
+ }
+
+ {
+ std::unique_lock<std::mutex> lck(g_mtx);
+ g_finished.wait(lck);
+ g_quit.store(true);
+ }
+
+ auto end = std::chrono::system_clock::now();
+ auto duration =
+ std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+
+ std::cout
+ << "per msg time: " << duration.count() / (double)msgcount << "ms \n"
+ << "========================finished==============================\n";
+
+ for (size_t th = 0; th != work_pool.size(); ++th) {
+ work_pool[th]->join();
+ }
+
+ producer.shutdown();
+
+ return 0;
+}
diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt
index 90c9442..aecec5c 100755
--- a/example/CMakeLists.txt
+++ b/example/CMakeLists.txt
@@ -41,18 +41,18 @@ foreach(file ${files})
endif()
if (MSVC)
- if (BUILD_ROCKETMQ_SHARED)
- target_link_libraries (${basename} rocketmq_shared ${deplibs}
- ${Boost_LIBRARIES} ${LIBEVENT_LIBRARIES} ${JSONCPP_LIBRARIES})
- else()
- target_link_libraries (${basename} rocketmq_static ${deplibs}
- ${Boost_LIBRARIES} ${LIBEVENT_LIBRARIES} ${JSONCPP_LIBRARIES})
+ if (BUILD_ROCKETMQ_SHARED)
+ target_link_libraries (${basename} rocketmq_shared ${deplibs}
+ ${Boost_LIBRARIES} ${LIBEVENT_LIBRARIES} ${JSONCPP_LIBRARIES})
+ else()
+ target_link_libraries (${basename} rocketmq_static ${deplibs}
+ ${Boost_LIBRARIES} ${LIBEVENT_LIBRARIES} ${JSONCPP_LIBRARIES})
endif()
else()
if (BUILD_ROCKETMQ_SHARED)
- target_link_libraries (${basename} rocketmq_shared)
+ target_link_libraries (${basename} rocketmq_shared)
else()
- target_link_libraries (${basename} rocketmq_static)
+ target_link_libraries (${basename} rocketmq_static)
endif()
endif()
diff --git a/include/BatchMessage.h b/include/BatchMessage.h
new file mode 100644
index 0000000..9c52d2d
--- /dev/null
+++ b/include/BatchMessage.h
@@ -0,0 +1,12 @@
+#ifndef __BATCHMESSAGE_H__
+#define __BATCHMESSAGE_H__
+#include "MQMessage.h"
+#include <string>
+namespace rocketmq {
+ class BatchMessage : public MQMessage {
+ public:
+ static std::string encode(std::vector <MQMessage> &msgs);
+ static std::string encode(MQMessage &message);
+ };
+}
+#endif
\ No newline at end of file
diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h
index 02f5651..17be13f 100755
--- a/include/DefaultMQProducer.h
+++ b/include/DefaultMQProducer.h
@@ -22,6 +22,7 @@
#include "MQProducer.h"
#include "RocketMQClient.h"
#include "SendResult.h"
+#include "BatchMessage.h"
namespace rocketmq {
//<!***************************************************************************
@@ -43,6 +44,8 @@ class ROCKETMQCLIENT_API DefaultMQProducer : public MQProducer {
virtual SendResult send(MQMessage& msg, MessageQueueSelector* selector,
void* arg, int autoRetryTimes,
bool bActiveBroker = false);
+ virtual SendResult send(std::vector<MQMessage>& msgs);
+ virtual SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq);
virtual void send(MQMessage& msg, SendCallback* pSendCallback,
bool bSelectActiveBroker = false);
virtual void send(MQMessage& msg, const MQMessageQueue& mq,
@@ -95,6 +98,7 @@ class ROCKETMQCLIENT_API DefaultMQProducer : public MQProducer {
SendResult sendKernelImpl(MQMessage& msg, const MQMessageQueue& mq,
int communicationMode, SendCallback* pSendCallback);
bool tryToCompressMessage(MQMessage& msg);
+ BatchMessage buildBatchMessage(std::vector<MQMessage>& msgs);
private:
int m_sendMsgTimeout;
diff --git a/include/MQMessage.h b/include/MQMessage.h
index ef2ba10..e6f2298 100755
--- a/include/MQMessage.h
+++ b/include/MQMessage.h
@@ -26,113 +26,108 @@
namespace rocketmq {
//<!***************************************************************************
class ROCKETMQCLIENT_API MQMessage {
- public:
- MQMessage();
- MQMessage(const std::string &topic, const std::string &body);
- MQMessage(const std::string &topic, const std::string &tags, const std::string &body);
- MQMessage(const std::string &topic, const std::string &tags, const std::string &keys, const std::string &body);
- MQMessage(const std::string &topic,
- const std::string &tags,
- const std::string &keys,
- const int flag,
- const std::string &body,
- bool waitStoreMsgOK);
-
- virtual ~MQMessage();
- MQMessage(const MQMessage &other);
- MQMessage &operator=(const MQMessage &other);
-
- void setProperty(const std::string &name, const std::string &value);
- const std::string &getProperty(const std::string &name) const;
-
- const std::string &getTopic() const;
- void setTopic(const std::string &topic);
- void setTopic(const char *body, int len);
-
- const std::string &getTags() const;
- void setTags(const std::string &tags);
-
- const std::string &getKeys() const;
- void setKeys(const std::string &keys);
- void setKeys(const std::vector<std::string> &keys);
-
- int getDelayTimeLevel() const;
- void setDelayTimeLevel(int level);
-
- bool isWaitStoreMsgOK();
- void setWaitStoreMsgOK(bool waitStoreMsgOK);
-
- int getFlag() const;
- void setFlag(int flag);
-
- int getSysFlag() const;
- void setSysFlag(int sysFlag);
-
- const std::string &getBody() const;
-
- void setBody(const char *body, int len);
- void setBody(const std::string &body);
-
- std::map<std::string, std::string> getProperties() const;
- void setProperties(std::map<std::string, std::string> &properties);
-
- const std::string toString() const {
- std::stringstream ss;
- std::string tags = getTags();
- ss << "Message [topic=" << m_topic << ", flag=" << m_flag << ", tag=" << tags << "]";
- return ss.str();
- }
-
- protected:
- friend class MQDecoder;
- void setPropertyInternal(const std::string &name, const std::string &value);
- void setPropertiesInternal(std::map<std::string, std::string> &properties);
-
- void Init(const std::string &topic,
- const std::string &tags,
- const std::string &keys,
- const int flag,
- const std::string &body,
- bool waitStoreMsgOK);
-
- public:
- static const std::string PROPERTY_KEYS;
- static const std::string PROPERTY_TAGS;
- static const std::string PROPERTY_WAIT_STORE_MSG_OK;
- static const std::string PROPERTY_DELAY_TIME_LEVEL;
- static const std::string PROPERTY_RETRY_TOPIC;
- static const std::string PROPERTY_REAL_TOPIC;
- static const std::string PROPERTY_REAL_QUEUE_ID;
- static const std::string PROPERTY_TRANSACTION_PREPARED;
- static const std::string PROPERTY_PRODUCER_GROUP;
- static const std::string PROPERTY_MIN_OFFSET;
- static const std::string PROPERTY_MAX_OFFSET;
-
- static const std::string PROPERTY_BUYER_ID;
- static const std::string PROPERTY_ORIGIN_MESSAGE_ID;
- static const std::string PROPERTY_TRANSFER_FLAG;
- static const std::string PROPERTY_CORRECTION_FLAG;
- static const std::string PROPERTY_MQ2_FLAG;
- static const std::string PROPERTY_RECONSUME_TIME;
- static const std::string PROPERTY_MSG_REGION;
- static const std::string PROPERTY_TRACE_SWITCH;
- static const std::string PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX;
- static const std::string PROPERTY_MAX_RECONSUME_TIMES;
- static const std::string PROPERTY_CONSUME_START_TIMESTAMP;
- static const std::string PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET;
- static const std::string PROPERTY_TRANSACTION_CHECK_TIMES;
- static const std::string PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS;
-
- static const std::string KEY_SEPARATOR;
-
- protected:
- int m_sysFlag;
-
- private:
- std::string m_topic;
- int m_flag;
- std::string m_body;
- std::map<std::string, std::string> m_properties;
+
+ public:
+ MQMessage();
+ MQMessage(const std::string& topic, const std::string& body);
+ MQMessage(const std::string& topic, const std::string& tags, const std::string& body);
+ MQMessage(const std::string& topic, const std::string& tags, const std::string& keys,
+ const std::string& body);
+ MQMessage(const std::string& topic, const std::string& tags, const std::string& keys,
+ const int flag, const std::string& body, bool waitStoreMsgOK);
+
+ virtual ~MQMessage();
+ MQMessage(const MQMessage& other);
+ MQMessage& operator=(const MQMessage& other);
+
+ void setProperty(const std::string& name, const std::string& value) ;
+ const std::string & getProperty(const std::string& name) const;
+
+ const std::string &getTopic() const;
+ void setTopic(const std::string& topic);
+ void setTopic(const char* body, int len);
+
+ const std::string &getTags() const;
+ void setTags(const std::string& tags);
+
+ const std::string &getKeys() const;
+ void setKeys(const std::string& keys);
+ void setKeys(const std::vector<std::string>& keys);
+
+ int getDelayTimeLevel() const;
+ void setDelayTimeLevel(int level);
+
+ bool isWaitStoreMsgOK() const;
+ void setWaitStoreMsgOK(bool waitStoreMsgOK);
+
+ int getFlag() const;
+ void setFlag(int flag);
+
+ int getSysFlag() const;
+ void setSysFlag(int sysFlag);
+
+ const std::string &getBody() const;
+
+ void setBody(const char* body, int len);
+ void setBody(const std::string& body);
+
+ std::map<std::string, std::string> getProperties() const;
+ void setProperties(std::map<std::string, std::string>& properties);
+
+ const std::string toString() const {
+ std::stringstream ss;
+ std::string tags = getTags();
+ ss << "Message [topic=" << m_topic << ", flag=" << m_flag
+ << ", tag=" << tags << "]";
+ return ss.str();
+ }
+
+ protected:
+ friend class MQDecoder;
+ void setPropertyInternal(const std::string& name, const std::string& value);
+ void setPropertiesInternal(std::map<std::string, std::string>& properties);
+
+ void Init(const std::string& topic, const std::string& tags, const std::string& keys,
+ const int flag, const std::string& body, bool waitStoreMsgOK);
+
+ public:
+ static const std::string PROPERTY_KEYS;
+ static const std::string PROPERTY_TAGS;
+ static const std::string PROPERTY_WAIT_STORE_MSG_OK;
+ static const std::string PROPERTY_DELAY_TIME_LEVEL;
+ static const std::string PROPERTY_RETRY_TOPIC;
+ static const std::string PROPERTY_REAL_TOPIC;
+ static const std::string PROPERTY_REAL_QUEUE_ID;
+ static const std::string PROPERTY_TRANSACTION_PREPARED;
+ static const std::string PROPERTY_PRODUCER_GROUP;
+ static const std::string PROPERTY_MIN_OFFSET;
+ static const std::string PROPERTY_MAX_OFFSET;
+
+ static const std::string PROPERTY_BUYER_ID;
+ static const std::string PROPERTY_ORIGIN_MESSAGE_ID;
+ static const std::string PROPERTY_TRANSFER_FLAG;
+ static const std::string PROPERTY_CORRECTION_FLAG;
+ static const std::string PROPERTY_MQ2_FLAG;
+ static const std::string PROPERTY_RECONSUME_TIME;
+ static const std::string PROPERTY_MSG_REGION;
+ static const std::string PROPERTY_TRACE_SWITCH;
+ static const std::string PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX;
+ static const std::string PROPERTY_MAX_RECONSUME_TIMES;
+ static const std::string PROPERTY_CONSUME_START_TIMESTAMP;
+ static const std::string PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET;
+ static const std::string PROPERTY_TRANSACTION_CHECK_TIMES;
+ static const std::string PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS;
+
+ static const std::string KEY_SEPARATOR;
+
+ protected:
+ int m_sysFlag;
+
+ private:
+ std::string m_topic;
+ int m_flag;
+ std::string m_body;
+ std::map<std::string, std::string> m_properties;
};
//<!***************************************************************************
} // namespace rocketmq
diff --git a/include/MQProducer.h b/include/MQProducer.h
index e5df9ee..2673931 100755
--- a/include/MQProducer.h
+++ b/include/MQProducer.h
@@ -52,6 +52,8 @@ class ROCKETMQCLIENT_API MQProducer : public MQClient {
SendCallback* sendCallback) = 0;
virtual void send(MQMessage& msg, MessageQueueSelector* selector, void* arg,
SendCallback* sendCallback) = 0;
+ virtual SendResult send(std::vector<MQMessage>& msgs) = 0;
+ virtual SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq) = 0;
virtual void sendOneway(MQMessage& msg, bool bSelectActiveBroker = false) = 0;
virtual void sendOneway(MQMessage& msg, const MQMessageQueue& mq) = 0;
virtual void sendOneway(MQMessage& msg, MessageQueueSelector* selector,
diff --git a/include/SendResult.h b/include/SendResult.h
index 23a2dfc..239f07a 100755
--- a/include/SendResult.h
+++ b/include/SendResult.h
@@ -34,7 +34,7 @@ enum SendStatus {
class ROCKETMQCLIENT_API SendResult {
public:
SendResult();
- SendResult(const SendStatus& sendStatus, const std::string& msgId, const std::string& offsetMsgId,
+ SendResult(const SendStatus& sendStatus, const std::string& msgId, const std::string& offsetMsgId,
const MQMessageQueue& messageQueue, int64 queueOffset);
virtual ~SendResult();
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 045bfab..8ec426d 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -422,25 +422,25 @@ void MQClientAPIImpl::sendMessageAsync(const string& addr,
if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMilliseconds, maxRetryTimes, retrySendTimes) ==
false) {
LOG_WARN("invokeAsync failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d",
- addr.c_str(), msg.getTopic().data(), timeoutMilliseconds, maxRetryTimes, retrySendTimes);
- //when getTcp return false, need consider retrySendTimes
- int retry_time = retrySendTimes + 1;
- int64 time_out = timeoutMilliseconds - (UtilAll::currentTimeMillis() - begin_time);
- while (retry_time < maxRetryTimes && time_out > 0) {
- begin_time = UtilAll::currentTimeMillis();
- if (m_pRemotingClient->invokeAsync(addr, request, cbw, time_out, maxRetryTimes, retry_time) == false) {
- retry_time += 1;
- time_out = time_out - (UtilAll::currentTimeMillis() - begin_time);
- LOG_WARN("invokeAsync retry failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d",
- addr.c_str(), msg.getTopic().data(), time_out, maxRetryTimes, retry_time);
- continue;
- } else {
- return; //invokeAsync success
- }
- }
+ addr.c_str(), msg.getTopic().data(), timeoutMilliseconds, maxRetryTimes, retrySendTimes);
+ //when getTcp return false, need consider retrySendTimes
+ int retry_time = retrySendTimes + 1;
+ int64 time_out = timeoutMilliseconds - (UtilAll::currentTimeMillis() - begin_time);
+ while (retry_time < maxRetryTimes && time_out > 0) {
+ begin_time = UtilAll::currentTimeMillis();
+ if (m_pRemotingClient->invokeAsync(addr, request, cbw, time_out, maxRetryTimes, retry_time) == false) {
+ retry_time += 1;
+ time_out = time_out - (UtilAll::currentTimeMillis() - begin_time);
+ LOG_WARN("invokeAsync retry failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d",
+ addr.c_str(), msg.getTopic().data(), time_out, maxRetryTimes, retry_time);
+ continue;
+ } else {
+ return; //invokeAsync success
+ }
+ }
LOG_ERROR("sendMessageAsync failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d",
- addr.c_str(), msg.getTopic().data(), time_out, maxRetryTimes, retrySendTimes);
+ addr.c_str(), msg.getTopic().data(), time_out, maxRetryTimes, retrySendTimes);
if (cbw) {
cbw->onException();
@@ -554,7 +554,7 @@ SendResult MQClientAPIImpl::processSendResponse(const string& brokerName,
(SendMessageResponseHeader*)pResponse->getCommandHeader();
MQMessageQueue messageQueue(msg.getTopic(), brokerName,
responseHeader->queueId);
- string unique_msgId = msg.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+ string unique_msgId = msg.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
return SendResult(sendStatus, unique_msgId, responseHeader->msgId, messageQueue,
responseHeader->queueOffset);
}
diff --git a/src/common/AsyncCallbackWrap.cpp b/src/common/AsyncCallbackWrap.cpp
index 42ce04e..b0ffa04 100755
--- a/src/common/AsyncCallbackWrap.cpp
+++ b/src/common/AsyncCallbackWrap.cpp
@@ -88,42 +88,42 @@ void SendCallbackWrap::operationComplete(ResponseFuture* pResponseFuture,
try {
SendResult ret = m_pClientAPI->processSendResponse(m_brokerName, m_msg, pResponse.get());
if (pCallback) {
- LOG_DEBUG("operationComplete: processSendResponse success, opaque:%d, maxRetryTime:%d, retrySendTimes:%d", opaque, pResponseFuture->getMaxRetrySendTimes(), pResponseFuture->getRetrySendTimes());
- pCallback->onSuccess(ret);
- }
+ LOG_DEBUG("operationComplete: processSendResponse success, opaque:%d, maxRetryTime:%d, retrySendTimes:%d", opaque, pResponseFuture->getMaxRetrySendTimes(), pResponseFuture->getRetrySendTimes());
+ pCallback->onSuccess(ret);
+ }
} catch (MQException& e) {
LOG_ERROR("operationComplete: processSendResponse exception: %s", e.what());
- //broker may return exception, need consider retry send
- int maxRetryTimes = pResponseFuture->getMaxRetrySendTimes();
- int retryTimes = pResponseFuture->getRetrySendTimes();
- if (pResponseFuture->getASyncFlag() && retryTimes < maxRetryTimes && maxRetryTimes > 1) {
-
- int64 left_timeout_ms = pResponseFuture->leftTime();
- string brokerAddr = pResponseFuture->getBrokerAddr();
- const RemotingCommand& requestCommand = pResponseFuture->getRequestCommand();
- retryTimes += 1;
- LOG_WARN("retry send, opaque:%d, sendTimes:%d, maxRetryTimes:%d, left_timeout:%lld, brokerAddr:%s, msg:%s",
- opaque, retryTimes, maxRetryTimes, left_timeout_ms, brokerAddr.data(), m_msg.toString().data());
-
- bool exception_flag = false;
- try {
- m_pClientAPI->sendMessageAsync(pResponseFuture->getBrokerAddr(), m_brokerName, m_msg, (RemotingCommand&)requestCommand, pCallback, left_timeout_ms, maxRetryTimes, retryTimes);
- } catch (MQClientException& e) {
- LOG_ERROR("retry send exception:%s, opaque:%d, retryTimes:%d, msg:%s, not retry send again", e.what(), opaque, retryTimes, m_msg.toString().data());
- exception_flag = true;
- }
-
- if (exception_flag == false) {
- return; //send retry again, here need return
- }
- }
-
- if (pCallback) {
- MQException exception("process send response error", -1, __FILE__,
- __LINE__);
- pCallback->onException(exception);
- }
+ //broker may return exception, need consider retry send
+ int maxRetryTimes = pResponseFuture->getMaxRetrySendTimes();
+ int retryTimes = pResponseFuture->getRetrySendTimes();
+ if (pResponseFuture->getASyncFlag() && retryTimes < maxRetryTimes && maxRetryTimes > 1) {
+
+ int64 left_timeout_ms = pResponseFuture->leftTime();
+ string brokerAddr = pResponseFuture->getBrokerAddr();
+ const RemotingCommand& requestCommand = pResponseFuture->getRequestCommand();
+ retryTimes += 1;
+ LOG_WARN("retry send, opaque:%d, sendTimes:%d, maxRetryTimes:%d, left_timeout:%lld, brokerAddr:%s, msg:%s",
+ opaque, retryTimes, maxRetryTimes, left_timeout_ms, brokerAddr.data(), m_msg.toString().data());
+
+ bool exception_flag = false;
+ try {
+ m_pClientAPI->sendMessageAsync(pResponseFuture->getBrokerAddr(), m_brokerName, m_msg, (RemotingCommand&)requestCommand, pCallback, left_timeout_ms, maxRetryTimes, retryTimes);
+ } catch (MQClientException& e) {
+ LOG_ERROR("retry send exception:%s, opaque:%d, retryTimes:%d, msg:%s, not retry send again", e.what(), opaque, retryTimes, m_msg.toString().data());
+ exception_flag = true;
+ }
+
+ if (exception_flag == false) {
+ return; //send retry again, here need return
+ }
+ }
+
+ if (pCallback) {
+ MQException exception("process send response error", -1, __FILE__,
+ __LINE__);
+ pCallback->onException(exception);
+ }
}
}
if (pCallback && pCallback->getSendCallbackType() == autoDeleteSendCallback) {
diff --git a/src/message/BatchMessage.cpp b/src/message/BatchMessage.cpp
new file mode 100644
index 0000000..c2b9ec2
--- /dev/null
+++ b/src/message/BatchMessage.cpp
@@ -0,0 +1,45 @@
+#include "BatchMessage.h"
+#include "MQDecoder.h"
+#include "StringIdMaker.h"
+
+using namespace std;
+namespace rocketmq {
+
+ std::string BatchMessage::encode(std::vector<MQMessage> &msgs) {
+ string encodedBody;
+ for (auto message : msgs) {
+ string unique_id = StringIdMaker::get_mutable_instance().get_unique_id();
+ message.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, unique_id);
+ encodedBody.append(encode(message));
+ }
+ return encodedBody;
+ }
+
+ std::string BatchMessage::encode(MQMessage &message) {
+ string encodeMsg;
+ const string &body = message.getBody();
+ int bodyLen = body.length();
+ string properties = MQDecoder::messageProperties2String(message.getProperties());
+ short propertiesLength = (short) properties.length();
+ int storeSize = 20 + bodyLen + 2 + propertiesLength;
+ //TOTALSIZE|MAGICCOD|BODYCRC|FLAG|BODYLen|Body|propertiesLength|properties
+ int magicCode = 0;
+ int bodyCrc = 0;
+ int flag = message.getFlag();
+ int storeSize_net = htonl(storeSize);
+ int magicCode_net = htonl(magicCode);
+ int bodyCrc_net = htonl(bodyCrc);
+ int flag_net = htonl(flag);
+ int bodyLen_net = htonl(bodyLen);
+ int propertiesLength_net = htons(propertiesLength);
+ encodeMsg.append((char*)&storeSize_net, sizeof(int));
+ encodeMsg.append((char*)&magicCode_net, sizeof(int));
+ encodeMsg.append((char*)&bodyCrc_net, sizeof(int));
+ encodeMsg.append((char*)&flag_net, sizeof(int));
+ encodeMsg.append((char*)&bodyLen_net, sizeof(int));
+ encodeMsg.append(body.c_str(), body.length());
+ encodeMsg.append((char*)&propertiesLength_net, sizeof(short));
+ encodeMsg.append(properties.c_str(), propertiesLength);
+ return encodeMsg;
+ }
+}
diff --git a/src/message/MQMessage.cpp b/src/message/MQMessage.cpp
index cc5d16f..e816879 100755
--- a/src/message/MQMessage.cpp
+++ b/src/message/MQMessage.cpp
@@ -173,7 +173,7 @@ void MQMessage::setDelayTimeLevel(int level) {
setPropertyInternal(PROPERTY_DELAY_TIME_LEVEL, tmp);
}
-bool MQMessage::isWaitStoreMsgOK() {
+bool MQMessage::isWaitStoreMsgOK() const {
string tmp = getProperty(PROPERTY_WAIT_STORE_MSG_OK);
if (tmp.empty()) {
return true;
diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp
index 4edf811..b7c3695 100755
--- a/src/producer/DefaultMQProducer.cpp
+++ b/src/producer/DefaultMQProducer.cpp
@@ -29,6 +29,8 @@
#include "TopicPublishInfo.h"
#include "Validators.h"
#include "StringIdMaker.h"
+#include "BatchMessage.h"
+#include <typeinfo>
namespace rocketmq {
@@ -127,6 +129,68 @@ void DefaultMQProducer::send(MQMessage& msg, SendCallback* pSendCallback,
}
}
+SendResult DefaultMQProducer::send(std::vector<MQMessage>& msgs) {
+ SendResult result;
+ try {
+ BatchMessage batchMessage = buildBatchMessage(msgs);
+ result = sendDefaultImpl(batchMessage, ComMode_SYNC, NULL);
+ } catch (MQException& e) {
+ LOG_ERROR(e.what());
+ throw e;
+ }
+ return result;
+}
+
+SendResult DefaultMQProducer::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq) {
+ SendResult result;
+ try {
+ BatchMessage batchMessage = buildBatchMessage(msgs);
+ result = sendKernelImpl(batchMessage, mq, ComMode_SYNC, NULL);
+ } catch (MQException& e) {
+ LOG_ERROR(e.what());
+ throw e;
+ }
+ return result;
+}
+
+BatchMessage DefaultMQProducer::buildBatchMessage(std::vector<MQMessage>& msgs) {
+
+ if (msgs.size() < 1) {
+ THROW_MQEXCEPTION(MQClientException, "msgs need one message at least", -1);
+ }
+ BatchMessage batchMessage;
+ bool firstFlag = true;
+ string topic;
+ bool waitStoreMsgOK = false;
+ for (auto& msg : msgs) {
+ Validators::checkMessage(msg, getMaxMessageSize());
+ if (firstFlag) {
+ topic = msg.getTopic();
+ waitStoreMsgOK = msg.isWaitStoreMsgOK();
+ firstFlag = false;
+
+ if (UtilAll::startsWith_retry(topic)) {
+ THROW_MQEXCEPTION(MQClientException, "Retry Group is not supported for batching", -1);
+ }
+ } else {
+
+ if (msg.getDelayTimeLevel() > 0) {
+ THROW_MQEXCEPTION(MQClientException, "TimeDelayLevel in not supported for batching", -1);
+ }
+ if (msg.getTopic() != topic) {
+ THROW_MQEXCEPTION(MQClientException, "msgs need one message at least", -1);
+ }
+ if (msg.isWaitStoreMsgOK() != waitStoreMsgOK) {
+ THROW_MQEXCEPTION(MQClientException, "msgs need one message at least", -2);
+ }
+ }
+ }
+ batchMessage.setBody(BatchMessage::encode(msgs));
+ batchMessage.setTopic(topic);
+ batchMessage.setWaitStoreMsgOK(waitStoreMsgOK);
+ return batchMessage;
+}
+
SendResult DefaultMQProducer::send(MQMessage& msg, const MQMessageQueue& mq) {
Validators::checkMessage(msg, getMaxMessageSize());
if (msg.getTopic() != mq.getTopic()) {
@@ -336,9 +400,13 @@ SendResult DefaultMQProducer::sendKernelImpl(MQMessage& msg,
if (!brokerAddr.empty()) {
try {
- //msgId is produced by client, offsetMsgId produced by broker. (same with java sdk)
- string unique_id = StringIdMaker::get_mutable_instance().get_unique_id();
- msg.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, unique_id);
+ BatchMessage batchMessage;
+ bool isBatchMsg = (typeid(msg).name() == typeid(batchMessage).name());
+ //msgId is produced by client, offsetMsgId produced by broker. (same with java sdk)
+ if (!isBatchMsg) {
+ string unique_id = StringIdMaker::get_mutable_instance().get_unique_id();
+ msg.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, unique_id);
+ }
LOG_DEBUG("produce before:%s to %s", msg.toString().c_str(), mq.toString().c_str());
@@ -353,6 +421,7 @@ SendResult DefaultMQProducer::sendKernelImpl(MQMessage& msg,
requestHeader->sysFlag = (msg.getSysFlag());
requestHeader->bornTimestamp = UtilAll::currentTimeMillis();
requestHeader->flag = (msg.getFlag());
+ requestHeader->batch = isBatchMsg;
requestHeader->properties =
(MQDecoder::messageProperties2String(msg.getProperties()));
@@ -505,7 +574,7 @@ void DefaultMQProducer::setRetryTimes4Async(int times)
{
if (times <= 0) {
LOG_WARN("set retry times illegal, use default value:1");
- m_retryTimes4Async = 1;
+ m_retryTimes4Async = 1;
return;
}
diff --git a/src/producer/SendResult.cpp b/src/producer/SendResult.cpp
index 7bc31ca..9eabf5d 100755
--- a/src/producer/SendResult.cpp
+++ b/src/producer/SendResult.cpp
@@ -42,7 +42,7 @@ SendResult& SendResult::operator=(const SendResult& other) {
if (this != &other) {
m_sendStatus = other.m_sendStatus;
m_msgId = other.m_msgId;
- m_offsetMsgId = other.m_offsetMsgId;
+ m_offsetMsgId = other.m_offsetMsgId;
m_messageQueue = other.m_messageQueue;
m_queueOffset = other.m_queueOffset;
}
diff --git a/src/producer/StringIdMaker.cpp b/src/producer/StringIdMaker.cpp
index 23abd08..06bd872 100644
--- a/src/producer/StringIdMaker.cpp
+++ b/src/producer/StringIdMaker.cpp
@@ -21,21 +21,21 @@ namespace rocketmq {
#ifdef WIN32
int gettimeofdayWin(struct timeval *tp, void *tzp)
{
- time_t clock;
- struct tm tm;
- SYSTEMTIME wtm;
- GetLocalTime(&wtm);
- tm.tm_year = wtm.wYear - 1900;
- tm.tm_mon = wtm.wMonth - 1;
- tm.tm_mday = wtm.wDay;
- tm.tm_hour = wtm.wHour;
- tm.tm_min = wtm.wMinute;
- tm.tm_sec = wtm.wSecond;
- tm. tm_isdst = -1;
- clock = mktime(&tm);
- tp->tv_sec = clock;
- tp->tv_usec = wtm.wMilliseconds * 1000;
- return (0);
+ time_t clock;
+ struct tm tm;
+ SYSTEMTIME wtm;
+ GetLocalTime(&wtm);
+ tm.tm_year = wtm.wYear - 1900;
+ tm.tm_mon = wtm.wMonth - 1;
+ tm.tm_mday = wtm.wDay;
+ tm.tm_hour = wtm.wHour;
+ tm.tm_min = wtm.wMinute;
+ tm.tm_sec = wtm.wSecond;
+ tm.tm_isdst = -1;
+ clock = mktime(&tm);
+ tp->tv_sec = clock;
+ tp->tv_usec = wtm.wMilliseconds * 1000;
+ return (0);
}
#endif
@@ -115,7 +115,7 @@ uint64_t StringIdMaker::get_curr_ms() {
#ifdef WIN32
gettimeofdayWin(&time_now, NULL); // WIN32
#else
- gettimeofday(&time_now, NULL); //LINUX
+ gettimeofday(&time_now, NULL); //LINUX
#endif
uint64_t ms_time = time_now.tv_sec * 1000 + time_now.tv_usec / 1000;
@@ -155,7 +155,7 @@ void StringIdMaker::set_start_and_next_tm() {
int StringIdMaker::atomic_incr(int id) {
#ifdef WIN32
- InterlockedIncrement((LONG*)&id);
+ InterlockedIncrement((LONG*)&id);
#else
__sync_add_and_fetch(&id, 1);
#endif
diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp
index 366ac2e..6290ac7 100644
--- a/src/protocol/CommandHeader.cpp
+++ b/src/protocol/CommandHeader.cpp
@@ -80,6 +80,7 @@ void SendMessageRequestHeader::Encode(Json::Value& outData) {
outData["reconsumeTimes"] = UtilAll::to_string(reconsumeTimes);
outData["unitMode"] = UtilAll::to_string(unitMode);
#endif
+ outData["batch"] = batch;
}
int SendMessageRequestHeader::getReconsumeTimes() { return reconsumeTimes; }
@@ -121,6 +122,7 @@ void SendMessageRequestHeader::SetDeclaredFieldOfCommandHeader(
requestMap.insert(
pair<string, string>("unitMode", UtilAll::to_string(unitMode)));
#endif
+ requestMap.insert(pair<string, string>("batch", UtilAll::to_string(batch)));
}
//<!************************************************************************
diff --git a/src/protocol/CommandHeader.h b/src/protocol/CommandHeader.h
index 5a55c55..b5ffe14 100644
--- a/src/protocol/CommandHeader.h
+++ b/src/protocol/CommandHeader.h
@@ -90,7 +90,8 @@ class SendMessageRequestHeader : public CommandHeader {
bornTimestamp(0),
flag(0),
reconsumeTimes(0),
- unitMode(false) {}
+ unitMode(false),
+ batch(false){}
virtual ~SendMessageRequestHeader() {}
virtual void Encode(Json::Value& outData);
virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
@@ -109,6 +110,7 @@ class SendMessageRequestHeader : public CommandHeader {
string properties;
int reconsumeTimes;
bool unitMode;
+ bool batch;
};
//<!************************************************************************
diff --git a/src/protocol/ConsumerRunningInfo.cpp b/src/protocol/ConsumerRunningInfo.cpp
index f24b3fa..ab7f389 100755
--- a/src/protocol/ConsumerRunningInfo.cpp
+++ b/src/protocol/ConsumerRunningInfo.cpp
@@ -60,7 +60,7 @@ void ConsumerRunningInfo::setStatusTable(const map<string, ConsumeStatus>&
input_statusTable)
{
statusTable = input_statusTable;
-} */
+} */
const vector<SubscriptionData> ConsumerRunningInfo::getSubscriptionSet() const {
return subscriptionSet;
diff --git a/src/transport/ResponseFuture.cpp b/src/transport/ResponseFuture.cpp
index 9307616..a304e3f 100755
--- a/src/transport/ResponseFuture.cpp
+++ b/src/transport/ResponseFuture.cpp
@@ -152,8 +152,8 @@ void ResponseFuture::executeInvokeCallbackException() {
} else {
if (m_asyncCallbackStatus == asyncCallBackStatus_timeout) {
- //here no need retrySendTimes process because of it have timeout
- LOG_ERROR("send msg, callback timeout, opaque:%d, sendTimes:%d, maxRetryTimes:%d", getOpaque(), getRetrySendTimes(), getMaxRetrySendTimes());
+ //here no need retrySendTimes process because of it have timeout
+ LOG_ERROR("send msg, callback timeout, opaque:%d, sendTimes:%d, maxRetryTimes:%d", getOpaque(), getRetrySendTimes(), getMaxRetrySendTimes());
m_pCallbackWrap->onException();
} else {
@@ -172,36 +172,36 @@ bool ResponseFuture::isTimeOut() const {
}
int ResponseFuture::getMaxRetrySendTimes() const {
- return m_maxRetrySendTimes;
+ return m_maxRetrySendTimes;
}
int ResponseFuture::getRetrySendTimes() const {
- return m_retrySendTimes;
+ return m_retrySendTimes;
}
void ResponseFuture::setMaxRetrySendTimes(int maxRetryTimes) {
- m_maxRetrySendTimes = maxRetryTimes;
+ m_maxRetrySendTimes = maxRetryTimes;
}
void ResponseFuture::setRetrySendTimes(int retryTimes) {
- m_retrySendTimes = retryTimes;
+ m_retrySendTimes = retryTimes;
}
void ResponseFuture::setBrokerAddr(const std::string& brokerAddr) {
- m_brokerAddr = brokerAddr;
+ m_brokerAddr = brokerAddr;
}
void ResponseFuture::setRequestCommand(const RemotingCommand& requestCommand) {
- m_requestCommand = requestCommand;
+ m_requestCommand = requestCommand;
}
const RemotingCommand& ResponseFuture::getRequestCommand() {
- return m_requestCommand;
+ return m_requestCommand;
}
std::string ResponseFuture::getBrokerAddr() const {
- return m_brokerAddr;
+ return m_brokerAddr;
}
int64 ResponseFuture::leftTime() const {
- int64 diff = UtilAll::currentTimeMillis() - m_beginTimestamp;
- return m_timeout - diff;
+ int64 diff = UtilAll::currentTimeMillis() - m_beginTimestamp;
+ return m_timeout - diff;
}
RemotingCommand* ResponseFuture::getCommand() const {
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
index 603c17f..7b9dff9 100755
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -217,10 +217,10 @@ bool TcpRemotingClient::invokeAsync(const string& addr,
int opaque = request.getOpaque();
boost::shared_ptr<ResponseFuture> responseFuture(
new ResponseFuture(code, opaque, this, timeoutMilliseconds, true, cbw));
- responseFuture->setMaxRetrySendTimes(maxRetrySendTimes);
- responseFuture->setRetrySendTimes(retrySendTimes);
- responseFuture->setBrokerAddr(addr);
- responseFuture->setRequestCommand(request);
+ responseFuture->setMaxRetrySendTimes(maxRetrySendTimes);
+ responseFuture->setRetrySendTimes(retrySendTimes);
+ responseFuture->setBrokerAddr(addr);
+ responseFuture->setRequestCommand(request);
addAsyncResponseFuture(opaque, responseFuture);
if (cbw) {
boost::asio::deadline_timer* t = new boost::asio::deadline_timer(
@@ -593,7 +593,7 @@ void TcpRemotingClient::processResponseCommand(
pfuture->setAsyncResponseFlag();
pfuture->setAsyncCallBackStatus(asyncCallBackStatus_response);
cancelTimerCallback(opaque);
- pfuture->executeInvokeCallback();
+ pfuture->executeInvokeCallback();
}
}
}
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 8fcd892..73a00d0 100755
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -15,7 +15,6 @@
project(test)
-
SET(SUB_DIRS)
file(GLOB children ${CMAKE_SOURCE_DIR}/src/*)
FOREACH(child ${children})
@@ -32,7 +31,6 @@ set(EXECUTABLE_OUTPUT_PATH ${CMAKE_SOURCE_DIR}/test/bin)
include_directories(${CMAKE_SOURCE_DIR}/include)
include_directories(${Boost_INCLUDE_DIRS})
-
set(Gtest_INCLUDE_DIR ${CMAKE_SOURCE_DIR}/bin/include/gtest)
set(Gmock_INCLUDE_DIR ${CMAKE_SOURCE_DIR}/bin/include/gmock)
@@ -55,34 +53,32 @@ message(status "ROCKETMQ_LIBRARIES ${ROCKETMQ_LIBRARIES}")
set(CMAKE_BUILD_TYPE "Debug")
-
function(compile files)
- foreach(file ${files})
- get_filename_component(basename ${file} NAME_WE)
- add_executable(${basename} ${file})
- if(MSVC)
- if(CMAKE_CONFIGURATION_TYPES STREQUAL "Release")
- set_target_properties( ${basename} PROPERTIES LINK_FLAGS "/NODEFAULTLIB:LIBCMT" )
- else()
- set_target_properties( ${basename} PROPERTIES LINK_FLAGS "/NODEFAULTLIB:LIBCMTD" )
- endif()
- endif()
-
- if (MSVC)
- if (BUILD_ROCKETMQ_SHARED)
- target_link_libraries (${basename} rocketmq_shared ${deplibs}
- ${Boost_LIBRARIES} ${LIBEVENT_LIBRARIES} ${JSONCPP_LIBRARIES} ${x`})
- else()
- target_link_libraries (${basename} rocketmq_static ${deplibs}
- ${Boost_LIBRARIES} ${LIBEVENT_LIBRARIES} ${JSONCPP_LIBRARIES} ${Gtest_LIBRARIES})
- endif()
- else()
- target_link_libraries (${basename} rocketmq_shared ${deplibs})
- target_link_libraries (${basename} rocketmq_shared ${Gtest_LIBRARIES})
- target_link_libraries (${basename} rocketmq_shared ${Gmock_LIBRARIES})
- endif()
-
- endforeach()
+ foreach(file ${files})
+ get_filename_component(basename ${file} NAME_WE)
+ add_executable(${basename} ${file})
+ if(MSVC)
+ if(CMAKE_CONFIGURATION_TYPES STREQUAL "Release")
+ set_target_properties( ${basename} PROPERTIES LINK_FLAGS "/NODEFAULTLIB:LIBCMT" )
+ else()
+ set_target_properties( ${basename} PROPERTIES LINK_FLAGS "/NODEFAULTLIB:LIBCMTD" )
+ endif()
+ endif()
+
+ if (MSVC)
+ if (BUILD_ROCKETMQ_SHARED)
+ target_link_libraries (${basename} rocketmq_shared ${deplibs}
+ ${Boost_LIBRARIES} ${LIBEVENT_LIBRARIES} ${JSONCPP_LIBRARIES} ${x`})
+ else()
+ target_link_libraries (${basename} rocketmq_static ${deplibs}
+ ${Boost_LIBRARIES} ${LIBEVENT_LIBRARIES} ${JSONCPP_LIBRARIES} ${Gtest_LIBRARIES})
+ endif()
+ else()
+ target_link_libraries (${basename} rocketmq_shared ${deplibs})
+ target_link_libraries (${basename} rocketmq_shared ${Gtest_LIBRARIES})
+ target_link_libraries (${basename} rocketmq_shared ${Gmock_LIBRARIES})
+ endif()
+ endforeach()
endfunction()
file(GLOB files "src/*.c*")
@@ -90,9 +86,8 @@ compile("${files}")
file(GLOB files "src/*")
foreach(file ${files})
- if(IS_DIRECTORY ${file})
- file(GLOB filess "${file}/*.c*")
- compile("${filess}")
- endif()
+ if(IS_DIRECTORY ${file})
+ file(GLOB filess "${file}/*.c*")
+ compile("${filess}")
+ endif()
endforeach()
-
diff --git a/test/src/BatchMessageTest.cpp b/test/src/BatchMessageTest.cpp
new file mode 100644
index 0000000..81bd7bb
--- /dev/null
+++ b/test/src/BatchMessageTest.cpp
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "gtest/gtest.h"
+#include "gmock/gmock.h"
+#include <unistd.h>
+#include <stdio.h>
+#include <iostream>
+#include "BatchMessage.h"
+#include "MQMessage.h"
+#include <map>
+
+using namespace std;
+using namespace rocketmq;
+using ::testing::InitGoogleTest;
+using ::testing::InitGoogleMock;
+using testing::Return;
+
+TEST(BatchMessageEncodeTest, encodeMQMessage) {
+ MQMessage msg1("topic", "*", "test");
+ //const map<string,string>& properties = msg1.getProperties();
+ //for (auto& pair : properties) {
+ // std::cout << pair.first << " : " << pair.second << std::endl;
+ //}
+
+ EXPECT_EQ(msg1.getProperties().size(), 2);
+ EXPECT_EQ(msg1.getBody().size(), 4);
+ //20 + bodyLen + 2 + propertiesLength;
+ string encodeMessage = BatchMessage::encode(msg1);
+ EXPECT_EQ(encodeMessage.size(), 43);
+
+ msg1.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "1");
+ encodeMessage = BatchMessage::encode(msg1);
+ EXPECT_EQ(encodeMessage.size(), 54);
+}
+
+TEST(BatchMessageEncodeTest, encodeMQMessages) {
+ std::vector<MQMessage> msgs;
+ MQMessage msg1("topic", "*", "test1");
+ //const map<string,string>& properties = msg1.getProperties();
+ //for (auto& pair : properties) {
+ // std::cout << pair.first << " : " << pair.second << std::endl;
+ //}
+ msgs.push_back(msg1);
+ //20 + bodyLen + 2 + propertiesLength;
+ string encodeMessage = BatchMessage::encode(msgs);
+ EXPECT_EQ(encodeMessage.size(), 86);
+ MQMessage msg2("topic", "*", "test2");
+ MQMessage msg3("topic", "*", "test3");
+ msgs.push_back(msg2);
+ msgs.push_back(msg3);
+ encodeMessage = BatchMessage::encode(msgs);
+ EXPECT_EQ(encodeMessage.size(), 258);//86*3
+}
+
+int main(int argc, char* argv[]) {
+ InitGoogleMock(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/test/src/MQDecoderTest.cpp b/test/src/MQDecoderTest.cpp
new file mode 100644
index 0000000..263fe30
--- /dev/null
+++ b/test/src/MQDecoderTest.cpp
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "gtest/gtest.h"
+#include "gmock/gmock.h"
+#include <unistd.h>
+#include <stdio.h>
+#include "BatchMessage.h"
+#include "MQMessage.h"
+#include <map>
+#include "MQDecoder.h"
+
+using namespace std;
+using namespace rocketmq;
+using ::testing::InitGoogleTest;
+using ::testing::InitGoogleMock;
+using testing::Return;
+
+TEST(MQDecoderTest, messageProperties2String) {
+ map<string, string> properties;
+ string property = MQDecoder::messageProperties2String(properties);
+ EXPECT_EQ(property.size(), 0);
+ properties["aaa"] = "aaa";
+ property = MQDecoder::messageProperties2String(properties);
+ EXPECT_EQ(property.size(), 8);
+}
+
+int main(int argc, char* argv[]) {
+ InitGoogleMock(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/test/src/StringIdMakerTest.cpp b/test/src/StringIdMakerTest.cpp
new file mode 100644
index 0000000..8889d28
--- /dev/null
+++ b/test/src/StringIdMakerTest.cpp
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "gtest/gtest.h"
+#include "gmock/gmock.h"
+#include "StringIdMaker.h"
+#include <map>
+#include <iostream>
+
+using namespace std;
+using namespace rocketmq;
+using ::testing::InitGoogleTest;
+using ::testing::InitGoogleMock;
+using testing::Return;
+
+TEST(StringIdMakerTest, get_unique_id) {
+ string unique_id = StringIdMaker::get_mutable_instance().get_unique_id();
+ cout << "unique_id: " << unique_id << endl;
+ EXPECT_EQ(unique_id.size(), 32);
+}
+
+int main(int argc, char* argv[]) {
+ InitGoogleMock(&argc, argv);
+ return RUN_ALL_TESTS();
+}