You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/03/14 02:10:05 UTC

[rocketmq-client-cpp] branch master updated: [Issue#94] Support send batch message to broker (#95)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new dcba721  [Issue#94] Support send batch message to broker (#95)
dcba721 is described below

commit dcba721fc710c03cffc95e18d4c95524e6b18f42
Author: donggang123 <jo...@163.com>
AuthorDate: Thu Mar 14 10:09:58 2019 +0800

    [Issue#94] Support send batch message to broker (#95)
    
    Support send batch message
---
 example/BatchProducer.cpp            | 130 ++++++++++++++++++++++
 example/CMakeLists.txt               |  16 +--
 include/BatchMessage.h               |  12 ++
 include/DefaultMQProducer.h          |   4 +
 include/MQMessage.h                  | 209 +++++++++++++++++------------------
 include/MQProducer.h                 |   2 +
 include/SendResult.h                 |   2 +-
 src/MQClientAPIImpl.cpp              |  36 +++---
 src/common/AsyncCallbackWrap.cpp     |  66 +++++------
 src/message/BatchMessage.cpp         |  45 ++++++++
 src/message/MQMessage.cpp            |   2 +-
 src/producer/DefaultMQProducer.cpp   |  77 ++++++++++++-
 src/producer/SendResult.cpp          |   2 +-
 src/producer/StringIdMaker.cpp       |  34 +++---
 src/protocol/CommandHeader.cpp       |   2 +
 src/protocol/CommandHeader.h         |   4 +-
 src/protocol/ConsumerRunningInfo.cpp |   2 +-
 src/transport/ResponseFuture.cpp     |  24 ++--
 src/transport/TcpRemotingClient.cpp  |  10 +-
 test/CMakeLists.txt                  |  63 +++++------
 test/src/BatchMessageTest.cpp        |  72 ++++++++++++
 test/src/MQDecoderTest.cpp           |  44 ++++++++
 test/src/StringIdMakerTest.cpp       |  38 +++++++
 23 files changed, 653 insertions(+), 243 deletions(-)

diff --git a/example/BatchProducer.cpp b/example/BatchProducer.cpp
new file mode 100644
index 0000000..3788422
--- /dev/null
+++ b/example/BatchProducer.cpp
@@ -0,0 +1,130 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <condition_variable>
+#include <iomanip>
+#include <iomanip>
+#include <iostream>
+#include <iostream>
+#include <mutex>
+#include <thread>
+#include <vector>
+#include "common.h"
+
+using namespace rocketmq;
+using namespace std;
+boost::atomic<bool> g_quit;  // stop flag: stored true by main() after its wait returns, polled by the worker loop
+std::mutex g_mtx;  // locked around every notify on, and the wait on, g_finished
+std::condition_variable g_finished;  // notified by a worker when the message budget is used up or a send throws
+TpsReportService g_tps;  // started in main(); incremented once per successful producer->send(msgs)
+
+// Worker loop: keeps sending three-message batches until g_quit is set, the
+// g_msgCount budget drops to zero, or a send throws MQException.
+void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, DefaultMQProducer* producer) {
+    while (!g_quit.load()) {
+        if (g_msgCount.load() <= 0) {
+            std::unique_lock<std::mutex> guard(g_mtx);
+            g_finished.notify_one();
+            break;
+        }
+        // Three messages on the same topic, carrying one, two and three properties.
+        MQMessage first(info->topic, "*", info->body);
+        first.setProperty("property1", "value1");
+        MQMessage second(info->topic, "*", info->body);
+        second.setProperty("property1", "value1");
+        second.setProperty("property2", "value2");
+        MQMessage third(info->topic, "*", info->body);
+        third.setProperty("property1", "value1");
+        third.setProperty("property2", "value2");
+        third.setProperty("property3", "value3");
+        vector<MQMessage> batch;
+        batch.push_back(first);
+        batch.push_back(second);
+        batch.push_back(third);
+        try {
+            const auto sendBegin = std::chrono::system_clock::now();
+            SendResult sendResult = producer->send(batch);
+            g_tps.Increment();
+            --g_msgCount;
+            const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
+                    std::chrono::system_clock::now() - sendBegin);
+            if (elapsed.count() >= 500) {  // only slow round trips are reported
+                std::cout << "send RT more than: " << elapsed.count()
+                          << " ms with msgid: " << sendResult.getMsgId() << endl;
+            }
+        } catch (const MQException& e) {
+            std::cout << "send failed: " << e.what() << std::endl;
+            std::unique_lock<std::mutex> guard(g_mtx);
+            g_finished.notify_one();
+            return;
+        }
+    }
+}
+
+// Benchmark driver: starts a producer, runs info.thread_count batch-sending
+// workers, waits for a completion signal and prints the time per message.
+int main(int argc, char* argv[]) {
+    RocketmqSendAndConsumerArgs info;
+    if (!ParseArgs(argc, argv, &info)) {
+        exit(-1);
+    }
+    PrintRocketmqSendAndConsumerArgs(info);
+    DefaultMQProducer producer("please_rename_unique_group_name");
+    producer.setNamesrvAddr(info.namesrv);
+    producer.setNamesrvDomain(info.namesrv_domain);
+    producer.setGroupName(info.groupname);
+    producer.setInstanceName(info.groupname);
+    producer.setSessionCredentials("mq acesskey", "mq secretkey", "ALIYUN");
+    producer.setSendMsgTimeout(500);
+    producer.setTcpTransportTryLockTimeout(1000);
+    producer.setTcpTransportConnectTimeout(400);
+
+    producer.start();
+    std::vector<std::shared_ptr<std::thread>> work_pool;
+    auto start = std::chrono::system_clock::now();
+    int msgcount = g_msgCount.load();
+    g_tps.start();
+
+    {
+        // Take g_mtx BEFORE spawning the workers: they lock it before notifying, so no
+        // notification can be issued (and lost) until wait() has released the mutex.
+        std::unique_lock<std::mutex> lck(g_mtx);
+        int threadCount = info.thread_count;
+        for (int j = 0; j < threadCount; j++) {
+            std::shared_ptr<std::thread> th =
+                    std::make_shared<std::thread>(SyncProducerWorker, &info, &producer);
+            work_pool.push_back(th);
+        }
+        g_finished.wait(lck);
+        g_quit.store(true);
+    }
+
+    auto end = std::chrono::system_clock::now();
+    auto duration =
+            std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+    std::cout
+            << "per msg time: " << duration.count() / (double)msgcount << "ms \n"
+            << "========================finished==============================\n";
+    // g_quit is now set, so every worker leaves its loop; join them before shutdown.
+    for (size_t th = 0; th != work_pool.size(); ++th) {
+        work_pool[th]->join();
+    }
+    producer.shutdown();
+    return 0;
+}
diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt
index 90c9442..aecec5c 100755
--- a/example/CMakeLists.txt
+++ b/example/CMakeLists.txt
@@ -41,18 +41,18 @@ foreach(file ${files})
     endif()
 
     if (MSVC)
-    	if (BUILD_ROCKETMQ_SHARED)
-        	target_link_libraries (${basename}  rocketmq_shared ${deplibs}
-        	${Boost_LIBRARIES} ${LIBEVENT_LIBRARIES} ${JSONCPP_LIBRARIES})
-    	else()
-        	target_link_libraries (${basename}  rocketmq_static ${deplibs}
-        	${Boost_LIBRARIES} ${LIBEVENT_LIBRARIES} ${JSONCPP_LIBRARIES})
+        if (BUILD_ROCKETMQ_SHARED)
+            target_link_libraries (${basename}  rocketmq_shared ${deplibs}
+            ${Boost_LIBRARIES} ${LIBEVENT_LIBRARIES} ${JSONCPP_LIBRARIES})
+        else()
+            target_link_libraries (${basename}  rocketmq_static ${deplibs}
+            ${Boost_LIBRARIES} ${LIBEVENT_LIBRARIES} ${JSONCPP_LIBRARIES})
         endif()
     else()
         if (BUILD_ROCKETMQ_SHARED)
-    	    target_link_libraries (${basename} rocketmq_shared)
+            target_link_libraries (${basename} rocketmq_shared)
         else()
-    	    target_link_libraries (${basename} rocketmq_static)
+            target_link_libraries (${basename} rocketmq_static)
         endif()
     endif()
 
diff --git a/include/BatchMessage.h b/include/BatchMessage.h
new file mode 100644
index 0000000..9c52d2d
--- /dev/null
+++ b/include/BatchMessage.h
@@ -0,0 +1,12 @@
+#ifndef __BATCHMESSAGE_H__
+#define __BATCHMESSAGE_H__
+#include "MQMessage.h"
+#include <string>
+namespace rocketmq {
+    class BatchMessage : public MQMessage {  // MQMessage subtype; adds only the static batch-encoding helpers below
+    public:
+        static std::string encode(std::vector <MQMessage> &msgs);  // concatenation of encode(message) for each entry of msgs
+        static std::string encode(MQMessage &message);  // binary encoding of one message (sizes, flag, body, properties)
+    };
+}
+#endif
\ No newline at end of file
diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h
index 02f5651..17be13f 100755
--- a/include/DefaultMQProducer.h
+++ b/include/DefaultMQProducer.h
@@ -22,6 +22,7 @@
 #include "MQProducer.h"
 #include "RocketMQClient.h"
 #include "SendResult.h"
+#include "BatchMessage.h"
 
 namespace rocketmq {
 //<!***************************************************************************
@@ -43,6 +44,8 @@ class ROCKETMQCLIENT_API DefaultMQProducer : public MQProducer {
   virtual SendResult send(MQMessage& msg, MessageQueueSelector* selector,
                           void* arg, int autoRetryTimes,
                           bool bActiveBroker = false);
+  virtual SendResult send(std::vector<MQMessage>& msgs);
+  virtual SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq);
   virtual void send(MQMessage& msg, SendCallback* pSendCallback,
                     bool bSelectActiveBroker = false);
   virtual void send(MQMessage& msg, const MQMessageQueue& mq,
@@ -95,6 +98,7 @@ class ROCKETMQCLIENT_API DefaultMQProducer : public MQProducer {
   SendResult sendKernelImpl(MQMessage& msg, const MQMessageQueue& mq,
                             int communicationMode, SendCallback* pSendCallback);
   bool tryToCompressMessage(MQMessage& msg);
+  BatchMessage buildBatchMessage(std::vector<MQMessage>& msgs);
 
  private:
   int m_sendMsgTimeout;
diff --git a/include/MQMessage.h b/include/MQMessage.h
index ef2ba10..e6f2298 100755
--- a/include/MQMessage.h
+++ b/include/MQMessage.h
@@ -26,113 +26,108 @@
 namespace rocketmq {
 //<!***************************************************************************
 class ROCKETMQCLIENT_API MQMessage {
-   public:
-    MQMessage();
-    MQMessage(const std::string &topic, const std::string &body);
-    MQMessage(const std::string &topic, const std::string &tags, const std::string &body);
-    MQMessage(const std::string &topic, const std::string &tags, const std::string &keys, const std::string &body);
-    MQMessage(const std::string &topic,
-              const std::string &tags,
-              const std::string &keys,
-              const int flag,
-              const std::string &body,
-              bool waitStoreMsgOK);
-
-    virtual ~MQMessage();
-    MQMessage(const MQMessage &other);
-    MQMessage &operator=(const MQMessage &other);
-
-    void setProperty(const std::string &name, const std::string &value);
-    const std::string &getProperty(const std::string &name) const;
-
-    const std::string &getTopic() const;
-    void setTopic(const std::string &topic);
-    void setTopic(const char *body, int len);
-
-    const std::string &getTags() const;
-    void setTags(const std::string &tags);
-
-    const std::string &getKeys() const;
-    void setKeys(const std::string &keys);
-    void setKeys(const std::vector<std::string> &keys);
-
-    int getDelayTimeLevel() const;
-    void setDelayTimeLevel(int level);
-
-    bool isWaitStoreMsgOK();
-    void setWaitStoreMsgOK(bool waitStoreMsgOK);
-
-    int getFlag() const;
-    void setFlag(int flag);
-
-    int getSysFlag() const;
-    void setSysFlag(int sysFlag);
-
-    const std::string &getBody() const;
-
-    void setBody(const char *body, int len);
-    void setBody(const std::string &body);
-
-    std::map<std::string, std::string> getProperties() const;
-    void setProperties(std::map<std::string, std::string> &properties);
-
-    const std::string toString() const {
-        std::stringstream ss;
-        std::string tags = getTags();
-        ss << "Message [topic=" << m_topic << ", flag=" << m_flag << ", tag=" << tags << "]";
-        return ss.str();
-    }
-
-   protected:
-    friend class MQDecoder;
-    void setPropertyInternal(const std::string &name, const std::string &value);
-    void setPropertiesInternal(std::map<std::string, std::string> &properties);
-
-    void Init(const std::string &topic,
-              const std::string &tags,
-              const std::string &keys,
-              const int flag,
-              const std::string &body,
-              bool waitStoreMsgOK);
-
-   public:
-    static const std::string PROPERTY_KEYS;
-    static const std::string PROPERTY_TAGS;
-    static const std::string PROPERTY_WAIT_STORE_MSG_OK;
-    static const std::string PROPERTY_DELAY_TIME_LEVEL;
-    static const std::string PROPERTY_RETRY_TOPIC;
-    static const std::string PROPERTY_REAL_TOPIC;
-    static const std::string PROPERTY_REAL_QUEUE_ID;
-    static const std::string PROPERTY_TRANSACTION_PREPARED;
-    static const std::string PROPERTY_PRODUCER_GROUP;
-    static const std::string PROPERTY_MIN_OFFSET;
-    static const std::string PROPERTY_MAX_OFFSET;
-
-    static const std::string PROPERTY_BUYER_ID;
-    static const std::string PROPERTY_ORIGIN_MESSAGE_ID;
-    static const std::string PROPERTY_TRANSFER_FLAG;
-    static const std::string PROPERTY_CORRECTION_FLAG;
-    static const std::string PROPERTY_MQ2_FLAG;
-    static const std::string PROPERTY_RECONSUME_TIME;
-    static const std::string PROPERTY_MSG_REGION;
-    static const std::string PROPERTY_TRACE_SWITCH;
-    static const std::string PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX;
-    static const std::string PROPERTY_MAX_RECONSUME_TIMES;
-    static const std::string PROPERTY_CONSUME_START_TIMESTAMP;
-    static const std::string PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET;
-    static const std::string PROPERTY_TRANSACTION_CHECK_TIMES;
-    static const std::string PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS;
-
-    static const std::string KEY_SEPARATOR;
-
-   protected:
-    int m_sysFlag;
-
-   private:
-    std::string m_topic;
-    int m_flag;
-    std::string m_body;
-    std::map<std::string, std::string> m_properties;
+
+ public:
+  MQMessage();
+  MQMessage(const std::string& topic, const std::string& body);
+  MQMessage(const std::string& topic, const std::string& tags, const std::string& body);
+  MQMessage(const std::string& topic, const std::string& tags, const std::string& keys,
+            const std::string& body);
+  MQMessage(const std::string& topic, const std::string& tags, const std::string& keys,
+            const int flag, const std::string& body, bool waitStoreMsgOK);
+
+  virtual ~MQMessage();
+  MQMessage(const MQMessage& other);
+  MQMessage& operator=(const MQMessage& other);
+
+  void setProperty(const std::string& name, const std::string& value) ;
+  const std::string & getProperty(const std::string& name) const;
+
+  const std::string &getTopic() const;
+  void setTopic(const std::string& topic);
+  void setTopic(const char* body, int len);
+
+  const std::string &getTags() const;
+  void setTags(const std::string& tags);
+
+  const std::string &getKeys() const;
+  void setKeys(const std::string& keys);
+  void setKeys(const std::vector<std::string>& keys);
+
+  int getDelayTimeLevel() const;
+  void setDelayTimeLevel(int level);
+
+  bool isWaitStoreMsgOK() const;
+  void setWaitStoreMsgOK(bool waitStoreMsgOK);
+
+  int getFlag() const;
+  void setFlag(int flag);
+
+  int getSysFlag() const;
+  void setSysFlag(int sysFlag);
+
+  const std::string &getBody() const;
+
+  void setBody(const char* body, int len);
+  void setBody(const std::string& body);
+
+  std::map<std::string, std::string> getProperties() const;
+  void setProperties(std::map<std::string, std::string>& properties);
+
+  const std::string toString() const {
+    std::stringstream ss;
+    std::string tags = getTags();
+    ss << "Message [topic=" << m_topic << ", flag=" << m_flag
+       << ", tag=" << tags << "]";
+    return ss.str();
+  }
+
+ protected:
+  friend class MQDecoder;
+  void setPropertyInternal(const std::string& name, const std::string& value);
+  void setPropertiesInternal(std::map<std::string, std::string>& properties);
+
+  void Init(const std::string& topic, const std::string& tags, const std::string& keys,
+            const int flag, const std::string& body, bool waitStoreMsgOK);
+
+ public:
+  static const std::string PROPERTY_KEYS;
+  static const std::string PROPERTY_TAGS;
+  static const std::string PROPERTY_WAIT_STORE_MSG_OK;
+  static const std::string PROPERTY_DELAY_TIME_LEVEL;
+  static const std::string PROPERTY_RETRY_TOPIC;
+  static const std::string PROPERTY_REAL_TOPIC;
+  static const std::string PROPERTY_REAL_QUEUE_ID;
+  static const std::string PROPERTY_TRANSACTION_PREPARED;
+  static const std::string PROPERTY_PRODUCER_GROUP;
+  static const std::string PROPERTY_MIN_OFFSET;
+  static const std::string PROPERTY_MAX_OFFSET;
+  
+  static const std::string PROPERTY_BUYER_ID;
+  static const std::string PROPERTY_ORIGIN_MESSAGE_ID;
+  static const std::string PROPERTY_TRANSFER_FLAG;
+  static const std::string PROPERTY_CORRECTION_FLAG;
+  static const std::string PROPERTY_MQ2_FLAG;
+  static const std::string PROPERTY_RECONSUME_TIME;
+  static const std::string PROPERTY_MSG_REGION;
+  static const std::string PROPERTY_TRACE_SWITCH;
+  static const std::string PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX;
+  static const std::string PROPERTY_MAX_RECONSUME_TIMES;
+  static const std::string PROPERTY_CONSUME_START_TIMESTAMP;
+  static const std::string PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET;
+  static const std::string PROPERTY_TRANSACTION_CHECK_TIMES;
+  static const std::string PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS;
+
+  static const std::string KEY_SEPARATOR;
+
+ protected:
+  int m_sysFlag;
+
+ private:
+  std::string m_topic;
+  int m_flag;
+  std::string m_body;
+  std::map<std::string, std::string> m_properties;
 };
 //<!***************************************************************************
 }  // namespace rocketmq
diff --git a/include/MQProducer.h b/include/MQProducer.h
index e5df9ee..2673931 100755
--- a/include/MQProducer.h
+++ b/include/MQProducer.h
@@ -52,6 +52,8 @@ class ROCKETMQCLIENT_API MQProducer : public MQClient {
                     SendCallback* sendCallback) = 0;
   virtual void send(MQMessage& msg, MessageQueueSelector* selector, void* arg,
                     SendCallback* sendCallback) = 0;
+  virtual SendResult send(std::vector<MQMessage>& msgs) = 0;
+  virtual SendResult send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq) = 0;
   virtual void sendOneway(MQMessage& msg, bool bSelectActiveBroker = false) = 0;
   virtual void sendOneway(MQMessage& msg, const MQMessageQueue& mq) = 0;
   virtual void sendOneway(MQMessage& msg, MessageQueueSelector* selector,
diff --git a/include/SendResult.h b/include/SendResult.h
index 23a2dfc..239f07a 100755
--- a/include/SendResult.h
+++ b/include/SendResult.h
@@ -34,7 +34,7 @@ enum SendStatus {
 class ROCKETMQCLIENT_API SendResult {
  public:
   SendResult();
-  SendResult(const SendStatus& sendStatus, const std::string& msgId, const std::string& offsetMsgId, 
+  SendResult(const SendStatus& sendStatus, const std::string& msgId, const std::string& offsetMsgId,
              const MQMessageQueue& messageQueue, int64 queueOffset);
 
   virtual ~SendResult();
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 045bfab..8ec426d 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -422,25 +422,25 @@ void MQClientAPIImpl::sendMessageAsync(const string& addr,
   if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMilliseconds, maxRetryTimes, retrySendTimes) ==
     false) {
     LOG_WARN("invokeAsync failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d", 
-	  addr.c_str(), msg.getTopic().data(), timeoutMilliseconds, maxRetryTimes, retrySendTimes);
-	  //when getTcp return false, need consider retrySendTimes
-	  int retry_time = retrySendTimes + 1;
-	  int64 time_out = timeoutMilliseconds - (UtilAll::currentTimeMillis() - begin_time);
-	  while (retry_time < maxRetryTimes && time_out > 0) {
-		  begin_time = UtilAll::currentTimeMillis();
-		  if (m_pRemotingClient->invokeAsync(addr, request, cbw, time_out, maxRetryTimes, retry_time) == false) {
-		    retry_time += 1;
-		    time_out = time_out - (UtilAll::currentTimeMillis() - begin_time);
-			  LOG_WARN("invokeAsync retry failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d", 
-				addr.c_str(), msg.getTopic().data(), time_out, maxRetryTimes, retry_time);
-			  continue;
-		  } else {
-			  return; //invokeAsync success
-		  }
-	  }
+      addr.c_str(), msg.getTopic().data(), timeoutMilliseconds, maxRetryTimes, retrySendTimes);
+      //when getTcp return false, need consider retrySendTimes
+      int retry_time = retrySendTimes + 1;
+      int64 time_out = timeoutMilliseconds - (UtilAll::currentTimeMillis() - begin_time);
+      while (retry_time < maxRetryTimes && time_out > 0) {
+          begin_time = UtilAll::currentTimeMillis();
+          if (m_pRemotingClient->invokeAsync(addr, request, cbw, time_out, maxRetryTimes, retry_time) == false) {
+              retry_time += 1;
+              time_out = time_out - (UtilAll::currentTimeMillis() - begin_time);
+              LOG_WARN("invokeAsync retry failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d", 
+                addr.c_str(), msg.getTopic().data(), time_out, maxRetryTimes, retry_time);
+              continue;
+          } else {
+              return; //invokeAsync success
+          }
+      }
 
     LOG_ERROR("sendMessageAsync failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d", 
-	  addr.c_str(), msg.getTopic().data(), time_out, maxRetryTimes, retrySendTimes);
+      addr.c_str(), msg.getTopic().data(), time_out, maxRetryTimes, retrySendTimes);
 
     if (cbw) {
       cbw->onException();
@@ -554,7 +554,7 @@ SendResult MQClientAPIImpl::processSendResponse(const string& brokerName,
         (SendMessageResponseHeader*)pResponse->getCommandHeader();
     MQMessageQueue messageQueue(msg.getTopic(), brokerName,
                                 responseHeader->queueId);
-	string unique_msgId = msg.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+    string unique_msgId = msg.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
     return SendResult(sendStatus, unique_msgId, responseHeader->msgId, messageQueue,
                       responseHeader->queueOffset);
   }
diff --git a/src/common/AsyncCallbackWrap.cpp b/src/common/AsyncCallbackWrap.cpp
index 42ce04e..b0ffa04 100755
--- a/src/common/AsyncCallbackWrap.cpp
+++ b/src/common/AsyncCallbackWrap.cpp
@@ -88,42 +88,42 @@ void SendCallbackWrap::operationComplete(ResponseFuture* pResponseFuture,
     try {
         SendResult ret = m_pClientAPI->processSendResponse(m_brokerName, m_msg, pResponse.get());
         if (pCallback) { 
-			LOG_DEBUG("operationComplete: processSendResponse success, opaque:%d, maxRetryTime:%d, retrySendTimes:%d", opaque, pResponseFuture->getMaxRetrySendTimes(), pResponseFuture->getRetrySendTimes());
-			pCallback->onSuccess(ret); 
-		}
+            LOG_DEBUG("operationComplete: processSendResponse success, opaque:%d, maxRetryTime:%d, retrySendTimes:%d", opaque, pResponseFuture->getMaxRetrySendTimes(), pResponseFuture->getRetrySendTimes());
+            pCallback->onSuccess(ret); 
+        }
     } catch (MQException& e) {
         LOG_ERROR("operationComplete: processSendResponse exception: %s", e.what());
 
-		//broker may return exception, need consider retry send
-		int maxRetryTimes = pResponseFuture->getMaxRetrySendTimes();
-		int retryTimes = pResponseFuture->getRetrySendTimes();
-		if (pResponseFuture->getASyncFlag() && retryTimes < maxRetryTimes && maxRetryTimes > 1) {
-
-			int64 left_timeout_ms = pResponseFuture->leftTime(); 
-			string brokerAddr = pResponseFuture->getBrokerAddr();
-			const RemotingCommand& requestCommand = pResponseFuture->getRequestCommand();
-			retryTimes += 1;
-			LOG_WARN("retry send, opaque:%d, sendTimes:%d, maxRetryTimes:%d, left_timeout:%lld, brokerAddr:%s, msg:%s", 
-					opaque, retryTimes, maxRetryTimes, left_timeout_ms, brokerAddr.data(), m_msg.toString().data());
-
-			bool exception_flag = false;
-			try {
-				m_pClientAPI->sendMessageAsync(pResponseFuture->getBrokerAddr(), m_brokerName, m_msg, (RemotingCommand&)requestCommand, pCallback, left_timeout_ms, maxRetryTimes, retryTimes);
-			} catch (MQClientException& e) {
-				LOG_ERROR("retry send exception:%s, opaque:%d, retryTimes:%d, msg:%s, not retry send again", e.what(), opaque, retryTimes, m_msg.toString().data());
-				exception_flag = true;
-			}
-
-			if (exception_flag == false) {
-				return; //send retry again, here need return
-			}
-		}
-	  
-		if (pCallback) {
-			MQException exception("process send response error", -1, __FILE__,
-			                      __LINE__);
-			pCallback->onException(exception);
-		}
+        //broker may return exception, need consider retry send
+        int maxRetryTimes = pResponseFuture->getMaxRetrySendTimes();
+        int retryTimes = pResponseFuture->getRetrySendTimes();
+        if (pResponseFuture->getASyncFlag() && retryTimes < maxRetryTimes && maxRetryTimes > 1) {
+
+            int64 left_timeout_ms = pResponseFuture->leftTime(); 
+            string brokerAddr = pResponseFuture->getBrokerAddr();
+            const RemotingCommand& requestCommand = pResponseFuture->getRequestCommand();
+            retryTimes += 1;
+            LOG_WARN("retry send, opaque:%d, sendTimes:%d, maxRetryTimes:%d, left_timeout:%lld, brokerAddr:%s, msg:%s", 
+                    opaque, retryTimes, maxRetryTimes, left_timeout_ms, brokerAddr.data(), m_msg.toString().data());
+
+            bool exception_flag = false;
+            try {
+                m_pClientAPI->sendMessageAsync(pResponseFuture->getBrokerAddr(), m_brokerName, m_msg, (RemotingCommand&)requestCommand, pCallback, left_timeout_ms, maxRetryTimes, retryTimes);
+            } catch (MQClientException& e) {
+                LOG_ERROR("retry send exception:%s, opaque:%d, retryTimes:%d, msg:%s, not retry send again", e.what(), opaque, retryTimes, m_msg.toString().data());
+                exception_flag = true;
+            }
+
+            if (exception_flag == false) {
+                return; //send retry again, here need return
+            }
+        }
+      
+        if (pCallback) {
+            MQException exception("process send response error", -1, __FILE__,
+                                  __LINE__);
+            pCallback->onException(exception);
+        }
     }
   }
   if (pCallback && pCallback->getSendCallbackType() == autoDeleteSendCallback) {
diff --git a/src/message/BatchMessage.cpp b/src/message/BatchMessage.cpp
new file mode 100644
index 0000000..c2b9ec2
--- /dev/null
+++ b/src/message/BatchMessage.cpp
@@ -0,0 +1,45 @@
+#include "BatchMessage.h"
+#include "MQDecoder.h"
+#include "StringIdMaker.h"
+
+using namespace std;
+namespace rocketmq {
+
+    // Tags every message in msgs with a fresh unique client id and returns their encodings concatenated.
+    std::string BatchMessage::encode(std::vector<MQMessage> &msgs) {
+        string encodedBody;
+        for (auto &message : msgs) {  // by reference: no deep copy per message, and the id stays on the caller's message
+            message.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, StringIdMaker::get_mutable_instance().get_unique_id());
+            encodedBody.append(encode(message));
+        }
+        return encodedBody;
+    }
+
+    // Serializes one message as TOTALSIZE|MAGICCODE|BODYCRC|FLAG|BODYLEN|BODY|PROPERTIESLEN|PROPERTIES.
+    // Every integer field is written in network byte order; MAGICCODE and BODYCRC are written as 0.
+    std::string BatchMessage::encode(MQMessage &message) {
+        const string &body = message.getBody();
+        int bodyLen = body.length();
+        string properties = MQDecoder::messageProperties2String(message.getProperties());
+        // NOTE(review): a properties string longer than SHRT_MAX is silently truncated by this cast.
+        short propertiesLength = (short) properties.length();
+        int storeSize = 20 + bodyLen + 2 + propertiesLength;  // 20 = five 4-byte fields, 2 = 16-bit properties length
+        int storeSize_net = htonl(storeSize);
+        int magicCode_net = htonl(0);
+        int bodyCrc_net = htonl(0);
+        int flag_net = htonl(message.getFlag());
+        int bodyLen_net = htonl(bodyLen);
+        // Kept 16 bits wide: only sizeof(short) bytes are appended, and an int would yield the wrong half on big-endian hosts.
+        unsigned short propertiesLength_net = htons(propertiesLength);
+        string encodeMsg;
+        encodeMsg.append((char*)&storeSize_net, sizeof(int));
+        encodeMsg.append((char*)&magicCode_net, sizeof(int));
+        encodeMsg.append((char*)&bodyCrc_net, sizeof(int));
+        encodeMsg.append((char*)&flag_net, sizeof(int));
+        encodeMsg.append((char*)&bodyLen_net, sizeof(int));
+        encodeMsg.append(body.c_str(), body.length());
+        encodeMsg.append((char*)&propertiesLength_net, sizeof(short));
+        encodeMsg.append(properties.c_str(), propertiesLength);
+        return encodeMsg;
+    }
+}
diff --git a/src/message/MQMessage.cpp b/src/message/MQMessage.cpp
index cc5d16f..e816879 100755
--- a/src/message/MQMessage.cpp
+++ b/src/message/MQMessage.cpp
@@ -173,7 +173,7 @@ void MQMessage::setDelayTimeLevel(int level) {
   setPropertyInternal(PROPERTY_DELAY_TIME_LEVEL, tmp);
 }
 
-bool MQMessage::isWaitStoreMsgOK() {
+bool MQMessage::isWaitStoreMsgOK() const {
   string tmp = getProperty(PROPERTY_WAIT_STORE_MSG_OK);
   if (tmp.empty()) {
     return true;
diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp
index 4edf811..b7c3695 100755
--- a/src/producer/DefaultMQProducer.cpp
+++ b/src/producer/DefaultMQProducer.cpp
@@ -29,6 +29,8 @@
 #include "TopicPublishInfo.h"
 #include "Validators.h"
 #include "StringIdMaker.h"
+#include "BatchMessage.h"
+#include <typeinfo>
 
 namespace rocketmq {
 
@@ -127,6 +129,68 @@ void DefaultMQProducer::send(MQMessage& msg, SendCallback* pSendCallback,
   }
 }
 
+// Packs msgs into one BatchMessage and sends it synchronously; logs and rethrows any MQException.
+SendResult DefaultMQProducer::send(std::vector<MQMessage>& msgs) {
+  try {
+    BatchMessage batchMessage = buildBatchMessage(msgs);
+    return sendDefaultImpl(batchMessage, ComMode_SYNC, NULL);
+  } catch (MQException& e) {
+    // Pass the text as an argument so a '%' in it is never parsed as a format directive.
+    LOG_ERROR("%s", e.what());
+    throw;  // rethrows the original object; `throw e` would slice a derived exception to MQException
+  }
+}
+
+// Packs msgs into one BatchMessage and sends it synchronously to queue mq; logs and rethrows any MQException.
+SendResult DefaultMQProducer::send(std::vector<MQMessage>& msgs, const MQMessageQueue& mq) {
+  try {
+    BatchMessage batchMessage = buildBatchMessage(msgs);
+    return sendKernelImpl(batchMessage, mq, ComMode_SYNC, NULL);
+  } catch (MQException& e) {
+    // Pass the text as an argument so a '%' in it is never parsed as a format directive.
+    LOG_ERROR("%s", e.what());
+    throw;  // rethrows the original object; `throw e` would slice a derived exception to MQException
+  }
+}
+
+// Validates msgs and packs them into one BatchMessage. Throws MQClientException when msgs is empty,
+// uses a retry topic, carries a delay level, or mixes topics / waitStoreMsgOK settings.
+BatchMessage DefaultMQProducer::buildBatchMessage(std::vector<MQMessage>& msgs) {
+  if (msgs.empty()) {
+    THROW_MQEXCEPTION(MQClientException, "msgs need one message at least", -1);
+  }
+  bool firstFlag = true;
+  string topic;
+  bool waitStoreMsgOK = false;
+  for (auto& msg : msgs) {
+    Validators::checkMessage(msg, getMaxMessageSize());
+    if (msg.getDelayTimeLevel() > 0) {  // applies to every message, the first one included
+      THROW_MQEXCEPTION(MQClientException, "TimeDelayLevel is not supported for batching", -1);
+    }
+    if (firstFlag) {
+      // The first message fixes the topic and waitStoreMsgOK that all later ones must match.
+      topic = msg.getTopic();
+      waitStoreMsgOK = msg.isWaitStoreMsgOK();
+      firstFlag = false;
+      if (UtilAll::startsWith_retry(topic)) {
+        THROW_MQEXCEPTION(MQClientException, "Retry Group is not supported for batching", -1);
+      }
+      continue;
+    }
+    if (msg.getTopic() != topic) {
+      THROW_MQEXCEPTION(MQClientException, "The topic of the messages in one batch should be the same", -1);
+    }
+    if (msg.isWaitStoreMsgOK() != waitStoreMsgOK) {
+      THROW_MQEXCEPTION(MQClientException, "The waitStoreMsgOK of the messages in one batch should be the same", -2);
+    }
+  }
+  BatchMessage batchMessage;
+  batchMessage.setBody(BatchMessage::encode(msgs));
+  batchMessage.setTopic(topic);
+  batchMessage.setWaitStoreMsgOK(waitStoreMsgOK);
+  return batchMessage;
+}
+
 SendResult DefaultMQProducer::send(MQMessage& msg, const MQMessageQueue& mq) {
   Validators::checkMessage(msg, getMaxMessageSize());
   if (msg.getTopic() != mq.getTopic()) {
@@ -336,9 +400,13 @@ SendResult DefaultMQProducer::sendKernelImpl(MQMessage& msg,
 
   if (!brokerAddr.empty()) {
     try {
-	  //msgId is produced by client, offsetMsgId produced by broker. (same with java sdk)
-	  string unique_id = StringIdMaker::get_mutable_instance().get_unique_id();
-	  msg.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, unique_id);
+      BatchMessage batchMessage;
+      bool isBatchMsg = (typeid(msg).name() == typeid(batchMessage).name());
+      //msgId is produced by client, offsetMsgId produced by broker. (same with java sdk)
+      if (!isBatchMsg) {
+        string unique_id = StringIdMaker::get_mutable_instance().get_unique_id();
+        msg.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, unique_id);
+      }
 
       LOG_DEBUG("produce before:%s to %s", msg.toString().c_str(), mq.toString().c_str());
 
@@ -353,6 +421,7 @@ SendResult DefaultMQProducer::sendKernelImpl(MQMessage& msg,
       requestHeader->sysFlag = (msg.getSysFlag());
       requestHeader->bornTimestamp = UtilAll::currentTimeMillis();
       requestHeader->flag = (msg.getFlag());
+      requestHeader->batch = isBatchMsg;
       requestHeader->properties =
           (MQDecoder::messageProperties2String(msg.getProperties()));
 
@@ -505,7 +574,7 @@ void DefaultMQProducer::setRetryTimes4Async(int times)
 {
   if (times <= 0) {
     LOG_WARN("set retry times illegal, use default value:1");
-	m_retryTimes4Async = 1;
+    m_retryTimes4Async = 1;
     return;
   }
 
diff --git a/src/producer/SendResult.cpp b/src/producer/SendResult.cpp
index 7bc31ca..9eabf5d 100755
--- a/src/producer/SendResult.cpp
+++ b/src/producer/SendResult.cpp
@@ -42,7 +42,7 @@ SendResult& SendResult::operator=(const SendResult& other) {
   if (this != &other) {
     m_sendStatus = other.m_sendStatus;
     m_msgId = other.m_msgId;
-	m_offsetMsgId = other.m_offsetMsgId;
+    m_offsetMsgId = other.m_offsetMsgId;
     m_messageQueue = other.m_messageQueue;
     m_queueOffset = other.m_queueOffset;
   }
diff --git a/src/producer/StringIdMaker.cpp b/src/producer/StringIdMaker.cpp
index 23abd08..06bd872 100644
--- a/src/producer/StringIdMaker.cpp
+++ b/src/producer/StringIdMaker.cpp
@@ -21,21 +21,21 @@ namespace rocketmq {
 #ifdef WIN32
 int gettimeofdayWin(struct timeval *tp, void *tzp)
 {
-	time_t clock;
-	struct tm tm;
-	SYSTEMTIME wtm;
-	GetLocalTime(&wtm);
-	tm.tm_year	= wtm.wYear - 1900;
-	tm.tm_mon = wtm.wMonth - 1;
-	tm.tm_mday = wtm.wDay;
-	tm.tm_hour = wtm.wHour;
-	tm.tm_min = wtm.wMinute;
-	tm.tm_sec = wtm.wSecond;
-	tm. tm_isdst = -1;
-	clock = mktime(&tm);
-	tp->tv_sec = clock;
-	tp->tv_usec = wtm.wMilliseconds * 1000;
-	return (0);
+  time_t clock;
+  struct tm tm;
+  SYSTEMTIME wtm;
+  GetLocalTime(&wtm);
+  tm.tm_year	= wtm.wYear - 1900;
+  tm.tm_mon = wtm.wMonth - 1;
+  tm.tm_mday = wtm.wDay;
+  tm.tm_hour = wtm.wHour;
+  tm.tm_min = wtm.wMinute;
+  tm.tm_sec = wtm.wSecond;
+  tm.tm_isdst = -1;
+  clock = mktime(&tm);
+  tp->tv_sec = clock;
+  tp->tv_usec = wtm.wMilliseconds * 1000;
+  return (0);
 }
 #endif
 
@@ -115,7 +115,7 @@ uint64_t StringIdMaker::get_curr_ms() {
 #ifdef  WIN32
   gettimeofdayWin(&time_now, NULL); //  WIN32
 #else
-  gettimeofday(&time_now, NULL);	//LINUX
+  gettimeofday(&time_now, NULL);    //LINUX
 #endif
 
   uint64_t ms_time = time_now.tv_sec * 1000 + time_now.tv_usec / 1000;
@@ -155,7 +155,7 @@ void StringIdMaker::set_start_and_next_tm() {
 
 int StringIdMaker::atomic_incr(int id) {
   #ifdef WIN32
-	  InterlockedIncrement((LONG*)&id);
+    InterlockedIncrement((LONG*)&id);
   #else
     __sync_add_and_fetch(&id, 1);
   #endif
diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp
index 366ac2e..6290ac7 100644
--- a/src/protocol/CommandHeader.cpp
+++ b/src/protocol/CommandHeader.cpp
@@ -80,6 +80,7 @@ void SendMessageRequestHeader::Encode(Json::Value& outData) {
   outData["reconsumeTimes"] = UtilAll::to_string(reconsumeTimes);
   outData["unitMode"] = UtilAll::to_string(unitMode);
 #endif
+  outData["batch"] = batch;
 }
 
 int SendMessageRequestHeader::getReconsumeTimes() { return reconsumeTimes; }
@@ -121,6 +122,7 @@ void SendMessageRequestHeader::SetDeclaredFieldOfCommandHeader(
   requestMap.insert(
       pair<string, string>("unitMode", UtilAll::to_string(unitMode)));
 #endif
+  requestMap.insert(pair<string, string>("batch", UtilAll::to_string(batch)));
 }
 
 //<!************************************************************************
diff --git a/src/protocol/CommandHeader.h b/src/protocol/CommandHeader.h
index 5a55c55..b5ffe14 100644
--- a/src/protocol/CommandHeader.h
+++ b/src/protocol/CommandHeader.h
@@ -90,7 +90,8 @@ class SendMessageRequestHeader : public CommandHeader {
         bornTimestamp(0),
         flag(0),
         reconsumeTimes(0),
-        unitMode(false) {}
+        unitMode(false),
+        batch(false){}
   virtual ~SendMessageRequestHeader() {}
   virtual void Encode(Json::Value& outData);
   virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
@@ -109,6 +110,7 @@ class SendMessageRequestHeader : public CommandHeader {
   string properties;
   int reconsumeTimes;
   bool unitMode;
+  bool batch;
 };
 
 //<!************************************************************************
diff --git a/src/protocol/ConsumerRunningInfo.cpp b/src/protocol/ConsumerRunningInfo.cpp
index f24b3fa..ab7f389 100755
--- a/src/protocol/ConsumerRunningInfo.cpp
+++ b/src/protocol/ConsumerRunningInfo.cpp
@@ -60,7 +60,7 @@ void ConsumerRunningInfo::setStatusTable(const map<string, ConsumeStatus>&
 input_statusTable)
 {
 statusTable = input_statusTable;
-}	*/
+}    */
 
 const vector<SubscriptionData> ConsumerRunningInfo::getSubscriptionSet() const {
   return subscriptionSet;
diff --git a/src/transport/ResponseFuture.cpp b/src/transport/ResponseFuture.cpp
index 9307616..a304e3f 100755
--- a/src/transport/ResponseFuture.cpp
+++ b/src/transport/ResponseFuture.cpp
@@ -152,8 +152,8 @@ void ResponseFuture::executeInvokeCallbackException() {
   } else {
     if (m_asyncCallbackStatus == asyncCallBackStatus_timeout) {
 
-	//here no need retrySendTimes process because of it have timeout
-	LOG_ERROR("send msg, callback timeout, opaque:%d, sendTimes:%d, maxRetryTimes:%d", getOpaque(), getRetrySendTimes(), getMaxRetrySendTimes());
+    //here no need retrySendTimes process because of it have timeout
+    LOG_ERROR("send msg, callback timeout, opaque:%d, sendTimes:%d, maxRetryTimes:%d", getOpaque(), getRetrySendTimes(), getMaxRetrySendTimes());
 
       m_pCallbackWrap->onException();
     } else {
@@ -172,36 +172,36 @@ bool ResponseFuture::isTimeOut() const {
 }
 
 int ResponseFuture::getMaxRetrySendTimes() const {
-	return m_maxRetrySendTimes;
+    return m_maxRetrySendTimes;
 } 
 int ResponseFuture::getRetrySendTimes() const {
-	return m_retrySendTimes;
+    return m_retrySendTimes;
 }
 
 void ResponseFuture::setMaxRetrySendTimes(int maxRetryTimes) {
-	m_maxRetrySendTimes = maxRetryTimes;
+    m_maxRetrySendTimes = maxRetryTimes;
 }
 void ResponseFuture::setRetrySendTimes(int retryTimes) {
-	m_retrySendTimes = retryTimes;
+    m_retrySendTimes = retryTimes;
 }
 
 void ResponseFuture::setBrokerAddr(const std::string& brokerAddr) {
-	m_brokerAddr = brokerAddr;
+    m_brokerAddr = brokerAddr;
 }
 void ResponseFuture::setRequestCommand(const RemotingCommand& requestCommand) {
-	m_requestCommand = requestCommand;
+    m_requestCommand = requestCommand;
 }
 
 const RemotingCommand& ResponseFuture::getRequestCommand() {
-	return m_requestCommand;
+    return m_requestCommand;
 }
 std::string ResponseFuture::getBrokerAddr() const {
-	return m_brokerAddr;
+    return m_brokerAddr;
 }
 
 int64 ResponseFuture::leftTime() const {
-	int64 diff = UtilAll::currentTimeMillis() - m_beginTimestamp;
-	return m_timeout - diff;
+    int64 diff = UtilAll::currentTimeMillis() - m_beginTimestamp;
+    return m_timeout - diff;
 }
 
 RemotingCommand* ResponseFuture::getCommand() const {
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
index 603c17f..7b9dff9 100755
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -217,10 +217,10 @@ bool TcpRemotingClient::invokeAsync(const string& addr,
     int opaque = request.getOpaque();
     boost::shared_ptr<ResponseFuture> responseFuture(
         new ResponseFuture(code, opaque, this, timeoutMilliseconds, true, cbw));
-	responseFuture->setMaxRetrySendTimes(maxRetrySendTimes);
-	responseFuture->setRetrySendTimes(retrySendTimes);
-	responseFuture->setBrokerAddr(addr);
-	responseFuture->setRequestCommand(request);	
+    responseFuture->setMaxRetrySendTimes(maxRetrySendTimes);
+    responseFuture->setRetrySendTimes(retrySendTimes);
+    responseFuture->setBrokerAddr(addr);
+    responseFuture->setRequestCommand(request);    
     addAsyncResponseFuture(opaque, responseFuture);
     if (cbw) {
       boost::asio::deadline_timer* t = new boost::asio::deadline_timer(
@@ -593,7 +593,7 @@ void TcpRemotingClient::processResponseCommand(
       pfuture->setAsyncResponseFlag();
       pfuture->setAsyncCallBackStatus(asyncCallBackStatus_response);
       cancelTimerCallback(opaque);
-      pfuture->executeInvokeCallback();	  
+      pfuture->executeInvokeCallback();      
     }
   }
 }
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 8fcd892..73a00d0 100755
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -15,7 +15,6 @@
 
 project(test)
 
-
 SET(SUB_DIRS)
 file(GLOB children ${CMAKE_SOURCE_DIR}/src/*)
 FOREACH(child ${children})
@@ -32,7 +31,6 @@ set(EXECUTABLE_OUTPUT_PATH ${CMAKE_SOURCE_DIR}/test/bin)
 include_directories(${CMAKE_SOURCE_DIR}/include)
 include_directories(${Boost_INCLUDE_DIRS})
 
-
 set(Gtest_INCLUDE_DIR ${CMAKE_SOURCE_DIR}/bin/include/gtest)
 set(Gmock_INCLUDE_DIR ${CMAKE_SOURCE_DIR}/bin/include/gmock)
 
@@ -55,34 +53,32 @@ message(status "ROCKETMQ_LIBRARIES ${ROCKETMQ_LIBRARIES}")
 
 set(CMAKE_BUILD_TYPE "Debug")
 
-
 function(compile files)
-	foreach(file ${files})
-	    get_filename_component(basename ${file} NAME_WE)
-	    add_executable(${basename} ${file})
-	    if(MSVC)
-	        if(CMAKE_CONFIGURATION_TYPES STREQUAL "Release")
-	            set_target_properties( ${basename} PROPERTIES LINK_FLAGS "/NODEFAULTLIB:LIBCMT" )
-	        else()
-	            set_target_properties( ${basename} PROPERTIES LINK_FLAGS "/NODEFAULTLIB:LIBCMTD" )
-	        endif()
-	    endif()
-	
-	    if (MSVC) 
-	    	if (BUILD_ROCKETMQ_SHARED)
-	        	target_link_libraries (${basename}  rocketmq_shared ${deplibs}
-	        	${Boost_LIBRARIES} ${LIBEVENT_LIBRARIES} ${JSONCPP_LIBRARIES} ${x`})
-	    	else()
-	        	target_link_libraries (${basename}  rocketmq_static ${deplibs}
-	        	${Boost_LIBRARIES} ${LIBEVENT_LIBRARIES} ${JSONCPP_LIBRARIES} ${Gtest_LIBRARIES})
-	        endif()
-	    else()
-	            target_link_libraries (${basename}  rocketmq_shared ${deplibs})
-	            target_link_libraries (${basename}  rocketmq_shared ${Gtest_LIBRARIES})
-	            target_link_libraries (${basename}  rocketmq_shared ${Gmock_LIBRARIES})
-	    endif()
-	    
-	endforeach()
+    foreach(file ${files})  # one test executable per source file
+        get_filename_component(basename ${file} NAME_WE)  # target name = file name without extension
+        add_executable(${basename} ${file})
+        if(MSVC)
+            if(CMAKE_CONFIGURATION_TYPES STREQUAL "Release")
+                set_target_properties( ${basename} PROPERTIES LINK_FLAGS "/NODEFAULTLIB:LIBCMT" )
+            else()
+                set_target_properties( ${basename} PROPERTIES LINK_FLAGS "/NODEFAULTLIB:LIBCMTD" )
+            endif()
+        endif()
+        # Link the test against the client library and the test framework.
+        if (MSVC)
+            if (BUILD_ROCKETMQ_SHARED)
+                target_link_libraries (${basename}  rocketmq_shared ${deplibs}
+                ${Boost_LIBRARIES} ${LIBEVENT_LIBRARIES} ${JSONCPP_LIBRARIES} ${Gtest_LIBRARIES})
+            else()
+                target_link_libraries (${basename}  rocketmq_static ${deplibs}
+                ${Boost_LIBRARIES} ${LIBEVENT_LIBRARIES} ${JSONCPP_LIBRARIES} ${Gtest_LIBRARIES})
+            endif()
+        else()
+            target_link_libraries (${basename}  rocketmq_shared ${deplibs})
+            target_link_libraries (${basename}  rocketmq_shared ${Gtest_LIBRARIES})
+            target_link_libraries (${basename}  rocketmq_shared ${Gmock_LIBRARIES})
+        endif()
+    endforeach()
 endfunction()
 
 file(GLOB files "src/*.c*")
@@ -90,9 +86,8 @@ compile("${files}")
 
 file(GLOB files "src/*")
 foreach(file ${files})
-	if(IS_DIRECTORY ${file})
-		 file(GLOB filess "${file}/*.c*")
-		 compile("${filess}")
-	endif()
+    if(IS_DIRECTORY ${file})
+        file(GLOB filess "${file}/*.c*")
+        compile("${filess}")
+    endif()
 endforeach()
-
diff --git a/test/src/BatchMessageTest.cpp b/test/src/BatchMessageTest.cpp
new file mode 100644
index 0000000..81bd7bb
--- /dev/null
+++ b/test/src/BatchMessageTest.cpp
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "gtest/gtest.h"
+#include "gmock/gmock.h"
+#include <unistd.h>
+#include <stdio.h>
+#include <iostream>
+#include "BatchMessage.h"
+#include "MQMessage.h"
+#include <map>
+
+using namespace std;
+using namespace rocketmq;
+using ::testing::InitGoogleTest;
+using ::testing::InitGoogleMock;
+using testing::Return;
+
+// Checks the encoded size of a single message before and after the unique-id property is set.
+TEST(BatchMessageEncodeTest, encodeMQMessage) {
+    MQMessage message("topic", "*", "test");
+    const auto propertyCount = message.getProperties().size();
+    const auto bodyLength = message.getBody().size();
+    EXPECT_EQ(propertyCount, 2);
+    EXPECT_EQ(bodyLength, 4);
+
+    // Expected length is 20 + bodyLen + 2 + propertiesLength.
+    const string withoutUniqueId = BatchMessage::encode(message);
+    EXPECT_EQ(withoutUniqueId.size(), 43);
+
+    // Setting the unique-id property changes the expected encoded size.
+    message.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "1");
+    const string withUniqueId = BatchMessage::encode(message);
+    EXPECT_EQ(withUniqueId.size(), 54);
+}
+
+// Checks the encoded size of a batch holding one message and then three messages.
+TEST(BatchMessageEncodeTest, encodeMQMessages) {
+    std::vector<MQMessage> batch;
+    MQMessage first("topic", "*", "test1");
+    batch.push_back(first);
+
+    // Per message the expected length is 20 + bodyLen + 2 + propertiesLength.
+    const string singleEncoded = BatchMessage::encode(batch);
+    EXPECT_EQ(singleEncoded.size(), 86);
+
+    // Two more messages of the same shape are expected to triple the size: 86 * 3.
+    MQMessage second("topic", "*", "test2");
+    MQMessage third("topic", "*", "test3");
+    batch.push_back(second);
+    batch.push_back(third);
+    const string tripleEncoded = BatchMessage::encode(batch);
+    EXPECT_EQ(tripleEncoded.size(), 258);
+}
+
+int main(int argc, char* argv[]) {
+    ::testing::InitGoogleMock(&argc, argv);  // hands the command-line arguments to Google Mock
+    return RUN_ALL_TESTS();
+}
diff --git a/test/src/MQDecoderTest.cpp b/test/src/MQDecoderTest.cpp
new file mode 100644
index 0000000..263fe30
--- /dev/null
+++ b/test/src/MQDecoderTest.cpp
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "gtest/gtest.h"
+#include "gmock/gmock.h"
+#include <unistd.h>
+#include <stdio.h>
+#include "BatchMessage.h"
+#include "MQMessage.h"
+#include <map>
+#include "MQDecoder.h"
+
+using namespace std;
+using namespace rocketmq;
+using ::testing::InitGoogleTest;
+using ::testing::InitGoogleMock;
+using testing::Return;
+
+TEST(MQDecoderTest, messageProperties2String) {
+    map<string, string> props;  // no entries yet: the serialized form must be empty
+    const string emptyEncoded = MQDecoder::messageProperties2String(props);
+    EXPECT_EQ(emptyEncoded.size(), 0);
+    props["aaa"] = "aaa";  // a single entry is expected to serialize to 8 characters
+    const string oneEntryEncoded = MQDecoder::messageProperties2String(props);
+    EXPECT_EQ(oneEntryEncoded.size(), 8);
+}
+
+int main(int argc, char* argv[]) {
+    ::testing::InitGoogleMock(&argc, argv);  // hands the command-line arguments to Google Mock
+    return RUN_ALL_TESTS();
+}
diff --git a/test/src/StringIdMakerTest.cpp b/test/src/StringIdMakerTest.cpp
new file mode 100644
index 0000000..8889d28
--- /dev/null
+++ b/test/src/StringIdMakerTest.cpp
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "gtest/gtest.h"
+#include "gmock/gmock.h"
+#include "StringIdMaker.h"
+#include <map>
+#include <iostream>
+
+using namespace std;
+using namespace rocketmq;
+using ::testing::InitGoogleTest;
+using ::testing::InitGoogleMock;
+using testing::Return;
+
+TEST(StringIdMakerTest, get_unique_id) {
+    const string id = StringIdMaker::get_mutable_instance().get_unique_id();
+    cout << "unique_id: " << id << endl;  // printed so the generated id can be inspected by hand
+    EXPECT_EQ(id.size(), 32);  // the generated id is expected to be 32 characters long
+}
+
+int main(int argc, char* argv[]) {
+    ::testing::InitGoogleMock(&argc, argv);  // hands the command-line arguments to Google Mock
+    return RUN_ALL_TESTS();
+}