You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/07/30 09:38:30 UTC

[rocketmq-ons-cpp] 03/35: Add ONS API

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-ons-cpp.git

commit e3411416a516d652b591450abc0441e21216e7af
Author: ShannonDing <li...@163.com>
AuthorDate: Tue Jul 23 21:19:24 2019 +0800

    Add ONS API
---
 src/main/c/native/rocketmq.h                    |  63 +++++++
 src/main/cpp/demos/CMakeLists.Release           |  39 ++++
 src/main/cpp/demos/CMakeLists.txt               |  17 ++
 src/main/cpp/demos/ConsumerDemo.cpp             |  63 +++++++
 src/main/cpp/demos/MultiThreadProducerDemo.cpp  |  75 ++++++++
 src/main/cpp/demos/OrderConsumerDemo.cpp        |  59 ++++++
 src/main/cpp/demos/OrderProducerDemo.cpp        |  54 ++++++
 src/main/cpp/demos/ProducerAsyncDemo.cpp        |  93 ++++++++++
 src/main/cpp/demos/ProducerDemo.cpp             |  56 ++++++
 src/main/cpp/demos/ProducerOnewayDemo.cpp       |  49 +++++
 src/main/cpp/demos/TransactionProducerDemo.cpp  |  75 ++++++++
 src/main/cpp/include/Action.h                   |  14 ++
 src/main/cpp/include/ConsumeContext.h           |  13 ++
 src/main/cpp/include/ConsumeOrderContext.h      |  13 ++
 src/main/cpp/include/LocalTransactionChecker.h  |  16 ++
 src/main/cpp/include/LocalTransactionExecuter.h |  16 ++
 src/main/cpp/include/Message.h                  | 100 ++++++++++
 src/main/cpp/include/MessageListener.h          |  19 ++
 src/main/cpp/include/MessageOrderListener.h     |  20 ++
 src/main/cpp/include/MessageQueueONS.h          |  39 ++++
 src/main/cpp/include/MessageQueueSelectorONS.h  |  17 ++
 src/main/cpp/include/ONSCallback.h              |  16 ++
 src/main/cpp/include/ONSChannel.h               |  15 ++
 src/main/cpp/include/ONSClient.h                |  20 ++
 src/main/cpp/include/ONSClientException.h       |  25 +++
 src/main/cpp/include/ONSFactory.h               | 103 ++++++++++
 src/main/cpp/include/OrderAction.h              |  11 ++
 src/main/cpp/include/OrderConsumer.h            |  19 ++
 src/main/cpp/include/OrderProducer.h            |  23 +++
 src/main/cpp/include/Producer.h                 |  35 ++++
 src/main/cpp/include/PullConsumer.h             |  38 ++++
 src/main/cpp/include/PullResultONS.h            |  40 ++++
 src/main/cpp/include/PushConsumer.h             |  20 ++
 src/main/cpp/include/SendResultONS.h            |  19 ++
 src/main/cpp/include/TransactionProducer.h      |  29 +++
 src/main/cpp/include/TransactionStatus.h        |  13 ++
 src/test/cpp/MessageTest.cpp                    | 237 ++++++++++++++++++++++++
 37 files changed, 1573 insertions(+)

diff --git a/src/main/c/native/rocketmq.h b/src/main/c/native/rocketmq.h
new file mode 100644
index 0000000..ad48a98
--- /dev/null
+++ b/src/main/c/native/rocketmq.h
@@ -0,0 +1,63 @@
+#ifndef __ROCKETMQ_H__
+#define __ROCKETMQ_H__
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+typedef struct message_struct {  // Plain C view of one message; every field is a raw pointer or an integer.
+    char *topic;
+    char *tags;
+    char *body;
+    unsigned int body_size;  // presumably the length of `body` in bytes — TODO confirm
+    char *key;
+    char *user_prop;    // NOTE(review): presumably serialized user properties; encoding is not visible here
+    char *system_prop;  // NOTE(review): presumably serialized system properties; encoding is not visible here
+} message;
+
+typedef struct send_result_struct {  // Result record for a send: a message id plus an error code/text pair.
+    char *message_id;
+    int error_no;  // NOTE(review): the value that means "no error" is not defined in this header
+    char *error_msg;
+} send_result;
+
+typedef struct factory_property_struct {  // Client configuration record; every setting is a C string except use_domain.
+    char *group_id;
+    char *access_key;
+    char *access_secret;
+    char *name_srv_addr;
+    char *name_srv_domain;
+    char *message_model;
+    char *send_msg_timeout_millis;  // presumably a number encoded as text — TODO confirm
+    char *consume_thread_nums;      // presumably a number encoded as text — TODO confirm
+    char *ons_channel;
+    char *max_msg_cache_size;       // presumably a number encoded as text — TODO confirm
+    char *ons_trace_switch;
+    char *consumer_instance_name;
+    char *language_identifier;
+    char *instance_id;
+    int use_domain;  // NOTE(review): presumably selects name_srv_domain over name_srv_addr — confirm
+} factory_property;
+
+typedef struct callback_func_struct {  // Pair of completion callbacks plus a string that shares its name with their last parameter.
+    char *send_callback_ons;  // NOTE(review): presumably handed back as the callbacks' last argument — confirm
+    // Success callback; receives a message id.
+    void (*on_success)(void *thread, char *message_id, char *send_callback_ons);
+    // Failure callback; receives a message text (m_msg) and an integer code (m_error).
+    void (*on_exception)(void *thread, char *m_msg, int m_error, char *send_callback_ons);
+} callback_func;
+
+typedef struct subscription_struct {  // One topic subscription together with its message handler.
+    char *topic;
+    char *sub_expression;
+    // Handler; receives the message fields as separate arguments. The meaning of the int result is not defined here.
+    int (*on_message)(void *thread, void *opaque, char *topic, char *user_props, char *sys_props, char *body, int len);
+    // NOTE(review): presumably passed back unchanged as on_message's `opaque` argument — confirm
+    void *opaque;
+} subscription;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //__ROCKETMQ_H__
\ No newline at end of file
diff --git a/src/main/cpp/demos/CMakeLists.Release b/src/main/cpp/demos/CMakeLists.Release
new file mode 100644
index 0000000..7f47ee5
--- /dev/null
+++ b/src/main/cpp/demos/CMakeLists.Release
@@ -0,0 +1,39 @@
+cmake_minimum_required(VERSION 3.0)
+project(onsclient4cpp_demo VERSION 1.0
+        LANGUAGES C CXX)
+set(CMAKE_CXX_STANDARD 11)
+include_directories(../include)
+# Locate the two prebuilt client libraries. On failure find_library stores "<VAR>-NOTFOUND", which if() treats as false.
+find_library(ROCKETMQ_CLIENT_CORE
+        NAMES rocketmq_client_core
+        HINTS ../lib)
+
+if (NOT ROCKETMQ_CLIENT_CORE)
+    message("find_library for rocketmq_client_core failed")
+endif ()
+
+find_library(ONS_CLIENT
+        NAMES onsclient4cpp
+        HINTS ../lib)
+
+if (NOT ONS_CLIENT)
+    message("find_library for onsclient4cpp failed")
+endif ()
+# add_demo(<name> <source_file>): build one demo executable, link pthread and both client libraries, output to <source dir>/bin.
+macro(add_demo name source_file)
+    add_executable(${name} ${source_file})
+    target_link_libraries(${name} pthread ${ROCKETMQ_CLIENT_CORE} ${ONS_CLIENT})
+    set_target_properties(${name} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_SOURCE_DIR}/bin)
+endmacro()
+
+add_demo(producer_demo ProducerDemo.cpp)
+add_demo(order_producer_demo OrderProducerDemo.cpp)
+add_demo(multi_thread_producer_demo MultiThreadProducerDemo.cpp)
+add_demo(producer_async_demo ProducerAsyncDemo.cpp)
+add_demo(producer_oneway_demo ProducerOnewayDemo.cpp)
+add_demo(consumer_demo ConsumerDemo.cpp)
+add_demo(order_consumer_demo OrderConsumerDemo.cpp)
+add_demo(transaction_producer_demo TransactionProducerDemo.cpp)
+
+
+
diff --git a/src/main/cpp/demos/CMakeLists.txt b/src/main/cpp/demos/CMakeLists.txt
new file mode 100644
index 0000000..6ffd443
--- /dev/null
+++ b/src/main/cpp/demos/CMakeLists.txt
@@ -0,0 +1,17 @@
+macro(add_demo name source_file)  # Build one demo executable named <name> from <source_file>.
+    add_executable(${name} ${source_file})
+    target_link_libraries(${name} pthread ${CMAKE_PROJECT_NAME})  # links pthread and the target named after the top-level project
+    set_target_properties(${name} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_SOURCE_DIR}/bin/demos)
+endmacro()
+
+add_demo(producer_demo ProducerDemo.cpp)
+add_demo(order_producer_demo OrderProducerDemo.cpp)
+add_demo(multi_thread_producer_demo MultiThreadProducerDemo.cpp)
+add_demo(producer_async_demo ProducerAsyncDemo.cpp)
+add_demo(producer_oneway_demo ProducerOnewayDemo.cpp)
+add_demo(consumer_demo ConsumerDemo.cpp)
+add_demo(order_consumer_demo OrderConsumerDemo.cpp)
+add_demo(transaction_producer_demo TransactionProducerDemo.cpp)
+
+
+
diff --git a/src/main/cpp/demos/ConsumerDemo.cpp b/src/main/cpp/demos/ConsumerDemo.cpp
new file mode 100644
index 0000000..0df672a
--- /dev/null
+++ b/src/main/cpp/demos/ConsumerDemo.cpp
@@ -0,0 +1,63 @@
+#include "ONSFactory.h"
+
+#include <iostream>
+#include <thread>
+#include <mutex>
+
+using namespace ons;
+
+std::mutex console_mtx;
+
+class ExampleMessageListener : public MessageListener {
+public:
+    Action consume(Message& message, ConsumeContext& context) {
+        //Put the actual message handling here; return CommitMessage once the message has been processed successfully.
+        //If handling fails, or you want the message again, return ReconsumeLater and it will be redelivered after a while.
+        std::lock_guard<std::mutex> lk(console_mtx);
+        std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
+        << message.getMsgID() << std::endl;
+        return CommitMessage;
+    }
+};
+
+int main(int argc, char* argv[]) {
+    std::cout << "=======Before consuming messages=======" << std::endl;
+    ONSFactoryProperty factoryInfo;
+    //Group ID requested from the ONS console.
+    //Since the instance-based release ProducerId and ConsumerId are unified; this property is kept for backward compatibility.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
+    //Topic requested from the ONS console.
+    //Replace the placeholder with the topic you created in the Alibaba Cloud ONS console.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic");
+    //Access Key of your account.
+    //Replace the placeholder with your account's AK.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
+    //Secret Key of your account.
+    //Replace the placeholder with your account's SK.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
+    //Endpoint shown in the ONS console.
+    //Replace the placeholder with the access point of the matching instance in the Alibaba Cloud ONS console.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
+                                   "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
+
+    PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);
+
+    std::string topic(factoryInfo.getPublishTopics());
+    std::string tag("Your Tag");
+
+    //Register your own listener to process the received messages and return the handling result.
+    //NOTE(review): the listener is heap-allocated and never deleted here; whether subscribe() takes ownership is not visible.
+    ExampleMessageListener *messageListener = new ExampleMessageListener();
+    consumer->subscribe(topic.c_str(), tag.c_str(), messageListener);
+
+    //Start this consumer.
+    //Preparation is done; start() must be called before the consumer can work.
+    consumer->start();
+
+    //Keep the main thread alive while messages are consumed; this demo waits 60 seconds and then shuts down.
+    //In a real application keep the thread resident and do not call shutdown while still consuming.
+    std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
+    consumer->shutdown();
+    std::cout << "=======After consuming messages======" << std::endl;
+    return 0;
+}
\ No newline at end of file
diff --git a/src/main/cpp/demos/MultiThreadProducerDemo.cpp b/src/main/cpp/demos/MultiThreadProducerDemo.cpp
new file mode 100644
index 0000000..867a736
--- /dev/null
+++ b/src/main/cpp/demos/MultiThreadProducerDemo.cpp
@@ -0,0 +1,75 @@
+#include <iostream>
+#include <chrono>
+#include <thread>
+#include <mutex>
+#include <vector>
+
+#include "ONSFactory.h"
+
+using namespace std;
+using namespace ons;
+
+int main() {
+    std::cout << "=======Before sending messages=======" << std::endl;
+    ONSFactoryProperty factoryInfo;
+    //Group ID requested from the ONS console; ProducerId and ConsumerId are unified, this key is kept for compatibility.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "GID_XXX");
+    //Topic requested from the ONS console.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic");
+    //Access Key of your account.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
+    //Secret Key of your account.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
+    //Endpoint of the matching instance, as shown in the ONS console.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
+                                   "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
+    Producer *producer = ONSFactory::getInstance()->createProducer(factoryInfo);
+    producer->start();
+    Message msg(
+            factoryInfo.getPublishTopics(),
+            "Your Tag",
+            "Your Key",
+            "This message body."
+    );
+
+    std::mutex console_mutex;  //guards std::cout so output from the sender threads is not interleaved
+
+    //Body of each sender thread: send `count` copies of msg through the shared producer and report the elapsed time.
+    auto lambda = [&]() {
+        auto start = std::chrono::system_clock::now();
+        int count = 32;
+        for (int i = 0; i < count; ++i) {
+            //An exception escaping a std::thread entry point calls std::terminate, so send failures are handled here.
+            try {
+                SendResultONS sendResult = producer->send(msg);
+                std::lock_guard<std::mutex> lk(console_mutex);
+                std::cout << "Message ID: " << sendResult.getMessageId() << std::endl;
+            } catch (ONSClientException &e) {
+                std::lock_guard<std::mutex> lk(console_mutex);
+                std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.GetMsg() << std::endl;
+            }
+        }
+        auto interval = std::chrono::system_clock::now() - start;
+        {
+            std::lock_guard<std::mutex> lk(console_mutex);
+            std::cout << "Send " << count << " messages OK, costs "
+                      << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
+        }
+    };
+
+    std::vector<std::thread> threads;
+    int thread_num = 3;
+
+    threads.reserve(thread_num);
+    for (int i = 0; i < thread_num; ++i) {
+        threads.emplace_back(lambda);
+    }
+
+    //Every thread was just started above, so each one is joinable; wait for all of them before shutting down.
+    for (auto& t : threads) {
+        t.join();
+    }
+
+    producer->shutdown();
+    std::cout << "=======After sending messages=======" << std::endl;
+    return 0;
+}
\ No newline at end of file
diff --git a/src/main/cpp/demos/OrderConsumerDemo.cpp b/src/main/cpp/demos/OrderConsumerDemo.cpp
new file mode 100644
index 0000000..28f5ee5
--- /dev/null
+++ b/src/main/cpp/demos/OrderConsumerDemo.cpp
@@ -0,0 +1,59 @@
+#include "ONSFactory.h"
+
+#include <iostream>
+#include <thread>
+#include <mutex>
+
+using namespace ons;
+
+std::mutex console_mtx;
+
+class ExampleMessageListener : public MessageOrderListener {
+public:
+    OrderAction consume(Message &message, ConsumeOrderContext &context) {
+        //Put the actual message handling here; return Success once the message has been processed successfully.
+        //If handling fails, or you want the message again, return Suspend and it will be redelivered after a while.
+        std::lock_guard<std::mutex> lk(console_mtx);
+        std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
+                  << message.getMsgID() << std::endl;
+        return Success;
+    }
+};
+
+int main(int argc, char *argv[]) {
+    std::cout << "=======Before consuming messages=======" << std::endl;
+    ONSFactoryProperty factoryInfo;
+    //Group ID requested from the ONS console.
+    //Since the instance-based release ProducerId and ConsumerId are unified; this property is kept for backward compatibility.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
+    //Topic requested from the ONS console.
+    //Replace the placeholder with the topic you created in the Alibaba Cloud ONS console.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic");
+    //Access Key of your account.
+    //Replace the placeholder with your account's AK.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
+    //Secret Key of your account.
+    //Replace the placeholder with your account's SK.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
+    //Endpoint shown in the ONS console.
+    //Replace the placeholder with the access point of the matching instance in the Alibaba Cloud ONS console.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
+                                   "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
+    OrderConsumer *consumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
+
+    std::string topic(factoryInfo.getPublishTopics());
+    std::string tag("Your Tag");
+    ExampleMessageListener *messageListener = new ExampleMessageListener();  //NOTE(review): never deleted here; ownership after subscribe() is not visible
+    consumer->subscribe(topic.c_str(), tag.c_str(), messageListener);
+
+    //Start this consumer.
+    //Preparation is done; start() must be called before the consumer can work.
+    consumer->start();
+
+    //Keep the main thread alive while messages are consumed; this demo waits 60 seconds and then shuts down.
+    //In a real application keep the thread resident and do not call shutdown while still consuming.
+    std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
+    consumer->shutdown();
+    std::cout << "=======After consuming messages======" << std::endl;
+    return 0;
+}
\ No newline at end of file
diff --git a/src/main/cpp/demos/OrderProducerDemo.cpp b/src/main/cpp/demos/OrderProducerDemo.cpp
new file mode 100644
index 0000000..6bf6ed4
--- /dev/null
+++ b/src/main/cpp/demos/OrderProducerDemo.cpp
@@ -0,0 +1,54 @@
+#include <iostream>
+#include <chrono>
+#include "ONSFactory.h"
+
+using namespace std;
+using namespace ons;
+
+int main() {
+    std::cout << "=======Before sending messages=======" << std::endl;
+    ONSFactoryProperty factoryInfo;
+    //Group ID requested from the ONS console.
+    //Since the instance-based release ProducerId and ConsumerId are unified; this property is kept for backward compatibility.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "GID_XXX");
+    //Topic requested from the ONS console.
+    //Replace the placeholder with the topic you created in the Alibaba Cloud ONS console.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic");
+    //Access Key of your account.
+    //Replace the placeholder with your account's AK.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
+    //Secret Key of your account.
+    //Replace the placeholder with your account's SK.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
+    //Endpoint shown in the ONS console.
+    //Replace the placeholder with the access point of the matching instance in the Alibaba Cloud ONS console.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
+                                   "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
+    OrderProducer *producer = nullptr;
+    producer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);
+    producer->start();
+    Message msg(
+            factoryInfo.getPublishTopics(),
+            "Your Tag",
+            "Your Key",
+            "This message body."
+    );
+
+    auto start = std::chrono::system_clock::now();
+    int count = 32;
+    for (int i = 0; i < count; ++i) {
+        try {
+            SendResultONS sendResult = producer->send(msg, "Your Sharding Key");
+            std::cout << "Topic: " << msg.getTopic() << ", Message ID: " << sendResult.getMessageId() << std::endl;
+        } catch (ONSClientException &e) {  //catch by reference: no copy of the exception and no slicing
+            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.GetMsg() << std::endl;
+        }
+    }
+    auto interval = std::chrono::system_clock::now() - start;
+    std::cout << "Send " << count << " messages OK, costs "
+              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
+
+    producer->shutdown();
+    std::cout << "=======After sending messages=======" << std::endl;
+    return 0;
+}
\ No newline at end of file
diff --git a/src/main/cpp/demos/ProducerAsyncDemo.cpp b/src/main/cpp/demos/ProducerAsyncDemo.cpp
new file mode 100644
index 0000000..451ab8a
--- /dev/null
+++ b/src/main/cpp/demos/ProducerAsyncDemo.cpp
@@ -0,0 +1,93 @@
+#include <iostream>
+#include <chrono>
+#include <mutex>
+#include <condition_variable>
+#include "ONSFactory.h"
+#include"ONSCallback.h"
+
+using namespace std;
+using namespace ons;
+
+std::mutex m1;
+std::mutex m2;
+std::condition_variable cv;
+
+class MyCallback : public SendCallbackONS {  //Send callback that tallies results and signals `cv` when all are in.
+public:
+    //Counts one successful send, prints its message id, and notifies cv once success_num + failed_num equals total.
+    void onSuccess(SendResultONS &sendResult) override {
+        std::lock_guard<std::mutex> lg(m2);
+        success_num++;
+        std::cout << "send success, message_id: " << sendResult.getMessageId() << ", total: " << success_num
+                  << std::endl;
+        if (success_num + failed_num == total) {
+            cv.notify_all();
+        }
+    }
+    //Counts one failed send, prints e.what(), and notifies cv once success_num + failed_num equals total.
+    void onException(ONSClientException &e) override {
+        std::lock_guard<std::mutex> lg(m2);
+        failed_num++;
+        std::cout << "send failure, total: " << failed_num << std::endl;
+        std::cout << e.what() << std::endl;
+        if (success_num + failed_num == total) {
+            cv.notify_all();
+        }
+    }
+    //Counters are static, so they are shared by every instance; both callbacks update them while holding m2.
+    static int success_num;
+    static int failed_num;
+    static int total;
+};
+
+int MyCallback::success_num = 0;
+int MyCallback::failed_num = 0;
+int MyCallback::total = 32;
+
+
+int main() {
+    std::cout << "=======before send message=======" << std::endl;
+    ONSFactoryProperty factoryInfo;
+    //Group ID requested from the ONS console.
+    //Since the instance-based release ProducerId and ConsumerId are unified; this property is kept for backward compatibility.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "GID_XXX");
+    //Topic requested from the ONS console.
+    //Replace the placeholder with the topic you created in the Alibaba Cloud ONS console.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic");
+    //Access Key of your account.
+    //Replace the placeholder with your account's AK.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
+    //Secret Key of your account.
+    //Replace the placeholder with your account's SK.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
+    //Endpoint shown in the ONS console.
+    //Replace the placeholder with the access point of the matching instance in the Alibaba Cloud ONS console.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
+                                   "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
+    Producer *producer = nullptr;
+    producer = ONSFactory::getInstance()->createProducer(factoryInfo);
+    producer->start();
+    Message msg(
+            factoryInfo.getPublishTopics(),
+            "Your Tag",
+            "Your Key",
+            "This message body."
+    );
+
+    MyCallback m_callback;
+    for (int i = 0; i < MyCallback::total; ++i) {
+        producer->sendAsync(msg, &m_callback);
+    }
+    //Wait until every send has reported success or failure. The predicate is checked under m2, the mutex the
+    //callbacks hold while counting, so an early notification is not lost and spurious wakeups are ignored.
+    {
+        std::unique_lock<std::mutex> lk(m2);
+        cv.wait(lk, [] { return MyCallback::success_num + MyCallback::failed_num >= MyCallback::total; });
+    }
+
+    producer->shutdown();
+    std::cout << "=======after sending messages=======" << std::endl;
+
+
+    return 0;
+}
\ No newline at end of file
diff --git a/src/main/cpp/demos/ProducerDemo.cpp b/src/main/cpp/demos/ProducerDemo.cpp
new file mode 100644
index 0000000..0b19174
--- /dev/null
+++ b/src/main/cpp/demos/ProducerDemo.cpp
@@ -0,0 +1,56 @@
+#include <iostream>
+#include <chrono>
+#include "ONSFactory.h"
+
+using namespace std;
+using namespace ons;
+
+int main() {
+    std::cout << "=======Before sending messages=======" << std::endl;
+    ONSFactoryProperty factoryInfo;
+    //Group ID requested from the ONS console.
+    //Since the instance-based release ProducerId and ConsumerId are unified; this property is kept for backward compatibility.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "GID_XXX");
+    //Topic requested from the ONS console.
+    //Replace the placeholder with the topic you created in the Alibaba Cloud ONS console.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic");
+    //Access Key of your account.
+    //Replace the placeholder with your account's AK.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
+    //Secret Key of your account.
+    //Replace the placeholder with your account's SK.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
+    //Endpoint shown in the ONS console.
+    //Replace the placeholder with the access point of the matching instance in the Alibaba Cloud ONS console.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
+                                   "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
+    Producer *producer = nullptr;
+    producer = ONSFactory::getInstance()->createProducer(factoryInfo);
+    producer->start();
+    Message msg(
+            factoryInfo.getPublishTopics(),
+            "Your Tag",
+            "Your Key",
+            "This message body."
+    );
+
+    auto start = std::chrono::system_clock::now();
+    int count = 32;
+    for (int i = 0; i < count; ++i) {
+        try {
+            SendResultONS sendResult = producer->send(msg);
+            std::cout << "Message ID: " << sendResult.getMessageId() << std::endl;
+        } catch (ONSClientException &e) {  //catch by reference: no copy of the exception and no slicing
+            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.GetMsg() << std::endl;
+        }
+    }
+    auto interval = std::chrono::system_clock::now() - start;
+    std::cout << "Send " << count << " messages OK, costs "
+              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
+
+    //Every send() call above has already returned or thrown, so this demo shuts the producer down before exiting.
+    //A long-running application would keep the producer alive and call shutdown only when it is done sending.
+    producer->shutdown();
+    std::cout << "=======After sending messages=======" << std::endl;
+    return 0;
+}
\ No newline at end of file
diff --git a/src/main/cpp/demos/ProducerOnewayDemo.cpp b/src/main/cpp/demos/ProducerOnewayDemo.cpp
new file mode 100644
index 0000000..031bff3
--- /dev/null
+++ b/src/main/cpp/demos/ProducerOnewayDemo.cpp
@@ -0,0 +1,49 @@
+#include <iostream>
+#include <chrono>
+#include "ONSFactory.h"
+
+using namespace std;
+using namespace ons;
+
+int main() {
+    std::cout << "=======Before sending messages=======" << std::endl;
+    ONSFactoryProperty factoryInfo;
+    //Group ID requested from the ONS console.
+    //Since the instance-based release ProducerId and ConsumerId are unified; this property is kept for backward compatibility.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "GID_XXX");
+    //Topic requested from the ONS console.
+    //Replace the placeholder with the topic you created in the Alibaba Cloud ONS console.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic");
+    //Access Key of your account.
+    //Replace the placeholder with your account's AK.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
+    //Secret Key of your account.
+    //Replace the placeholder with your account's SK.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
+    //Endpoint shown in the ONS console.
+    //Replace the placeholder with the access point of the matching instance in the Alibaba Cloud ONS console.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
+                                   "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
+    Producer *producer = nullptr;
+    producer = ONSFactory::getInstance()->createProducer(factoryInfo);
+    producer->start();
+    Message msg(
+            factoryInfo.getPublishTopics(),
+            "Your Tag",
+            "Your Key",
+            "This message body."
+    );
+
+    auto start = std::chrono::system_clock::now();
+    int count = 32;
+    for (int i = 0; i < count; ++i) {
+         producer->sendOneway(msg);  //the result of sendOneway is not used, so a failed send is not reported here
+    }
+    auto interval = std::chrono::system_clock::now() - start;
+    std::cout << "Send " << count << " messages OK, costs "
+              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
+
+    producer->shutdown();
+    std::cout << "=======After sending messages=======" << std::endl;
+    return 0;
+}
\ No newline at end of file
diff --git a/src/main/cpp/demos/TransactionProducerDemo.cpp b/src/main/cpp/demos/TransactionProducerDemo.cpp
new file mode 100644
index 0000000..6044589
--- /dev/null
+++ b/src/main/cpp/demos/TransactionProducerDemo.cpp
@@ -0,0 +1,75 @@
+#include <iostream>
+#include <chrono>
+#include "ONSFactory.h"
+#include"LocalTransactionChecker.h"
+#include "LocalTransactionExecuter.h"
+
+
+using namespace std;
+using namespace ons;
+
+class LocalTransactionCheckerImpl : public LocalTransactionChecker {  //Demo checker: logs and always commits.
+    TransactionStatus check(Message &msg) override {  //override makes the compiler verify the base signature
+        cout << "checker::commit transaction" << endl;
+        return CommitTransaction;
+    }
+};
+
+class LocalTransactionExecuterImpl : public LocalTransactionExecuter {  //Demo executer: logs the message id and always commits.
+    TransactionStatus execute(Message &msg) override {  //override makes the compiler verify the base signature
+        cout << "executer::commit transaction of msgid: " << msg.getMsgID() << endl;
+        return CommitTransaction;
+    }
+};
+
+
+int main() {
+    std::cout << "=======Before sending message=======" << std::endl;
+    ONSFactoryProperty factoryInfo;
+    //Group ID requested from the ONS console.
+    //Since the instance-based release ProducerId and ConsumerId are unified; this property is kept for backward compatibility.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "GID_XXX");
+    //Topic requested from the ONS console.
+    //Replace the placeholder with the topic you created in the Alibaba Cloud ONS console.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic");
+    //Access Key of your account.
+    //Replace the placeholder with your account's AK.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
+    //Secret Key of your account.
+    //Replace the placeholder with your account's SK.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
+    //Endpoint shown in the ONS console.
+    //Replace the placeholder with the access point of the matching instance in the Alibaba Cloud ONS console.
+    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
+                                   "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
+    TransactionProducer *producer = nullptr;
+    LocalTransactionCheckerImpl *checker = new LocalTransactionCheckerImpl();  //NOTE(review): never deleted here; ownership is not visible
+    producer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo, checker);
+    producer->start();
+    Message msg(
+            factoryInfo.getPublishTopics(),
+            "Your Tag",
+            "Your Key",
+            "This message body."
+    );
+    auto start = std::chrono::system_clock::now();
+    int count = 32;
+    LocalTransactionExecuterImpl executer;  //one executer object is passed to every send() call below
+    for (int i = 0; i < count; ++i) {
+        try {
+            SendResultONS sendResult = producer->send(msg, &executer);
+            std::cout << "Message ID: " << sendResult.getMessageId() << std::endl;
+        }
+        catch (ONSClientException &e) {
+            cout << e.GetMsg() << endl;
+        }
+        cout << endl;
+    }
+    auto interval = std::chrono::system_clock::now() - start;
+    std::cout << "Send " << count << " messages OK, costs "
+              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
+
+    //NOTE(review): producer->shutdown() is left disabled here, unlike the other demos — confirm whether that is intentional.
+    std::cout << "=======After sending message=======" << std::endl;
+    return 0;
+}
\ No newline at end of file
diff --git a/src/main/cpp/include/Action.h b/src/main/cpp/include/Action.h
new file mode 100644
index 0000000..289f440
--- /dev/null
+++ b/src/main/cpp/include/Action.h
@@ -0,0 +1,14 @@
+#ifndef __ACTION_H__
+#define __ACTION_H__
+
+#include "ONSClient.h"
+
+// Consumption result that a message listener's consume() returns.
+enum Action {
+  // Consumption succeeded; the application can continue with the next message.
+  CommitMessage,
+  // Consumption failed; the server will deliver this message again later, and
+  // the application can continue with the next message.
+  ReconsumeLater
+};
+#endif
diff --git a/src/main/cpp/include/ConsumeContext.h b/src/main/cpp/include/ConsumeContext.h
new file mode 100644
index 0000000..abb59de
--- /dev/null
+++ b/src/main/cpp/include/ConsumeContext.h
@@ -0,0 +1,13 @@
+#ifndef __CONSUMECONTEXT_H__
+#define __CONSUMECONTEXT_H__
+#include "ONSClient.h"
+
+namespace ons {
+
+class  ONSCLIENT_API ConsumeContext {  // Context type taken by a listener's consume(); it declares no data members.
+ public:
+  ConsumeContext() {}
+  virtual ~ConsumeContext() {}  // virtual, so a derived context can be destroyed through a base pointer
+};
+}
+#endif
diff --git a/src/main/cpp/include/ConsumeOrderContext.h b/src/main/cpp/include/ConsumeOrderContext.h
new file mode 100644
index 0000000..a196d5d
--- /dev/null
+++ b/src/main/cpp/include/ConsumeOrderContext.h
@@ -0,0 +1,13 @@
+#ifndef __CONSUMEORDERLYCONTEXT_H__
+#define __CONSUMEORDERLYCONTEXT_H__
+#include "ONSClient.h"
+
+namespace ons {
+
+class ONSCLIENT_API ConsumeOrderContext {  // Context type taken by an order listener's consume(); it declares no data members.
+ public:
+  ConsumeOrderContext() {}
+  virtual ~ConsumeOrderContext() {}  // virtual, so a derived context can be destroyed through a base pointer
+};
+}
+#endif
diff --git a/src/main/cpp/include/LocalTransactionChecker.h b/src/main/cpp/include/LocalTransactionChecker.h
new file mode 100644
index 0000000..8601671
--- /dev/null
+++ b/src/main/cpp/include/LocalTransactionChecker.h
@@ -0,0 +1,16 @@
+#ifndef __LOCALTRANSACTIONCHECKER_H__
+#define __LOCALTRANSACTIONCHECKER_H__
+
+#include "Message.h"
+#include "TransactionStatus.h"
+
+namespace ons {
+class LocalTransactionChecker {  // Application-implemented interface; an instance is handed to ONSFactory::createTransactionProducer.
+ public:
+	 LocalTransactionChecker() {}
+  virtual TransactionStatus check(Message& msg) = 0;  // returns the transaction status the implementation decides for msg
+  virtual ~LocalTransactionChecker() {}
+};
+}
+
+#endif
diff --git a/src/main/cpp/include/LocalTransactionExecuter.h b/src/main/cpp/include/LocalTransactionExecuter.h
new file mode 100644
index 0000000..1d7ea4f
--- /dev/null
+++ b/src/main/cpp/include/LocalTransactionExecuter.h
@@ -0,0 +1,16 @@
+#ifndef __LOCALTRANSACTIONEXECUTER_H__
+#define __LOCALTRANSACTIONEXECUTER_H__
+
+#include "Message.h"
+#include "TransactionStatus.h"
+
+namespace ons {
+class LocalTransactionExecuter {  // Application-implemented interface; an instance is handed to TransactionProducer::send.
+ public:
+	 LocalTransactionExecuter() {}
+     virtual TransactionStatus execute(Message& msg) = 0;  // returns the transaction status the implementation decides for msg
+     virtual ~LocalTransactionExecuter() {}
+};
+}
+
+#endif
diff --git a/src/main/cpp/include/Message.h b/src/main/cpp/include/Message.h
new file mode 100755
index 0000000..8038e29
--- /dev/null
+++ b/src/main/cpp/include/Message.h
@@ -0,0 +1,100 @@
+#ifndef __MESSAGE_H__
+#define __MESSAGE_H__
+
+#include <map>
+#include <sstream>
+#include <vector>
+#include "ONSClient.h"
+
+namespace ons {
+
+class  SystemPropKey {  // Holder for the key names that Message treats as system properties.
+ public:
+  SystemPropKey() {}
+  ~SystemPropKey() {}
+  static const char* TAG;  // the five constants are only declared here; their string values are defined out of view
+  static const char* KEY;
+  static const char* MSGID;
+  static const char* RECONSUMETIMES;
+  static const char* STARTDELIVERTIME;
+};
+
+class  ONSCLIENT_API Message {  // A message: topic, body, and two string->string property maps (system and user).
+ public:
+  Message();
+  Message(const std::string& topic, const std::string& tags, const std::string& byte_body);
+  Message(const char* topic, const char* tags, const char* byte_body);  // MessageTest expects ONSClientException when topic or byte_body is NULL
+  Message(const char* topic, size_t topic_size, const char* tags, size_t tags_size, const char* body, size_t body_size);  // MessageTest expects ONSClientException when topic_size or body_size is 0
+  Message(const char* topic, const char* tags, const char* keys, const char* body);  // MessageTest expects ONSClientException when topic or body is NULL
+
+  virtual ~Message();
+
+  Message(const Message& other);
+  Message& operator=(const Message& other);
+
+  // userProperties stores user-specific parameters, i.e. those that are not
+  // one of the keys listed in SystemPropKey.
+  void putUserProperties(const char* key, const char* value);
+	const char* getUserProperties(const char* key) const;  // MessageTest expects NULL for a NULL key and "" for an unknown key
+  void setUserProperties(std::map<std::string, std::string>& userProperty);
+  std::map<std::string, std::string> getUserProperties() const;  // returns the map by value (a copy)
+
+  // systemProperties only stores parameters named in SystemPropKey; do not put
+  // other parameters into systemProperties, because they will not be saved.
+  void putSystemProperties(const char* key, const char* value);  // MessageTest expects ONSClientException for a NULL key or value
+	const char* getSystemProperties(const char* key) const;  // MessageTest expects NULL for a NULL key
+  void setSystemProperties(std::map<std::string, std::string>& systemProperty);
+  std::map<std::string, std::string> getSystemProperties() const;  // returns the map by value (a copy)
+
+	const char* getTopic() const;
+  void setTopic(const char* topic);
+
+	const char* getTag() const;
+  void setTag(const char* tags);
+
+	const char* getKey() const;
+  void setKey(const char* keys);
+
+	const char* getMsgID() const;
+  void setMsgID(const char* msgId);
+
+  const long long getStartDeliverTime() const;  // NOTE(review): top-level const on a by-value scalar return has no effect; kept to match the out-of-view definition
+  void setStartDeliverTime(long long level);  // NOTE(review): the parameter is named "level" but carries the start-deliver time - confirm units with the implementation
+
+  const char* getBody() const;
+  const char* getByteBody(int *len) const;  // also reports the body length through *len (see MessageTest)
+  const std::string getMsgBody() const;  // NOTE(review): const by-value std::string return prevents callers from moving the result
+  const size_t getBodySize() const;
+  void setMsgBody(const std::string msgbody);  // takes the string by value
+  void setBody(unsigned char* byte_msgbody, int len);
+
+  const int getReconsumeTimes() const;
+  void setReconsumeTimes(int reconsumeTimes);
+
+  long long getStoreTimestamp() const;
+  void setStoreTimestamp(long long storeTimestamp);
+
+  const std::string toString() const;
+
+  const std::string toSystemString() const;
+
+  const std::string toUserString() const;
+
+  long long getQueueOffset() const;
+  void setQueueOffset(long long queueOffset);
+ protected:
+  void Init(const std::string& topic, const std::string& tags,  // shared initialiser; presumably called by the constructors - confirm in Message.cpp
+            const std::string& keys, const std::string& body);
+
+ private:
+  std::string topic;
+  std::string body;
+  size_t body_size;
+  long long m_storeTimestamp;
+  long long m_queueOffset;
+  std::map<std::string, std::string> systemProperties;
+  std::map<std::string, std::string> userProperties;
+};
+
+}  //<!end namespace;
+#endif
diff --git a/src/main/cpp/include/MessageListener.h b/src/main/cpp/include/MessageListener.h
new file mode 100644
index 0000000..5733d2f
--- /dev/null
+++ b/src/main/cpp/include/MessageListener.h
@@ -0,0 +1,19 @@
+#ifndef __MESSAGELISTENER_H__
+#define __MESSAGELISTENER_H__
+
+#include "Action.h"
+#include "ConsumeContext.h"
+#include "Message.h"
+
+namespace ons {
+
+class  ONSCLIENT_API MessageListener {  // Abstract consumer callback; PushConsumer::subscribe takes a pointer to one.
+ public:
+  MessageListener() {}
+  virtual ~MessageListener() {}  // virtual, so implementations can be destroyed through this interface
+
+  // Message-consumption callback; must be implemented by the application.
+  virtual Action consume(Message& message, ConsumeContext& context) = 0;  // the returned Action is declared in Action.h
+};
+}
+#endif
diff --git a/src/main/cpp/include/MessageOrderListener.h b/src/main/cpp/include/MessageOrderListener.h
new file mode 100644
index 0000000..848dd20
--- /dev/null
+++ b/src/main/cpp/include/MessageOrderListener.h
@@ -0,0 +1,20 @@
+#ifndef __MESSAGEORDERLYLISTENER_H__
+#define __MESSAGEORDERLYLISTENER_H__
+
+#include "ConsumeOrderContext.h"
+#include "Message.h"
+#include "OrderAction.h"
+
+namespace ons {
+
+class  MessageOrderListener {  // Abstract consumer callback for OrderConsumer::subscribe. NOTE(review): no ONSCLIENT_API here, unlike MessageListener - confirm.
+ public:
+  MessageOrderListener() {}
+  virtual ~MessageOrderListener() {}  // virtual, so implementations can be destroyed through this interface
+
+  // Message-consumption callback; must be implemented by the application.
+  virtual OrderAction consume(Message& message,  // returns an OrderAction (Success or Suspend, see OrderAction.h)
+                              ConsumeOrderContext& context) = 0;
+};
+}
+#endif
diff --git a/src/main/cpp/include/MessageQueueONS.h b/src/main/cpp/include/MessageQueueONS.h
new file mode 100755
index 0000000..bd8a8dc
--- /dev/null
+++ b/src/main/cpp/include/MessageQueueONS.h
@@ -0,0 +1,39 @@
+#ifndef __MESSAGEQUEUEONS_H__
+#define __MESSAGEQUEUEONS_H__
+
+#include <iomanip>
+#include <sstream>
+#include <string>
+
+using namespace std;
+
+namespace ons {
+class MessageQueueONS {  // Value type naming a queue by (topic, broker name, queue id).
+ public:
+  MessageQueueONS();
+  MessageQueueONS(const string& topic, const string& brokerName, int queueId);  // unqualified `string` resolves via the header's file-scope `using namespace std;`
+
+  MessageQueueONS(const MessageQueueONS& other);
+  MessageQueueONS& operator=(const MessageQueueONS& other);
+
+  string getTopic() const;  // getters return copies
+  void setTopic(const string& topic);
+
+  string getBrokerName() const;
+  void setBrokerName(const string& brokerName);
+
+  int getQueueId() const;
+  void setQueueId(int queueId);
+
+  bool operator==(const MessageQueueONS& mq) const;
+  bool operator<(const MessageQueueONS& mq) const;  // ordering is defined out of view; makes the type usable as an ordered-container key
+  int compareTo(const MessageQueueONS& mq) const;  // presumably negative/zero/positive three-way result - confirm in the .cpp
+
+ private:
+  string m_topic;
+  string m_brokerName;
+  int m_queueId;
+};
+
+}  //<!end namespace;
+#endif
diff --git a/src/main/cpp/include/MessageQueueSelectorONS.h b/src/main/cpp/include/MessageQueueSelectorONS.h
new file mode 100644
index 0000000..a82a285
--- /dev/null
+++ b/src/main/cpp/include/MessageQueueSelectorONS.h
@@ -0,0 +1,17 @@
+#ifndef _MESSAGEQUEUESELECTOR_H_
+#define _MESSAGEQUEUESELECTOR_H_
+
+#include "Message.h"
+#include "MessageQueueONS.h"
+
+namespace ons {
+
+class MessageQueueSelectorONS {  // Abstract strategy that picks one queue from a candidate list for a message.
+ public:
+  virtual ~MessageQueueSelectorONS() {}  // virtual, so implementations can be destroyed through this interface
+  virtual MessageQueueONS select(const std::vector<MessageQueueONS>& mqs,  // std:: spelled out so this header no longer depends on a leaked using-directive
+                                 const Message& msg, void* arg) = 0;  // arg is an opaque caller-supplied pointer; returns the chosen queue by value
+};
+
+}  //<!end namespace;
+#endif  //<! _MESSAGEQUEUESELECTOR_H_
diff --git a/src/main/cpp/include/ONSCallback.h b/src/main/cpp/include/ONSCallback.h
new file mode 100644
index 0000000..d529426
--- /dev/null
+++ b/src/main/cpp/include/ONSCallback.h
@@ -0,0 +1,16 @@
+#ifndef __ONSCALLBACK_H__
+#define __ONSCALLBACK_H__
+
+#include "ONSClientException.h"
+#include "SendResultONS.h"
+
+namespace ons {
+    class SendCallbackONS {  // Callback object for Producer::sendAsync; both hooks default to no-ops.
+    public:
+        virtual ~SendCallbackONS() {}  // virtual, so implementations can be destroyed through this interface
+        virtual void onSuccess(SendResultONS& sendResult) {}  // override to receive the send result; default does nothing
+        virtual void onException(ONSClientException& e) {}  // override to receive the failure; default does nothing
+    };
+
+}  // end of namespace ons
+#endif  // end of __ONSCALLBACK_H__
diff --git a/src/main/cpp/include/ONSChannel.h b/src/main/cpp/include/ONSChannel.h
new file mode 100644
index 0000000..6f538af
--- /dev/null
+++ b/src/main/cpp/include/ONSChannel.h
@@ -0,0 +1,15 @@
+#ifndef __ONSCHANNEL_H__
+#define __ONSCHANNEL_H__
+
+namespace ons {
+
+    enum ONSChannel {  // Channel selector used by ONSFactoryProperty::setOnsChannel / getOnsChannel.
+        CLOUD,  // enumerators take the implicit values 0..4 in declaration order
+        ALIYUN,
+        ALL,
+        LOCAL,
+        INNER  // NOTE(review): the meaning of each channel is not visible in this header - see the implementation
+    };
+}
+
+#endif
diff --git a/src/main/cpp/include/ONSClient.h b/src/main/cpp/include/ONSClient.h
new file mode 100644
index 0000000..4ebc595
--- /dev/null
+++ b/src/main/cpp/include/ONSClient.h
@@ -0,0 +1,20 @@
+#ifndef __ONSCLIENT_H__
+#define __ONSCLIENT_H__
+
+#ifdef WIN32  // ONSCLIENT_API: export/import decoration on Windows, empty elsewhere
+#ifdef ONSCLIENT_EXPORTS  // defined while building the library itself (presumably by the build system - confirm)
+
+#ifndef SWIG  // regular compile of the library: export the symbols
+#define ONSCLIENT_API __declspec(dllexport)
+#else  // SWIG is defined: leave the macro empty
+#define ONSCLIENT_API
+#endif  // SWIG
+
+#else  // not building the library: consumers import the symbols
+#define ONSCLIENT_API __declspec(dllimport)
+#endif  // ONSCLIENT_EXPORTS
+#else  // non-Windows: no decoration
+#define ONSCLIENT_API
+#endif  // WIN32
+
+#endif
diff --git a/src/main/cpp/include/ONSClientException.h b/src/main/cpp/include/ONSClientException.h
new file mode 100644
index 0000000..76922f3
--- /dev/null
+++ b/src/main/cpp/include/ONSClientException.h
@@ -0,0 +1,25 @@
+#ifndef __ONSCLIENTEXCEPTION_H__
+#define __ONSCLIENTEXCEPTION_H__
+
+#include <exception>
+#include <string>
+#include "ONSClient.h"
+
+namespace ons {
+
+class ONSCLIENT_API ONSClientException : public std::exception {  // Library exception carrying a message string and an integer error code.
+ public:
+  ONSClientException() throw();
+  virtual ~ONSClientException() throw();
+  ONSClientException(std::string msg, int error) throw();  // msg is taken by value
+  const char* GetMsg() const throw();
+  const char* what() const throw();  // same signature as std::exception::what, so it overrides it
+  int GetError() const throw();
+
+ private:
+  std::string m_msg;  // presumably what GetMsg()/what() expose - confirm in the .cpp
+  int m_error;  // presumably what GetError() returns - confirm in the .cpp
+};
+}
+
+#endif
diff --git a/src/main/cpp/include/ONSFactory.h b/src/main/cpp/include/ONSFactory.h
new file mode 100755
index 0000000..ecb531a
--- /dev/null
+++ b/src/main/cpp/include/ONSFactory.h
@@ -0,0 +1,103 @@
+#ifndef __ONSFACTORY_H_
+#define __ONSFACTORY_H_
+
+#include "LocalTransactionChecker.h"
+#include "ONSChannel.h"
+#include "ONSClientException.h"
+#include "OrderConsumer.h"
+#include "OrderProducer.h"
+#include "Producer.h"
+#include "PullConsumer.h"
+#include "PushConsumer.h"
+#include "TransactionProducer.h"
+
+namespace ons {
+class ONSCLIENT_API ONSFactoryProperty {  // Configuration bag (string->string map) passed to the ONSFactoryAPI create* calls.
+ public:
+  ONSFactoryProperty();
+  virtual ~ONSFactoryProperty();
+  bool checkValidityOfFactoryProperties(const std::string& key,  // declared to throw ons::ONSClientException
+                                        const std::string& value) throw(ons::ONSClientException);
+  const char* getLogPath() const;
+  void setSendMsgTimeout(const int value);
+  void setSendMsgRetryTimes(const int value);
+  void setMaxMsgCacheSize(const int size);
+  void setOnsTraceSwitch(bool onswitch);
+  void setOnsChannel(ONSChannel onsChannel) throw(ons::ONSClientException);
+  void setFactoryProperty(const char* key, const char* value) throw(ons::ONSClientException);  // generic key/value setter
+  void setFactoryProperties(std::map<std::string, std::string> factoryProperties);  // takes the whole map by value
+  std::map<std::string, std::string> getFactoryProperties() const;  // returns the map by value (a copy)
+  const char* getProducerId() const;
+  const char* getConsumerId() const;
+  const char* getGroupId() const;
+  const char* getPublishTopics() const;
+  const char* getMessageModel() const;
+  const int getSendMsgTimeout() const;  // NOTE(review): top-level const on by-value scalar returns has no effect; kept to match the out-of-view definitions
+  const int getSendMsgRetryTimes() const;
+  const int getConsumeThreadNums() const;
+  const int getMaxMsgCacheSize() const;
+  const ONSChannel getOnsChannel() const;
+  const char* getChannel() const;
+  const char* getMessageContent() const;
+  const char* getNameSrvAddr() const;
+  const char* getNameSrvDomain() const;
+  const char* getAccessKey() const;
+  const char* getSecretKey() const;
+  const char* getConsumerInstanceName() const;
+  bool getOnsTraceSwitch() const;
+  const char* getInstanceId() const;
+
+ public:
+  static const char* LogPath;  // string constants declared here and defined out of view; presumably the key names accepted by setFactoryProperty - confirm
+  static const char* ProducerId;
+  static const char* ConsumerId;
+  static const char* GroupId;
+  static const char* PublishTopics;
+  static const char* MsgContent;
+  static const char* ONSAddr;
+  static const char* AccessKey;
+  static const char* SecretKey;
+  static const char* MessageModel;
+  static const char* BROADCASTING;  // presumably a value for MessageModel rather than a key - confirm
+  static const char* CLUSTERING;  // presumably a value for MessageModel rather than a key - confirm
+  static const char* SendMsgTimeoutMillis;
+  static const char* NAMESRV_ADDR;
+  static const char* ConsumeThreadNums;
+  static const char* OnsChannel;
+  static const char* MaxMsgCacheSize;
+  static const char* OnsTraceSwitch;
+  static const char* SendMsgRetryTimes;
+  static const char* ConsumerInstanceName;
+  static const char* InstanceId;
+
+ private:
+  std::map<std::string, std::string> m_onsFactoryProperties;  // the only data member of the class
+};
+
+class ONSCLIENT_API ONSFactoryAPI {  // Factory interface that creates producers and consumers from an ONSFactoryProperty.
+ public:
+  ONSFactoryAPI();
+  virtual ~ONSFactoryAPI();
+
+  virtual ons::Producer* createProducer(ons::ONSFactoryProperty factoryProperty) throw(ons::ONSClientException);  // every create* takes the property object by value and returns a raw pointer
+  virtual ons::OrderProducer* createOrderProducer(ons::ONSFactoryProperty factoryProperty) throw(
+      ons::ONSClientException);
+  virtual ons::OrderConsumer* createOrderConsumer(ons::ONSFactoryProperty factoryProperty) throw(
+      ons::ONSClientException);
+  virtual ons::TransactionProducer* createTransactionProducer(  // additionally takes the application's LocalTransactionChecker
+      ons::ONSFactoryProperty factoryProperty, ons::LocalTransactionChecker* checker) throw(ons::ONSClientException);
+  virtual ons::PullConsumer* createPullConsumer(ons::ONSFactoryProperty factoryProperty) throw(ons::ONSClientException);
+  virtual ons::PushConsumer* createPushConsumer(ons::ONSFactoryProperty factoryProperty) throw(ons::ONSClientException);  // NOTE(review): ownership of the returned pointers is not stated here - confirm
+};
+
+class ONSCLIENT_API ONSFactory {  // Static accessor for an ONSFactoryAPI instance; cannot be constructed by callers.
+ public:
+  virtual ~ONSFactory();
+  static ons::ONSFactoryAPI* getInstance();  // presumably returns onsFactoryInstance, creating it on first use - confirm in the .cpp
+
+ private:
+  ONSFactory();  // private: callers go through getInstance()
+  static ons::ONSFactoryAPI* onsFactoryInstance;  // declared here, defined out of view
+};
+}  // namespace ons
+#endif
diff --git a/src/main/cpp/include/OrderAction.h b/src/main/cpp/include/OrderAction.h
new file mode 100644
index 0000000..297d285
--- /dev/null
+++ b/src/main/cpp/include/OrderAction.h
@@ -0,0 +1,11 @@
+#ifndef __ORDERACTION_H__
+#define __ORDERACTION_H__
+
+// order consuming result
+enum OrderAction {  // NOTE(review): declared at global scope, unlike ONSChannel/ONSPullStatus/TransactionStatus which sit in namespace ons - confirm intent
+  // consumption succeeded; the application may continue with the next message
+  Success,
+  // consumption failed; suspends the current queue
+  Suspend,
+};
+#endif
diff --git a/src/main/cpp/include/OrderConsumer.h b/src/main/cpp/include/OrderConsumer.h
new file mode 100644
index 0000000..cce53b6
--- /dev/null
+++ b/src/main/cpp/include/OrderConsumer.h
@@ -0,0 +1,19 @@
+#ifndef __ORDERCONSUMER_H__
+#define __ORDERCONSUMER_H__
+
+#include "MessageOrderListener.h"
+
+namespace ons {
+
+class  ONSCLIENT_API OrderConsumer {  // Abstract consumer interface that delivers to a MessageOrderListener.
+ public:
+  OrderConsumer() {}
+  virtual ~OrderConsumer() {}  // virtual, so implementations can be destroyed through this interface
+
+  virtual void start() = 0;  // pure virtual
+  virtual void shutdown() = 0;  // pure virtual
+  virtual void subscribe(const char* topic, const char* subExpression,  // registers listener for topic; subExpression presumably filters by tag - confirm
+                         MessageOrderListener* listener) = 0;
+};
+}
+#endif
diff --git a/src/main/cpp/include/OrderProducer.h b/src/main/cpp/include/OrderProducer.h
new file mode 100644
index 0000000..8924996
--- /dev/null
+++ b/src/main/cpp/include/OrderProducer.h
@@ -0,0 +1,23 @@
+#ifndef __ORDERLYPRODUCER_H__
+#define __ORDERLYPRODUCER_H__
+
+#include "Message.h"
+#include "SendResultONS.h"
+
+namespace ons {
+
+class ONSCLIENT_API OrderProducer {  // Abstract producer interface whose send takes a sharding key.
+ public:
+  OrderProducer() {}
+  virtual ~OrderProducer() {}  // virtual, so implementations can be destroyed through this interface
+
+  // start must be called before sending, to allocate resources.
+  virtual void start() = 0;
+  // shutdown must be called before exiting, to release all resources allocated
+  // by ons internally.
+  virtual void shutdown() = 0;
+
+  virtual SendResultONS send(Message& msg, std::string shardingKey) = 0;  // shardingKey is taken by value; presumably selects the queue that preserves ordering - confirm
+};
+}
+#endif
diff --git a/src/main/cpp/include/Producer.h b/src/main/cpp/include/Producer.h
new file mode 100644
index 0000000..2daf759
--- /dev/null
+++ b/src/main/cpp/include/Producer.h
@@ -0,0 +1,35 @@
+#ifndef __PRODUCER_H__
+#define __PRODUCER_H__
+
+#include "Message.h"
+#include "SendResultONS.h"
+#include "ONSClientException.h"
+#include "MessageQueueONS.h"
+#include"ONSCallback.h"
+
+namespace ons {
+
+class  ONSCLIENT_API Producer {  // Abstract producer interface: synchronous, queue-targeted, asynchronous and one-way sends.
+ public:
+  Producer()  {}
+  virtual ~Producer() {}  // virtual, so implementations can be destroyed through this interface
+
+  // start must be called before sending, to allocate resources.
+  virtual void start() = 0;
+  // shutdown must be called before exiting, to release all resources allocated
+  // by ons internally.
+  virtual void shutdown() = 0;
+  // retries at most 3 times if the send fails; if no exception is thrown, the
+  // send succeeded.
+  virtual ons::SendResultONS send(Message& msg) throw(ons::ONSClientException) = 0;
+  virtual ons::SendResultONS send(Message& msg,  // overload that names the target queue explicitly
+      const MessageQueueONS& mq) throw(ons::ONSClientException) = 0;
+
+  // asynchronous send; callback provides the onSuccess / onException hooks
+  virtual void sendAsync(Message& msg, ons::SendCallbackONS* callback) throw(ons::ONSClientException) = 0;
+
+  // one-way send; returns no SendResultONS
+  virtual void sendOneway(Message& msg) throw(ons::ONSClientException) = 0;
+};
+}
+#endif
diff --git a/src/main/cpp/include/PullConsumer.h b/src/main/cpp/include/PullConsumer.h
new file mode 100755
index 0000000..5f94e5a
--- /dev/null
+++ b/src/main/cpp/include/PullConsumer.h
@@ -0,0 +1,38 @@
+#ifndef __PULLCONSUMER_H__
+#define __PULLCONSUMER_H__
+
+#include <string>
+#include <vector>
+#include "MessageQueueONS.h"
+#include "PullResultONS.h"
+
+namespace ons {
+
+class ONSFactoryProperty;
+
+class ONSCLIENT_API PullConsumer {  // Abstract pull-style consumer: the caller fetches queues, pulls from them and manages offsets.
+ public:
+  PullConsumer() {}
+  virtual ~PullConsumer() {}  // virtual, so implementations can be destroyed through this interface
+
+  virtual void start() = 0;
+  virtual void shutdown() = 0;
+  virtual void fetchSubscribeMessageQueues(  // mqs is a non-const reference, i.e. an output parameter
+      const std::string& topic, std::vector<MessageQueueONS>& mqs) = 0;
+  virtual PullResultONS pull(const MessageQueueONS& mq,  // returns status, offsets and messages in a PullResultONS
+                             const std::string& subExpression, long long offset,
+                             int maxNums) = 0;
+  virtual long long searchOffset(const MessageQueueONS& mq, long long timestamp) = 0;
+  virtual long long maxOffset(const MessageQueueONS& mq) = 0;
+  virtual long long minOffset(const MessageQueueONS& mq) = 0;
+  virtual void updateConsumeOffset(const MessageQueueONS& mq,
+                                   long long offset) = 0;
+  virtual void removeConsumeOffset(const MessageQueueONS& mq) = 0;
+  virtual long long fetchConsumeOffset(const MessageQueueONS& mq,
+                                       bool fromStore) = 0;
+  virtual void persistConsumerOffset4PullConsumer(const MessageQueueONS& mq)  // NOTE(review): names ons::ONSClientException, but this header's own includes do not obviously declare it - confirm it arrives via include order
+      throw(ons::ONSClientException) = 0;
+};
+
+}
+#endif
diff --git a/src/main/cpp/include/PullResultONS.h b/src/main/cpp/include/PullResultONS.h
new file mode 100644
index 0000000..2d939ea
--- /dev/null
+++ b/src/main/cpp/include/PullResultONS.h
@@ -0,0 +1,40 @@
+#ifndef __PULLRESULTONS_H__
+#define __PULLRESULTONS_H__
+
+#include <vector>
+#include "Message.h"
+#include "ONSClient.h"
+
+namespace ons {
+
+enum ONSPullStatus {  // Status stored in PullResultONS::pullStatus.
+  ONS_FOUND,  // enumerators take the implicit values 0..4 in declaration order
+  ONS_NO_NEW_MSG,
+  ONS_NO_MATCHED_MSG,
+  ONS_OFFSET_ILLEGAL,
+  ONS_BROKER_TIMEOUT  // indicates that the pull request timed out or a NULL response was received
+};
+
+class ONSCLIENT_API PullResultONS {  // Result of PullConsumer::pull: a status, three offsets and the messages found.
+ public:
+  PullResultONS(ONSPullStatus status)  // status-only form: all three offsets start at 0 (not explicit, so ONSPullStatus converts implicitly)
+      : pullStatus(status), nextBeginOffset(0), minOffset(0), maxOffset(0) {}
+
+  PullResultONS(ONSPullStatus pullStatus, long long nextBeginOffset,  // stores each argument in the member of the same name
+                long long minOffset, long long maxOffset)
+      : pullStatus(pullStatus),
+        nextBeginOffset(nextBeginOffset),
+        minOffset(minOffset),
+        maxOffset(maxOffset) {}
+
+  virtual ~PullResultONS() {}
+
+ public:
+  ONSPullStatus pullStatus;
+  long long nextBeginOffset;
+  long long minOffset;
+  long long maxOffset;
+  std::vector<Message> msgFoundList;  // default-constructed (empty) by both constructors
+};
+}
+#endif
diff --git a/src/main/cpp/include/PushConsumer.h b/src/main/cpp/include/PushConsumer.h
new file mode 100644
index 0000000..925858c
--- /dev/null
+++ b/src/main/cpp/include/PushConsumer.h
@@ -0,0 +1,20 @@
+#ifndef __PUSHCONSUMER_H__
+#define __PUSHCONSUMER_H__
+
+#include "MessageListener.h"
+
+namespace ons {
+
+class  ONSCLIENT_API PushConsumer {  // Abstract consumer interface that delivers to a MessageListener.
+ public:
+  PushConsumer() {}
+  virtual ~PushConsumer() {}  // virtual, so implementations can be destroyed through this interface
+
+  virtual void start() = 0;  // pure virtual
+  virtual void shutdown() = 0;  // pure virtual
+  virtual void subscribe(const char* topic, const char* subExpression,  // registers listener for topic; subExpression presumably filters by tag - confirm
+                         MessageListener* listener) = 0;
+  // virtual void setNamesrvAddr(const std::string& nameSrvAddr) = 0;
+};
+}
+#endif
diff --git a/src/main/cpp/include/SendResultONS.h b/src/main/cpp/include/SendResultONS.h
new file mode 100644
index 0000000..d21ec12
--- /dev/null
+++ b/src/main/cpp/include/SendResultONS.h
@@ -0,0 +1,19 @@
+#ifndef __SENDRESULTONS_H__
+#define __SENDRESULTONS_H__
+#include <string>
+#include "ONSClient.h"
+
+namespace ons {
+
+class  ONSCLIENT_API SendResultONS {  // Result of a send call; carries a message id string.
+ public:
+  SendResultONS();
+  virtual ~SendResultONS();
+  void setMessageId(const std::string& msgId);
+	const char* getMessageId() const;  // presumably points into messageId, so valid only while this object lives - confirm
+
+ private:
+  std::string messageId;
+};
+}
+#endif
diff --git a/src/main/cpp/include/TransactionProducer.h b/src/main/cpp/include/TransactionProducer.h
new file mode 100644
index 0000000..bceadb0
--- /dev/null
+++ b/src/main/cpp/include/TransactionProducer.h
@@ -0,0 +1,29 @@
+#ifndef __TRANSACTIONPRODUCER_H__
+#define __TRANSACTIONPRODUCER_H__
+
+#include "LocalTransactionExecuter.h"
+#include "ONSClient.h"
+#include "SendResultONS.h"
+
+namespace ons {
+    class ONSCLIENT_API TransactionProducer {  // Abstract producer interface whose send also takes a LocalTransactionExecuter.
+    public:
+        TransactionProducer() {}
+
+        virtual ~TransactionProducer() {}  // virtual, so implementations can be destroyed through this interface
+
+        // start must be called before sending, to allocate resources.
+        virtual void start() = 0;
+
+        // shutdown must be called before exiting, to release all resources allocated
+        // by ons internally.
+        virtual void shutdown() = 0;
+
+        // retries at most 3 times if the send fails; if no exception is thrown, the
+        // send succeeded.
+        virtual SendResultONS send(Message &msg,  // executer is the application's local-transaction callback
+                                   LocalTransactionExecuter *executer) = 0;
+    };
+}
+
+#endif
diff --git a/src/main/cpp/include/TransactionStatus.h b/src/main/cpp/include/TransactionStatus.h
new file mode 100644
index 0000000..c47cd99
--- /dev/null
+++ b/src/main/cpp/include/TransactionStatus.h
@@ -0,0 +1,13 @@
+#ifndef __TRANSACTIONSTATUS_H__
+#define __TRANSACTIONSTATUS_H__
+
+namespace ons {
+
+enum TransactionStatus {  // Returned by LocalTransactionChecker::check and LocalTransactionExecuter::execute.
+  CommitTransaction = 0,
+  RollbackTransaction = 1,
+  Unknow = 2,  // NOTE(review): presumably "Unknown"; renaming it would change the public API
+};
+}
+
+#endif
diff --git a/src/test/cpp/MessageTest.cpp b/src/test/cpp/MessageTest.cpp
new file mode 100644
index 0000000..8b54067
--- /dev/null
+++ b/src/test/cpp/MessageTest.cpp
@@ -0,0 +1,237 @@
+#include <iostream>
+#include <chrono>
+#include <gtest/gtest.h>
+#include <memory>
+
+#include "Message.h"
+#include "ONSFactory.h"
+
+class MessageTest : public testing::Test {  // Fixture holding the constant inputs and a heap byte-body shared by the Message tests.
+protected:
+    void SetUp() override {  // runs before each test: builds a zero-filled 10-byte buffer containing "RocketMQ"
+        ubody_ = new unsigned char[10];
+        memset(ubody_, 0, 10);
+        strcpy(reinterpret_cast<char *>(ubody_), "RocketMQ");  // 8 characters + NUL fit in the 10-byte buffer
+        sbody = std::string("RocketMQ");
+    }
+
+    void TearDown() override {  // runs after each test
+        delete[] ubody_;  // array form is required: ubody_ was allocated with new[]
+    }
+
+    const char *topic_ = "Topic";
+    const char *tag_ = "Tag";
+    const char *key_ = "Key";
+    const char *body_ = "RocketMQ";
+    const char *user_key_ = "UserKey";
+    const char *user_value_ = "UserValue";
+    const char *sys_key_ = "SysKey";
+    const char *sys_value_ = "SysValue";
+    unsigned char *ubody_;  // owned by the fixture; same bytes as body_
+    int bodylen = strlen("RocketMQ");  // length of the body without the terminating NUL
+    std::string sbody;  // std::string copy of the body, set in SetUp
+};
+
+TEST_F(MessageTest, testMessage_TopicBeingNULL) {  // 4-argument constructor with a NULL topic must throw
+    EXPECT_THROW(
+            ons::Message msg(NULL, tag_, key_, body_);,
+            ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessageWithoutKey_TopicBeingNULL) {  // 3-argument constructor with a NULL topic must throw
+    EXPECT_THROW(
+            ons::Message msg(NULL, tag_, body_);,
+            ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessage_TopicLenBeingZero) {  // sized constructor with topic_size == 0 must throw
+    EXPECT_THROW(
+            ons::Message msg(topic_, 0, tag_, strlen(tag_), body_, strlen(body_));,
+            ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessage_BodyBeingNULL) {  // 4-argument constructor with a NULL body must throw
+    EXPECT_THROW(
+            ons::Message msg(topic_, tag_, key_, NULL);,
+            ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessageWithoutKey_BodyBeingNULL) {  // 3-argument constructor with a NULL body must throw
+    EXPECT_THROW(
+            ons::Message msg(topic_, tag_, NULL);,
+            ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessage_BodyLenBeingZero) {  // sized constructor with body_size == 0 must throw
+    EXPECT_THROW(
+            ons::Message msg(topic_, strlen(topic_), tag_, strlen(tag_), body_, 0);,
+            ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessageSysPerperty_KeyBeingNULL) {  // putSystemProperties with a NULL key must throw
+    ons::Message msg(topic_, tag_, key_, body_);
+    EXPECT_THROW(
+            msg.putSystemProperties(NULL, sys_value_);,
+            ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessageSysPerperty_ValueBeingNULL) {  // putSystemProperties with a NULL value must throw
+    ons::Message msg(topic_, tag_, key_, body_);
+    EXPECT_THROW(
+            msg.putSystemProperties(sys_key_, NULL);,
+            ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessageUserPerperty_KeyBeingNULL) {  // putUserProperties with a NULL key must throw
+    ons::Message msg(topic_, tag_, key_, body_);
+    EXPECT_THROW(
+            msg.putUserProperties(NULL, user_value_);,
+            ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessageUserPerperty_ValueBeingNULL) {  // putUserProperties with a NULL value must throw
+    ons::Message msg(topic_, tag_, key_, body_);
+    EXPECT_THROW(
+            msg.putUserProperties(user_key_, NULL);,
+            ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessageGetTopic_ValueBeingNormal) {  // getTopic returns the topic given to the constructor
+    ons::Message msg(topic_, tag_, key_, body_);
+    ASSERT_STREQ(topic_, msg.getTopic());
+}
+
+TEST_F(MessageTest, testMessageetTopic_ValueBeingNormal) {  // setTopic replaces an empty topic. NOTE(review): name presumably meant "testMessageSetTopic..."
+    ons::Message msg("", tag_, key_, body_);
+    msg.setTopic(topic_);
+    ASSERT_STREQ(topic_, msg.getTopic());
+}
+
+TEST_F(MessageTest, testMessageGetTopic_ValueBeingNull) {  // an empty-string topic is returned as ""
+    ons::Message msg("", tag_, key_, body_);
+    ASSERT_STREQ("", msg.getTopic());
+}
+
+TEST_F(MessageTest, testMessageSetTopic_ValueBeingNull) {  // setTopic(NULL) leaves the topic as ""
+    ons::Message msg("", tag_, key_, body_);
+    msg.setTopic(NULL);
+    ASSERT_STREQ("", msg.getTopic());
+}
+
+TEST_F(MessageTest, testMessageGetTag_ValueBeingNormal) {  // getTag returns the tag given to the constructor
+    ons::Message msg(topic_, tag_, key_, body_);
+    ASSERT_STREQ(tag_, msg.getTag());
+}
+
+TEST_F(MessageTest, testMessageSetTag_ValueBeingNormal) {  // setTag replaces an empty tag
+    ons::Message msg(topic_, "", key_, body_);
+    msg.setTag(tag_);
+    ASSERT_STREQ(tag_, msg.getTag());
+}
+
+TEST_F(MessageTest, testMessageGetTag_ValueBeingNull) {  // an empty-string tag is returned as ""
+    ons::Message msg(topic_, "", key_, body_);
+    ASSERT_STREQ("", msg.getTag());
+}
+
+TEST_F(MessageTest, testMessageSetTag_ValueBeingNull) {  // setTag(NULL) leaves the tag as ""
+    ons::Message msg(topic_, "", key_, body_);
+    msg.setTag(NULL);
+    ASSERT_STREQ("", msg.getTag());
+}
+
+TEST_F(MessageTest, testMessageGetBody_ValueBeingNormal) {  // getBody returns the body given to the constructor
+    ons::Message msg(topic_, tag_, key_, body_);
+    ASSERT_STREQ(body_, msg.getBody());
+}
+
+TEST_F(MessageTest, testMessageSetBody_ValueBeingNormal) {  // setBody(bytes, len) replaces an empty body
+    ons::Message msg(topic_, tag_, key_, "");
+    msg.setBody(ubody_, bodylen);
+    ASSERT_STREQ(body_, msg.getBody());
+}
+
+TEST_F(MessageTest, testMessageGetByteBody_ValueBeingNormal) {  // getByteBody returns the body and writes its length to len
+    int len = 0;
+    ons::Message msg(topic_, tag_, key_, body_);
+    const char *strBody = msg.getByteBody(&len);
+    ASSERT_STREQ(body_, strBody);
+    ASSERT_EQ(strlen(body_), len);  // compares size_t with int (mixed signedness)
+}
+
+TEST_F(MessageTest, testMessageSetBodyWithLen_ValueBeingNormal) {  // the length passed to setBody is what getByteBody reports back
+    int len = 0;
+    ons::Message msg(topic_, tag_, key_, "");
+    msg.setBody(ubody_, bodylen);
+    const char *strBody = msg.getByteBody(&len);
+    ASSERT_STREQ(body_, strBody);
+    ASSERT_EQ(bodylen, len);
+}
+
+TEST_F(MessageTest, testMessageGetBody_ValueBeingNull) {  // an empty-string body is returned as ""
+    ons::Message msg(topic_, tag_, key_, "");
+    ASSERT_STREQ("", msg.getBody());
+}
+
+TEST_F(MessageTest, testMessageSetBody_ValueBeingNull) {  // setBody(NULL, n) leaves the body as ""
+    ons::Message msg(topic_, tag_, key_, "");
+    msg.setBody(NULL, strlen(body_));
+    ASSERT_STREQ("", msg.getBody());
+}
+
+TEST_F(MessageTest, testMessageGetMsgBody_ValueBeingString) {  // getMsgBody returns the body as a std::string
+    ons::Message msg(topic_, tag_, key_, body_);
+    ASSERT_STREQ(sbody.c_str(), msg.getMsgBody().c_str());
+}
+
+TEST_F(MessageTest, testMessageGetMsgBodyLen_ValueBeingString) {  // getBodySize equals the body's character count
+    ons::Message msg(topic_, tag_, key_, body_);
+    ASSERT_EQ(sbody.length(), msg.getBodySize());
+}
+
+TEST_F(MessageTest, testMessageSetMsgBody_ValueBeingString) {  // setMsgBody(std::string) round-trips through getMsgBody
+    ons::Message msg(topic_, tag_, key_, "");
+    msg.setMsgBody(sbody);
+    ASSERT_STREQ(sbody.c_str(), msg.getMsgBody().c_str());
+}
+
+TEST_F(MessageTest, testMessageGetUserPerperty_KeyBeingNull) {  // getUserProperties(NULL) returns NULL
+    ons::Message msg(topic_, tag_, key_, "");
+    ASSERT_TRUE(NULL == msg.getUserProperties(NULL));
+}
+
+TEST_F(MessageTest, testMessageGetUserPerperty_KeyNotExsit) {  // an unknown user-property key yields ""
+    ons::Message msg(topic_, tag_, key_, "");
+    ASSERT_STREQ("", msg.getUserProperties("NONEEXISTKEY"));
+}
+
+TEST_F(MessageTest, testMessageGetSysPerperty_KeyBeingNull) {  // getSystemProperties(NULL) returns NULL
+    ons::Message msg(topic_, tag_, key_, "");
+    ASSERT_TRUE(NULL == msg.getSystemProperties(NULL));
+}
+
+TEST_F(MessageTest, testMessageGetSysPerperty_KeyNotExsit) {  // an unknown system-property key is expected to yield "", mirroring the user-property test
+    ons::Message msg(topic_, tag_, key_, "");
+    ASSERT_STREQ("", msg.getSystemProperties("NONEEXISTKEY"));
+}
+
+TEST_F(MessageTest, testMessageSetDeliverTime_ValueBeingNormal) {  // a positive start-deliver time round-trips
+    ons::Message msg(topic_, tag_, key_, body_);
+    long long now = 123456789L;
+    msg.setStartDeliverTime(now);
+    ASSERT_EQ(now, msg.getStartDeliverTime());
+}
+
+TEST_F(MessageTest, testMessageSetDeliverTime_ValueBeingZero) {  // a zero start-deliver time round-trips
+    ons::Message msg(topic_, tag_, key_, body_);
+    long long now = 0;
+    msg.setStartDeliverTime(now);
+    ASSERT_EQ(now, msg.getStartDeliverTime());
+}
+
+TEST_F(MessageTest, testMessageSetDeliverTime_ValueBeingNagetive) {  // a negative start-deliver time round-trips unchanged
+    ons::Message msg(topic_, tag_, key_, body_);
+    long long now = -10000;
+    msg.setStartDeliverTime(now);
+    ASSERT_EQ(now, msg.getStartDeliverTime());
+}