You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/07/30 09:38:30 UTC
[rocketmq-ons-cpp] 03/35: Add ONS API
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-ons-cpp.git
commit e3411416a516d652b591450abc0441e21216e7af
Author: ShannonDing <li...@163.com>
AuthorDate: Tue Jul 23 21:19:24 2019 +0800
Add ONS API
---
src/main/c/native/rocketmq.h | 63 +++++++
src/main/cpp/demos/CMakeLists.Release | 39 ++++
src/main/cpp/demos/CMakeLists.txt | 17 ++
src/main/cpp/demos/ConsumerDemo.cpp | 63 +++++++
src/main/cpp/demos/MultiThreadProducerDemo.cpp | 75 ++++++++
src/main/cpp/demos/OrderConsumerDemo.cpp | 59 ++++++
src/main/cpp/demos/OrderProducerDemo.cpp | 54 ++++++
src/main/cpp/demos/ProducerAsyncDemo.cpp | 93 ++++++++++
src/main/cpp/demos/ProducerDemo.cpp | 56 ++++++
src/main/cpp/demos/ProducerOnewayDemo.cpp | 49 +++++
src/main/cpp/demos/TransactionProducerDemo.cpp | 75 ++++++++
src/main/cpp/include/Action.h | 14 ++
src/main/cpp/include/ConsumeContext.h | 13 ++
src/main/cpp/include/ConsumeOrderContext.h | 13 ++
src/main/cpp/include/LocalTransactionChecker.h | 16 ++
src/main/cpp/include/LocalTransactionExecuter.h | 16 ++
src/main/cpp/include/Message.h | 100 ++++++++++
src/main/cpp/include/MessageListener.h | 19 ++
src/main/cpp/include/MessageOrderListener.h | 20 ++
src/main/cpp/include/MessageQueueONS.h | 39 ++++
src/main/cpp/include/MessageQueueSelectorONS.h | 17 ++
src/main/cpp/include/ONSCallback.h | 16 ++
src/main/cpp/include/ONSChannel.h | 15 ++
src/main/cpp/include/ONSClient.h | 20 ++
src/main/cpp/include/ONSClientException.h | 25 +++
src/main/cpp/include/ONSFactory.h | 103 ++++++++++
src/main/cpp/include/OrderAction.h | 11 ++
src/main/cpp/include/OrderConsumer.h | 19 ++
src/main/cpp/include/OrderProducer.h | 23 +++
src/main/cpp/include/Producer.h | 35 ++++
src/main/cpp/include/PullConsumer.h | 38 ++++
src/main/cpp/include/PullResultONS.h | 40 ++++
src/main/cpp/include/PushConsumer.h | 20 ++
src/main/cpp/include/SendResultONS.h | 19 ++
src/main/cpp/include/TransactionProducer.h | 29 +++
src/main/cpp/include/TransactionStatus.h | 13 ++
src/test/cpp/MessageTest.cpp | 237 ++++++++++++++++++++++++
37 files changed, 1573 insertions(+)
diff --git a/src/main/c/native/rocketmq.h b/src/main/c/native/rocketmq.h
new file mode 100644
index 0000000..ad48a98
--- /dev/null
+++ b/src/main/c/native/rocketmq.h
@@ -0,0 +1,63 @@
+#ifndef __ROCKETMQ_H__
+#define __ROCKETMQ_H__
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+typedef struct message_struct {
+ char *topic;
+ char *tags;
+ char *body;
+ unsigned int body_size;
+ char *key;
+ char *user_prop;
+ char *system_prop;
+} message;
+
+typedef struct send_result_struct {
+ char *message_id;
+ int error_no;
+ char *error_msg;
+} send_result;
+
+typedef struct factory_property_struct {
+ char *group_id;
+ char *access_key;
+ char *access_secret;
+ char *name_srv_addr;
+ char *name_srv_domain;
+ char *message_model;
+ char *send_msg_timeout_millis;
+ char *consume_thread_nums;
+ char *ons_channel;
+ char *max_msg_cache_size;
+ char *ons_trace_switch;
+ char *consumer_instance_name;
+ char *language_identifier;
+ char *instance_id;
+ int use_domain;
+} factory_property;
+
+typedef struct callback_func_struct {
+ char *send_callback_ons;
+
+ void (*on_success)(void *thread, char *message_id, char *send_callback_ons);
+
+ void (*on_exception)(void *thread, char *m_msg, int m_error, char *send_callback_ons);
+} callback_func;
+
+typedef struct subscription_struct {
+ char *topic;
+ char *sub_expression;
+
+ int (*on_message)(void *thread, void *opaque, char *topic, char *user_props, char *sys_props, char *body, int len);
+
+ void *opaque;
+} subscription;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //__ROCKETMQ_H__
\ No newline at end of file
diff --git a/src/main/cpp/demos/CMakeLists.Release b/src/main/cpp/demos/CMakeLists.Release
new file mode 100644
index 0000000..7f47ee5
--- /dev/null
+++ b/src/main/cpp/demos/CMakeLists.Release
@@ -0,0 +1,39 @@
+cmake_minimum_required(VERSION 3.0)
+project(onsclient4cpp_demo VERSION 1.0
+ LANGUAGES C CXX)
+set(CMAKE_CXX_STANDARD 11)
+include_directories(../include)
+
+find_library(ROCKETMQ_CLIENT_CORE
+ NAMES rocketmq_client_core
+ HINTS ../lib)
+
+if (${ROCKETMQ_CLIENT_CORE-NOTFOUNT})
+ message("find_library for rocketmq_client_core failed")
+endif ()
+
+find_library(ONS_CLIENT
+ NAMES onsclient4cpp
+ HINTS ../lib)
+
+if (${ONS_CLIENT-NOTFOUNT})
+ message("find_library for rocketmq_client_core failed")
+endif ()
+
+macro(add_demo name source_file)
+ add_executable(${name} ${source_file})
+ target_link_libraries(${name} pthread ${ROCKETMQ_CLIENT_CORE} ${ONS_CLIENT})
+ set_target_properties(${name} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_SOURCE_DIR}/bin)
+endmacro()
+
+add_demo(producer_demo ProducerDemo.cpp)
+add_demo(order_producer_demo OrderProducerDemo.cpp)
+add_demo(multi_thread_producer_demo MultiThreadProducerDemo.cpp)
+add_demo(producer_async_demo ProducerAsyncDemo.cpp)
+add_demo(producer_oneway_demo ProducerOnewayDemo.cpp)
+add_demo(consumer_demo ConsumerDemo.cpp)
+add_demo(order_consumer_demo OrderConsumerDemo.cpp)
+add_demo(transaction_producer_demo TransactionProducerDemo.cpp)
+
+
+
diff --git a/src/main/cpp/demos/CMakeLists.txt b/src/main/cpp/demos/CMakeLists.txt
new file mode 100644
index 0000000..6ffd443
--- /dev/null
+++ b/src/main/cpp/demos/CMakeLists.txt
@@ -0,0 +1,17 @@
+macro(add_demo name source_file)
+ add_executable(${name} ${source_file})
+ target_link_libraries(${name} pthread ${CMAKE_PROJECT_NAME})
+ set_target_properties(${name} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_SOURCE_DIR}/bin/demos)
+endmacro()
+
+add_demo(producer_demo ProducerDemo.cpp)
+add_demo(order_producer_demo OrderProducerDemo.cpp)
+add_demo(multi_thread_producer_demo MultiThreadProducerDemo.cpp)
+add_demo(producer_async_demo ProducerAsyncDemo.cpp)
+add_demo(producer_oneway_demo ProducerOnewayDemo.cpp)
+add_demo(consumer_demo ConsumerDemo.cpp)
+add_demo(order_consumer_demo OrderConsumerDemo.cpp)
+add_demo(transaction_producer_demo TransactionProducerDemo.cpp)
+
+
+
diff --git a/src/main/cpp/demos/ConsumerDemo.cpp b/src/main/cpp/demos/ConsumerDemo.cpp
new file mode 100644
index 0000000..0df672a
--- /dev/null
+++ b/src/main/cpp/demos/ConsumerDemo.cpp
@@ -0,0 +1,63 @@
+#include "ONSFactory.h"
+
+#include <iostream>
+#include <thread>
+#include <mutex>
+
+using namespace ons;
+
+std::mutex console_mtx;
+
+class ExampleMessageListener : public MessageListener {
+public:
+ Action consume(Message& message, ConsumeContext& context) {
+ //此处为具体的消息处理过程,确认消息被处理成功请返回 CommitMessage,
+ //如果有消费异常,或者期望重新消费,可以返回 ReconsumeLater,消息将会在一段时间后重新投递
+ std::lock_guard<std::mutex> lk(console_mtx);
+ std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
+ << message.getMsgID() << std::endl;
+ return CommitMessage;
+ }
+};
+
+int main(int argc, char* argv[]) {
+ std::cout << "=======Before consuming messages=======" << std::endl;
+ ONSFactoryProperty factoryInfo;
+ //Request from ONS console, this should be GroupID here.
+ //请填写在阿里云ONS控制台上申请的GroupID,从实例化的版本开始,ProducerId和CounsumerId已经统一,此处设置是为了接口保持向前兼容。
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
+ //Request from ONS console
+ //请填写阿里云ONS控制台上申请的topic
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic");
+ //Your Access Key from your account.
+ //请填写你的账户的AK
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
+ //Your Secret Key from your account.
+ //请填写你的账户的SK
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
+ //This is the endpoint from ONS console
+ //请填写阿里云ONS控制台上对应实例的接入点
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
+ "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
+
+ PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);
+
+ std::string topic(factoryInfo.getPublishTopics());
+ std::string tag("Your Tag");
+
+ //register your own listener here to handle the messages received.
+ //请注册自定义侦听函数用来处理接收到的消息,并返回响应的处理结果。
+ ExampleMessageListener *messageListener = new ExampleMessageListener();
+ consumer->subscribe(topic.c_str(), tag.c_str(), messageListener);
+
+ //Start this consumer
+ //准备工作完成,必须调用启动函数,才可以正常工作。
+ consumer->start();
+
+ //Keep main thread running until process finished.
+ //请保持线程常驻,不要执行shutdown操作
+ std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
+ consumer->shutdown();
+ std::cout << "=======After consuming messages======" << std::endl;
+ return 0;
+}
\ No newline at end of file
diff --git a/src/main/cpp/demos/MultiThreadProducerDemo.cpp b/src/main/cpp/demos/MultiThreadProducerDemo.cpp
new file mode 100644
index 0000000..867a736
--- /dev/null
+++ b/src/main/cpp/demos/MultiThreadProducerDemo.cpp
@@ -0,0 +1,75 @@
+#include <iostream>
+#include <chrono>
+#include <thread>
+#include <mutex>
+#include <vector>
+
+#include "ONSFactory.h"
+
+using namespace std;
+using namespace ons;
+
+int main() {
+ std::cout << "=======Before sending messages=======" << std::endl;
+ ONSFactoryProperty factoryInfo;
+ //Request from ONS console, this should be GroupID here.
+ //请填写在阿里云ONS控制台上申请的GroupID,从实例化的版本开始,ProducerId和CounsumerId已经统一,此处设置是为了接口保持向前兼容。
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "GID_XXX");
+ //Request from ONS console
+ //请填写阿里云ONS控制台上申请的topic
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic");
+ //Your Access Key from your account.
+ //请填写你的账户的AK
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
+ //Your Secret Key from your account.
+ //请填写你的账户的SK
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
+ //This is the endpoint from ONS console
+ //请填写阿里云ONS控制台上对应实例的接入点
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
+ "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
+ Producer *producer = nullptr;
+ producer = ONSFactory::getInstance()->createProducer(factoryInfo);
+ producer->start();
+ Message msg(
+ factoryInfo.getPublishTopics(),
+ "Your Tag",
+ "Your Key",
+ "This message body."
+ );
+
+ std::mutex console_mutex;
+
+ auto lambda = [&]() {
+ auto start = std::chrono::system_clock::now();
+ int count = 32;
+ for (int i = 0; i < count; ++i) {
+ SendResultONS sendResult = producer->send(msg);
+ std::cout << "Message ID: " << sendResult.getMessageId() << std::endl;
+ }
+ auto interval = std::chrono::system_clock::now() - start;
+ {
+ std::lock_guard<std::mutex> lk(console_mutex);
+ std::cout << "Send " << count << " messages OK, costs "
+ << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
+ }
+ };
+
+ std::vector<std::thread> threads;
+ int thread_num = 3;
+
+ threads.reserve(thread_num);
+ for (int i = 0; i < thread_num; ++i) {
+ threads.emplace_back(lambda);
+ }
+
+ for (auto& t : threads) {
+ if (t.joinable()) {
+ t.join();
+ }
+ }
+
+ producer->shutdown();
+ std::cout << "=======After sending messages=======" << std::endl;
+ return 0;
+}
\ No newline at end of file
diff --git a/src/main/cpp/demos/OrderConsumerDemo.cpp b/src/main/cpp/demos/OrderConsumerDemo.cpp
new file mode 100644
index 0000000..28f5ee5
--- /dev/null
+++ b/src/main/cpp/demos/OrderConsumerDemo.cpp
@@ -0,0 +1,59 @@
+#include "ONSFactory.h"
+
+#include <iostream>
+#include <thread>
+#include <mutex>
+
+using namespace ons;
+
+std::mutex console_mtx;
+
+class ExampleMessageListener : public MessageOrderListener {
+public:
+ OrderAction consume(Message &message, ConsumeOrderContext &context) {
+ //此处为具体的消息处理过程,确认消息被处理成功请返回 Success,
+ //如果有消费异常,或者期望重新消费,可以返回 Suspend,消息将会在一段时间后重新投递
+ std::lock_guard<std::mutex> lk(console_mtx);
+ std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
+ << message.getMsgID() << std::endl;
+ return Success;
+ }
+};
+
+int main(int argc, char *argv[]) {
+ std::cout << "=======Before consuming messages=======" << std::endl;
+ ONSFactoryProperty factoryInfo;
+ //Request from ONS console, this should be GroupID here.
+ //请填写在阿里云ONS控制台上申请的GroupID,从实例化的版本开始,ProducerId和CounsumerId已经统一,此处设置是为了接口保持向前兼容。
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
+ //Request from ONS console
+ //请填写阿里云ONS控制台上申请的topic
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic");
+ //Your Access Key from your account.
+ //请填写你的账户的AK
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
+ //Your Secret Key from your account.
+ //请填写你的账户的SK
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
+ //This is the endpoint from ONS console
+ //请填写阿里云ONS控制台上对应实例的接入点
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
+ "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
+ OrderConsumer *consumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
+
+ std::string topic(factoryInfo.getPublishTopics());
+ std::string tag("Your Tag");
+ ExampleMessageListener *messageListener = new ExampleMessageListener();
+ consumer->subscribe(topic.c_str(), tag.c_str(), messageListener);
+
+ //Start this consumer
+ //准备工作完成,必须调用启动函数,才可以正常工作。
+ consumer->start();
+
+ //Keep main thread running until process finished.
+ //请保持线程常驻,不要执行shutdown操作
+ std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
+ consumer->shutdown();
+ std::cout << "=======After consuming messages======" << std::endl;
+ return 0;
+}
\ No newline at end of file
diff --git a/src/main/cpp/demos/OrderProducerDemo.cpp b/src/main/cpp/demos/OrderProducerDemo.cpp
new file mode 100644
index 0000000..6bf6ed4
--- /dev/null
+++ b/src/main/cpp/demos/OrderProducerDemo.cpp
@@ -0,0 +1,54 @@
+#include <iostream>
+#include <chrono>
+#include "ONSFactory.h"
+
+using namespace std;
+using namespace ons;
+
+int main() {
+ std::cout << "=======Before sending messages=======" << std::endl;
+ ONSFactoryProperty factoryInfo;
+ //Request from ONS console, this should be GroupID here.
+ //请填写在阿里云ONS控制台上申请的GroupID,从实例化的版本开始,ProducerId和CounsumerId已经统一,此处设置是为了接口保持向前兼容。
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "GID_XXX");
+ //Request from ONS console
+ //请填写阿里云ONS控制台上申请的topic
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic");
+ //Your Access Key from your account.
+ //请填写你的账户的AK
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
+ //Your Secret Key from your account.
+ //请填写你的账户的SK
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
+ //This is the endpoint from ONS console
+ //请填写阿里云ONS控制台上对应实例的接入点
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
+ "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
+ OrderProducer *producer = nullptr;
+ producer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);
+ producer->start();
+ Message msg(
+ factoryInfo.getPublishTopics(),
+ "Your Tag",
+ "Your Key",
+ "This message body."
+ );
+
+ auto start = std::chrono::system_clock::now();
+ int count = 32;
+ for (int i = 0; i < count; ++i) {
+ try {
+ SendResultONS sendResult = producer->send(msg,"Your Sharding Key");
+ std::cout << "Topic: " << msg.getTopic() << ", Message ID: " << sendResult.getMessageId() << std::endl;
+ } catch (ONSClientException e) {
+ std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.GetMsg() << std::endl;
+ }
+ }
+ auto interval = std::chrono::system_clock::now() - start;
+ std::cout << "Send " << count << " messages OK, costs "
+ << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
+
+ producer->shutdown();
+ std::cout << "=======After sending messages=======" << std::endl;
+ return 0;
+}
\ No newline at end of file
diff --git a/src/main/cpp/demos/ProducerAsyncDemo.cpp b/src/main/cpp/demos/ProducerAsyncDemo.cpp
new file mode 100644
index 0000000..451ab8a
--- /dev/null
+++ b/src/main/cpp/demos/ProducerAsyncDemo.cpp
@@ -0,0 +1,93 @@
+#include <iostream>
+#include <chrono>
+#include <mutex>
+#include <condition_variable>
+#include "ONSFactory.h"
+#include"ONSCallback.h"
+
+using namespace std;
+using namespace ons;
+
+std::mutex m1;
+std::mutex m2;
+std::condition_variable cv;
+
+class MyCallback : public SendCallbackONS {
+public:
+
+ void onSuccess(SendResultONS &sendResult) override {
+ std::lock_guard<std::mutex> lg(m2);
+ success_num++;
+ std::cout << "send success, message_id: " << sendResult.getMessageId() << ", total: " << success_num
+ << std::endl;
+ if (success_num + failed_num == total) {
+ cv.notify_all();
+ }
+ }
+
+ void onException(ONSClientException &e) override {
+ std::lock_guard<std::mutex> lg(m2);
+ failed_num++;
+ std::cout << "send failure, total: " << failed_num << std::endl;
+ std::cout << e.what() << std::endl;
+ if (success_num + failed_num == total) {
+ cv.notify_all();
+ }
+ }
+
+ static int success_num;
+ static int failed_num;
+ static int total;
+};
+
+int MyCallback::success_num = 0;
+int MyCallback::failed_num = 0;
+int MyCallback::total = 32;
+
+
+int main() {
+ std::cout << "=======before send message=======" << std::endl;
+ ONSFactoryProperty factoryInfo;
+ //Request from ONS console, this should be GroupID here.
+ //请填写在阿里云ONS控制台上申请的GroupID,从实例化的版本开始,ProducerId和CounsumerId已经统一,此处设置是为了接口保持向前兼容。
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "GID_XXX");
+ //Request from ONS console
+ //请填写阿里云ONS控制台上申请的topic
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic");
+ //Your Access Key from your account.
+ //请填写你的账户的AK
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
+ //Your Secret Key from your account.
+ //请填写你的账户的SK
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
+ //This is the endpoint from ONS console
+ //请填写阿里云ONS控制台上对应实例的接入点
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
+ "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
+ Producer *producer = nullptr;
+ producer = ONSFactory::getInstance()->createProducer(factoryInfo);
+ producer->start();
+ Message msg(
+ factoryInfo.getPublishTopics(),
+ "Your Tag",
+ "Your Key",
+ "This message body."
+ );
+
+ auto start = std::chrono::system_clock::now();
+ MyCallback m_callback;
+ for (int i = 0; i < MyCallback::total; ++i) {
+ producer->sendAsync(msg, &m_callback);
+ }
+
+ {
+ std::unique_lock<std::mutex> lk(m1);
+ cv.wait(lk);
+ }
+
+ producer->shutdown();
+ std::cout << "=======after sending messages=======" << std::endl;
+
+
+ return 0;
+}
\ No newline at end of file
diff --git a/src/main/cpp/demos/ProducerDemo.cpp b/src/main/cpp/demos/ProducerDemo.cpp
new file mode 100644
index 0000000..0b19174
--- /dev/null
+++ b/src/main/cpp/demos/ProducerDemo.cpp
@@ -0,0 +1,56 @@
+#include <iostream>
+#include <chrono>
+#include "ONSFactory.h"
+
+using namespace std;
+using namespace ons;
+
+int main() {
+ std::cout << "=======Before sending messages=======" << std::endl;
+ ONSFactoryProperty factoryInfo;
+ //Request from ONS console, this should be GroupID here.
+ //请填写在阿里云ONS控制台上申请的GroupID,从实例化的版本开始,ProducerId和CounsumerId已经统一,此处设置是为了接口保持向前兼容。
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "GID_XXX");
+ //Request from ONS console
+ //请填写阿里云ONS控制台上申请的topic
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic");
+ //Your Access Key from your account.
+ //请填写你的账户的AK
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
+ //Your Secret Key from your account.
+ //请填写你的账户的SK
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
+ //This is the endpoint from ONS console
+ //请填写阿里云ONS控制台上对应实例的接入点
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
+ "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
+ Producer *producer = nullptr;
+ producer = ONSFactory::getInstance()->createProducer(factoryInfo);
+ producer->start();
+ Message msg(
+ factoryInfo.getPublishTopics(),
+ "Your Tag",
+ "Your Key",
+ "This message body."
+ );
+
+ auto start = std::chrono::system_clock::now();
+ int count = 32;
+ for (int i = 0; i < count; ++i) {
+ try {
+ SendResultONS sendResult = producer->send(msg);
+ std::cout << "Message ID: " << sendResult.getMessageId() << std::endl;
+ } catch (ONSClientException e) {
+ std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.GetMsg() << std::endl;
+ }
+ }
+ auto interval = std::chrono::system_clock::now() - start;
+ std::cout << "Send " << count << " messages OK, costs "
+ << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
+
+ //Keep main thread running until process finished.
+ //请保持线程常驻,不要执行shutdown操作
+ producer->shutdown();
+ std::cout << "=======After sending messages=======" << std::endl;
+ return 0;
+}
\ No newline at end of file
diff --git a/src/main/cpp/demos/ProducerOnewayDemo.cpp b/src/main/cpp/demos/ProducerOnewayDemo.cpp
new file mode 100644
index 0000000..031bff3
--- /dev/null
+++ b/src/main/cpp/demos/ProducerOnewayDemo.cpp
@@ -0,0 +1,49 @@
+#include <iostream>
+#include <chrono>
+#include "ONSFactory.h"
+
+using namespace std;
+using namespace ons;
+
+int main() {
+ std::cout << "=======Before sending messages=======" << std::endl;
+ ONSFactoryProperty factoryInfo;
+ //Request from ONS console, this should be GroupID here.
+ //请填写在阿里云ONS控制台上申请的GroupID,从实例化的版本开始,ProducerId和CounsumerId已经统一,此处设置是为了接口保持向前兼容。
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "GID_XXX");
+ //Request from ONS console
+ //请填写阿里云ONS控制台上申请的topic
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic");
+ //Your Access Key from your account.
+ //请填写你的账户的AK
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
+ //Your Secret Key from your account.
+ //请填写你的账户的SK
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
+ //This is the endpoint from ONS console
+ //请填写阿里云ONS控制台上对应实例的接入点
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
+ "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
+ Producer *producer = nullptr;
+ producer = ONSFactory::getInstance()->createProducer(factoryInfo);
+ producer->start();
+ Message msg(
+ factoryInfo.getPublishTopics(),
+ "Your Tag",
+ "Your Key",
+ "This message body."
+ );
+
+ auto start = std::chrono::system_clock::now();
+ int count = 32;
+ for (int i = 0; i < count; ++i) {
+ producer->sendOneway(msg);
+ }
+ auto interval = std::chrono::system_clock::now() - start;
+ std::cout << "Send " << count << " messages OK, costs "
+ << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
+
+ producer->shutdown();
+ std::cout << "=======After sending messages=======" << std::endl;
+ return 0;
+}
\ No newline at end of file
diff --git a/src/main/cpp/demos/TransactionProducerDemo.cpp b/src/main/cpp/demos/TransactionProducerDemo.cpp
new file mode 100644
index 0000000..6044589
--- /dev/null
+++ b/src/main/cpp/demos/TransactionProducerDemo.cpp
@@ -0,0 +1,75 @@
+#include <iostream>
+#include <chrono>
+#include "ONSFactory.h"
+#include"LocalTransactionChecker.h"
+#include "LocalTransactionExecuter.h"
+
+
+using namespace std;
+using namespace ons;
+
+class LocalTransactionCheckerImpl : public LocalTransactionChecker {
+ virtual TransactionStatus check(Message &msg) {
+ cout << "checker::commit transaction" << endl;
+ return CommitTransaction;
+ }
+};
+
+class LocalTransactionExecuterImpl : public LocalTransactionExecuter {
+ virtual TransactionStatus execute(Message &msg) {
+ cout << "executer::commit transaction of msgid: " << msg.getMsgID() << endl;
+ return CommitTransaction;
+ }
+};
+
+
+int main() {
+ std::cout << "=======Before sending message=======" << std::endl;
+ ONSFactoryProperty factoryInfo;
+ //Request from ONS console, this should be GroupID here.
+ //请填写在阿里云ONS控制台上申请的GroupID,从实例化的版本开始,ProducerId和CounsumerId已经统一,此处设置是为了接口保持向前兼容。
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "GID_XXX");
+ //Request from ONS console
+ //请填写阿里云ONS控制台上申请的topic
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "Your topic");
+ //Your Access Key from your account.
+ //请填写你的账户的AK
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
+ //Your Secret Key from your account.
+ //请填写你的账户的SK
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
+ //This is the endpoint from ONS console
+ //请填写阿里云ONS控制台上对应实例的接入点
+ factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
+ "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
+ TransactionProducer *producer = nullptr;
+ LocalTransactionCheckerImpl *checker = new LocalTransactionCheckerImpl();
+ producer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo, checker);
+ producer->start();
+ Message msg(
+ factoryInfo.getPublishTopics(),
+ "Your Tag",
+ "Your Key",
+ "This message body."
+ );
+ auto start = std::chrono::system_clock::now();
+ int count = 32;
+ LocalTransactionExecuterImpl executer;
+ for (int i = 0; i < count; ++i) {
+ try {
+ SendResultONS sendResult = producer->send(msg, &executer);
+ std::cout << "Message ID: " << sendResult.getMessageId() << std::endl;
+ }
+ catch (ONSClientException &e) {
+ cout << e.GetMsg() << endl;
+ }
+ cout << endl;
+ }
+ auto interval = std::chrono::system_clock::now() - start;
+ std::cout << "Send " << count << " messages OK, costs "
+ << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
+
+// producer->shutdown();
+ std::cout << "=======After sending message=======" << std::endl;
+ return 0;
+}
\ No newline at end of file
diff --git a/src/main/cpp/include/Action.h b/src/main/cpp/include/Action.h
new file mode 100644
index 0000000..289f440
--- /dev/null
+++ b/src/main/cpp/include/Action.h
@@ -0,0 +1,14 @@
+#ifndef __ACTION_H__
+#define __ACTION_H__
+
+#include "ONSClient.h"
+
+// consuming result
+enum Action {
+ // consume success, application could continue to consume next message
+ CommitMessage,
+ // consume fail, server will deliver this message later, application could
+ // continue to consume next message
+ ReconsumeLater
+};
+#endif
diff --git a/src/main/cpp/include/ConsumeContext.h b/src/main/cpp/include/ConsumeContext.h
new file mode 100644
index 0000000..abb59de
--- /dev/null
+++ b/src/main/cpp/include/ConsumeContext.h
@@ -0,0 +1,13 @@
+#ifndef __CONSUMECONTEXT_H__
+#define __CONSUMECONTEXT_H__
+#include "ONSClient.h"
+
+namespace ons {
+
+class ONSCLIENT_API ConsumeContext {
+ public:
+ ConsumeContext() {}
+ virtual ~ConsumeContext() {}
+};
+}
+#endif
diff --git a/src/main/cpp/include/ConsumeOrderContext.h b/src/main/cpp/include/ConsumeOrderContext.h
new file mode 100644
index 0000000..a196d5d
--- /dev/null
+++ b/src/main/cpp/include/ConsumeOrderContext.h
@@ -0,0 +1,13 @@
+#ifndef __CONSUMEORDERLYCONTEXT_H__
+#define __CONSUMEORDERLYCONTEXT_H__
+#include "ONSClient.h"
+
+namespace ons {
+
+class ONSCLIENT_API ConsumeOrderContext {
+ public:
+ ConsumeOrderContext() {}
+ virtual ~ConsumeOrderContext() {}
+};
+}
+#endif
diff --git a/src/main/cpp/include/LocalTransactionChecker.h b/src/main/cpp/include/LocalTransactionChecker.h
new file mode 100644
index 0000000..8601671
--- /dev/null
+++ b/src/main/cpp/include/LocalTransactionChecker.h
@@ -0,0 +1,16 @@
+#ifndef __LOCALTRANSACTIONCHECKER_H__
+#define __LOCALTRANSACTIONCHECKER_H__
+
+#include "Message.h"
+#include "TransactionStatus.h"
+
+namespace ons {
+class LocalTransactionChecker {
+ public:
+ LocalTransactionChecker() {}
+ virtual TransactionStatus check(Message& msg) = 0;
+ virtual ~LocalTransactionChecker() {}
+};
+}
+
+#endif
diff --git a/src/main/cpp/include/LocalTransactionExecuter.h b/src/main/cpp/include/LocalTransactionExecuter.h
new file mode 100644
index 0000000..1d7ea4f
--- /dev/null
+++ b/src/main/cpp/include/LocalTransactionExecuter.h
@@ -0,0 +1,16 @@
+#ifndef __LOCALTRANSACTIONEXECUTER_H__
+#define __LOCALTRANSACTIONEXECUTER_H__
+
+#include "Message.h"
+#include "TransactionStatus.h"
+
+namespace ons {
+class LocalTransactionExecuter {
+ public:
+ LocalTransactionExecuter() {}
+ virtual TransactionStatus execute(Message& msg) = 0;
+ virtual ~LocalTransactionExecuter() {}
+};
+}
+
+#endif
diff --git a/src/main/cpp/include/Message.h b/src/main/cpp/include/Message.h
new file mode 100755
index 0000000..8038e29
--- /dev/null
+++ b/src/main/cpp/include/Message.h
@@ -0,0 +1,100 @@
+#ifndef __MESSAGE_H__
+#define __MESSAGE_H__
+
+#include <map>
+#include <sstream>
+#include <vector>
+#include "ONSClient.h"
+
+namespace ons {
+
+class SystemPropKey {
+ public:
+ SystemPropKey() {}
+ ~SystemPropKey() {}
+ static const char* TAG;
+ static const char* KEY;
+ static const char* MSGID;
+ static const char* RECONSUMETIMES;
+ static const char* STARTDELIVERTIME;
+};
+
+class ONSCLIENT_API Message {
+ public:
+ Message();
+ Message(const std::string& topic, const std::string& tags, const std::string& byte_body);
+ Message(const char* topic, const char* tags, const char* byte_body);
+ Message(const char* topic, size_t topic_size, const char* tags, size_t tags_size, const char* body, size_t body_size);
+ Message(const char* topic, const char* tags, const char* keys, const char* body);
+
+ virtual ~Message();
+
+ Message(const Message& other);
+ Message& operator=(const Message& other);
+
+ // userProperties was used to save user specific parameters which doesn't
+ // belong to SystemPropKey
+ void putUserProperties(const char* key, const char* value);
+ const char* getUserProperties(const char* key) const;
+ void setUserProperties(std::map<std::string, std::string>& userProperty);
+ std::map<std::string, std::string> getUserProperties() const;
+
+ // systemProperties only save parameters defined in SystemPropKey, please do
+ // not add other parameters into systemProperties, else it was not saved.
+ void putSystemProperties(const char* key, const char* value);
+ const char* getSystemProperties(const char* key) const;
+ void setSystemProperties(std::map<std::string, std::string>& systemProperty);
+ std::map<std::string, std::string> getSystemProperties() const;
+
+ const char* getTopic() const;
+ void setTopic(const char* topic);
+
+ const char* getTag() const;
+ void setTag(const char* tags);
+
+ const char* getKey() const;
+ void setKey(const char* keys);
+
+ const char* getMsgID() const;
+ void setMsgID(const char* msgId);
+
+ const long long getStartDeliverTime() const;
+ void setStartDeliverTime(long long level);
+
+ const char* getBody() const;
+ const char* getByteBody(int *len) const;
+ const std::string getMsgBody() const;
+ const size_t getBodySize() const;
+ void setMsgBody(const std::string msgbody);
+ void setBody(unsigned char* byte_msgbody, int len);
+
+ const int getReconsumeTimes() const;
+ void setReconsumeTimes(int reconsumeTimes);
+
+ long long getStoreTimestamp() const;
+ void setStoreTimestamp(long long storeTimestamp);
+
+ const std::string toString() const;
+
+ const std::string toSystemString() const;
+
+ const std::string toUserString() const;
+
+ long long getQueueOffset() const;
+ void setQueueOffset(long long queueOffset);
+ protected:
+ void Init(const std::string& topic, const std::string& tags,
+ const std::string& keys, const std::string& body);
+
+ private:
+ std::string topic;
+ std::string body;
+ size_t body_size;
+ long long m_storeTimestamp;
+ long long m_queueOffset;
+ std::map<std::string, std::string> systemProperties;
+ std::map<std::string, std::string> userProperties;
+};
+
+} //<!end namespace;
+#endif
diff --git a/src/main/cpp/include/MessageListener.h b/src/main/cpp/include/MessageListener.h
new file mode 100644
index 0000000..5733d2f
--- /dev/null
+++ b/src/main/cpp/include/MessageListener.h
@@ -0,0 +1,19 @@
+#ifndef __MESSAGELISTENER_H__
+#define __MESSAGELISTENER_H__
+
+#include "Action.h"
+#include "ConsumeContext.h"
+#include "Message.h"
+
+namespace ons {
+
+class ONSCLIENT_API MessageListener {
+ public:
+ MessageListener() {}
+ virtual ~MessageListener() {}
+
+ // interface of consuming message, should be realized by application
+ virtual Action consume(Message& message, ConsumeContext& context) = 0;
+};
+}
+#endif
diff --git a/src/main/cpp/include/MessageOrderListener.h b/src/main/cpp/include/MessageOrderListener.h
new file mode 100644
index 0000000..848dd20
--- /dev/null
+++ b/src/main/cpp/include/MessageOrderListener.h
@@ -0,0 +1,20 @@
+#ifndef __MESSAGEORDERLYLISTENER_H__
+#define __MESSAGEORDERLYLISTENER_H__
+
+#include "ConsumeOrderContext.h"
+#include "Message.h"
+#include "OrderAction.h"
+
+namespace ons {
+
+class MessageOrderListener {
+ public:
+ MessageOrderListener() {}
+ virtual ~MessageOrderListener() {}
+
+ // interface of consuming message, should be realized by application
+ virtual OrderAction consume(Message& message,
+ ConsumeOrderContext& context) = 0;
+};
+}
+#endif
diff --git a/src/main/cpp/include/MessageQueueONS.h b/src/main/cpp/include/MessageQueueONS.h
new file mode 100755
index 0000000..bd8a8dc
--- /dev/null
+++ b/src/main/cpp/include/MessageQueueONS.h
@@ -0,0 +1,39 @@
+#ifndef __MESSAGEQUEUEONS_H__
+#define __MESSAGEQUEUEONS_H__
+
+#include <iomanip>
+#include <sstream>
+#include <string>
+
+using namespace std;
+
+namespace ons {
+class MessageQueueONS {
+ public:
+ MessageQueueONS();
+ MessageQueueONS(const string& topic, const string& brokerName, int queueId);
+
+ MessageQueueONS(const MessageQueueONS& other);
+ MessageQueueONS& operator=(const MessageQueueONS& other);
+
+ string getTopic() const;
+ void setTopic(const string& topic);
+
+ string getBrokerName() const;
+ void setBrokerName(const string& brokerName);
+
+ int getQueueId() const;
+ void setQueueId(int queueId);
+
+ bool operator==(const MessageQueueONS& mq) const;
+ bool operator<(const MessageQueueONS& mq) const;
+ int compareTo(const MessageQueueONS& mq) const;
+
+ private:
+ string m_topic;
+ string m_brokerName;
+ int m_queueId;
+};
+
+} //<!end namespace;
+#endif
diff --git a/src/main/cpp/include/MessageQueueSelectorONS.h b/src/main/cpp/include/MessageQueueSelectorONS.h
new file mode 100644
index 0000000..a82a285
--- /dev/null
+++ b/src/main/cpp/include/MessageQueueSelectorONS.h
@@ -0,0 +1,17 @@
+#ifndef _MESSAGEQUEUESELECTOR_H_
+#define _MESSAGEQUEUESELECTOR_H_
+
+#include "Message.h"
+#include "MessageQueueONS.h"
+
+namespace ons {
+
+class MessageQueueSelectorONS {
+ public:
+ virtual ~MessageQueueSelectorONS() {}
+ virtual MessageQueueONS select(const vector<MessageQueueONS>& mqs,
+ const Message& msg, void* arg) = 0;
+};
+
+} //<!end namespace;
+#endif //<! _MQSELECTOR_H_
diff --git a/src/main/cpp/include/ONSCallback.h b/src/main/cpp/include/ONSCallback.h
new file mode 100644
index 0000000..d529426
--- /dev/null
+++ b/src/main/cpp/include/ONSCallback.h
@@ -0,0 +1,16 @@
+#ifndef __ONSCALLBACK_H__
+#define __ONSCALLBACK_H__
+
+#include "ONSClientException.h"
+#include "SendResultONS.h"
+
+namespace ons {
+ class SendCallbackONS {
+ public:
+ virtual ~SendCallbackONS() {}
+ virtual void onSuccess(SendResultONS& sendResult) {};
+ virtual void onException(ONSClientException& e) {};
+ };
+
+} // end of namespace SendResultONS
+#endif // end of _SENDCALLBACK_H_
diff --git a/src/main/cpp/include/ONSChannel.h b/src/main/cpp/include/ONSChannel.h
new file mode 100644
index 0000000..6f538af
--- /dev/null
+++ b/src/main/cpp/include/ONSChannel.h
@@ -0,0 +1,15 @@
+#ifndef __ONSCHANNEL_H__
+#define __ONSCHANNEL_H__
+
+namespace ons {
+
+ enum ONSChannel {
+ CLOUD,
+ ALIYUN,
+ ALL,
+ LOCAL,
+ INNER
+ };
+}
+
+#endif
diff --git a/src/main/cpp/include/ONSClient.h b/src/main/cpp/include/ONSClient.h
new file mode 100644
index 0000000..4ebc595
--- /dev/null
+++ b/src/main/cpp/include/ONSClient.h
@@ -0,0 +1,20 @@
+#ifndef __ONSCLIENT_H__
+#define __ONSCLIENT_H__
+
+#ifdef WIN32
+#ifdef ONSCLIENT_EXPORTS
+
+#ifndef SWIG
+#define ONSCLIENT_API __declspec(dllexport)
+#else
+#define ONSCLIENT_API
+#endif
+
+#else
+#define ONSCLIENT_API __declspec(dllimport)
+#endif
+#else
+#define ONSCLIENT_API
+#endif
+
+#endif
diff --git a/src/main/cpp/include/ONSClientException.h b/src/main/cpp/include/ONSClientException.h
new file mode 100644
index 0000000..76922f3
--- /dev/null
+++ b/src/main/cpp/include/ONSClientException.h
@@ -0,0 +1,25 @@
+#ifndef __ONSCLIENTEXCEPTION_H__
+#define __ONSCLIENTEXCEPTION_H__
+
+#include <exception>
+#include <string>
+#include "ONSClient.h"
+
+namespace ons {
+
+class ONSCLIENT_API ONSClientException : public std::exception {
+ public:
+ ONSClientException() throw();
+ virtual ~ONSClientException() throw();
+ ONSClientException(std::string msg, int error) throw();
+ const char* GetMsg() const throw();
+ const char* what() const throw();
+ int GetError() const throw();
+
+ private:
+ std::string m_msg;
+ int m_error;
+};
+}
+
+#endif
diff --git a/src/main/cpp/include/ONSFactory.h b/src/main/cpp/include/ONSFactory.h
new file mode 100755
index 0000000..ecb531a
--- /dev/null
+++ b/src/main/cpp/include/ONSFactory.h
@@ -0,0 +1,103 @@
+#ifndef __ONSFACTORY_H_
+#define __ONSFACTORY_H_
+
+#include "LocalTransactionChecker.h"
+#include "ONSChannel.h"
+#include "ONSClientException.h"
+#include "OrderConsumer.h"
+#include "OrderProducer.h"
+#include "Producer.h"
+#include "PullConsumer.h"
+#include "PushConsumer.h"
+#include "TransactionProducer.h"
+
+namespace ons {
+class ONSCLIENT_API ONSFactoryProperty {
+ public:
+ ONSFactoryProperty();
+ virtual ~ONSFactoryProperty();
+ bool checkValidityOfFactoryProperties(const std::string& key,
+ const std::string& value) throw(ons::ONSClientException);
+ const char* getLogPath() const;
+ void setSendMsgTimeout(const int value);
+ void setSendMsgRetryTimes(const int value);
+ void setMaxMsgCacheSize(const int size);
+ void setOnsTraceSwitch(bool onswitch);
+ void setOnsChannel(ONSChannel onsChannel) throw(ons::ONSClientException);
+ void setFactoryProperty(const char* key, const char* value) throw(ons::ONSClientException);
+ void setFactoryProperties(std::map<std::string, std::string> factoryProperties);
+ std::map<std::string, std::string> getFactoryProperties() const;
+ const char* getProducerId() const;
+ const char* getConsumerId() const;
+ const char* getGroupId() const;
+ const char* getPublishTopics() const;
+ const char* getMessageModel() const;
+ const int getSendMsgTimeout() const;
+ const int getSendMsgRetryTimes() const;
+ const int getConsumeThreadNums() const;
+ const int getMaxMsgCacheSize() const;
+ const ONSChannel getOnsChannel() const;
+ const char* getChannel() const;
+ const char* getMessageContent() const;
+ const char* getNameSrvAddr() const;
+ const char* getNameSrvDomain() const;
+ const char* getAccessKey() const;
+ const char* getSecretKey() const;
+ const char* getConsumerInstanceName() const;
+ bool getOnsTraceSwitch() const;
+ const char* getInstanceId() const;
+
+ public:
+ static const char* LogPath;
+ static const char* ProducerId;
+ static const char* ConsumerId;
+ static const char* GroupId;
+ static const char* PublishTopics;
+ static const char* MsgContent;
+ static const char* ONSAddr;
+ static const char* AccessKey;
+ static const char* SecretKey;
+ static const char* MessageModel;
+ static const char* BROADCASTING;
+ static const char* CLUSTERING;
+ static const char* SendMsgTimeoutMillis;
+ static const char* NAMESRV_ADDR;
+ static const char* ConsumeThreadNums;
+ static const char* OnsChannel;
+ static const char* MaxMsgCacheSize;
+ static const char* OnsTraceSwitch;
+ static const char* SendMsgRetryTimes;
+ static const char* ConsumerInstanceName;
+ static const char* InstanceId;
+
+ private:
+ std::map<std::string, std::string> m_onsFactoryProperties;
+};
+
+class ONSCLIENT_API ONSFactoryAPI {
+ public:
+ ONSFactoryAPI();
+ virtual ~ONSFactoryAPI();
+
+ virtual ons::Producer* createProducer(ons::ONSFactoryProperty factoryProperty) throw(ons::ONSClientException);
+ virtual ons::OrderProducer* createOrderProducer(ons::ONSFactoryProperty factoryProperty) throw(
+ ons::ONSClientException);
+ virtual ons::OrderConsumer* createOrderConsumer(ons::ONSFactoryProperty factoryProperty) throw(
+ ons::ONSClientException);
+ virtual ons::TransactionProducer* createTransactionProducer(
+ ons::ONSFactoryProperty factoryProperty, ons::LocalTransactionChecker* checker) throw(ons::ONSClientException);
+ virtual ons::PullConsumer* createPullConsumer(ons::ONSFactoryProperty factoryProperty) throw(ons::ONSClientException);
+ virtual ons::PushConsumer* createPushConsumer(ons::ONSFactoryProperty factoryProperty) throw(ons::ONSClientException);
+};
+
+class ONSCLIENT_API ONSFactory {
+ public:
+ virtual ~ONSFactory();
+ static ons::ONSFactoryAPI* getInstance();
+
+ private:
+ ONSFactory();
+ static ons::ONSFactoryAPI* onsFactoryInstance;
+};
+} // namespace ons
+#endif
diff --git a/src/main/cpp/include/OrderAction.h b/src/main/cpp/include/OrderAction.h
new file mode 100644
index 0000000..297d285
--- /dev/null
+++ b/src/main/cpp/include/OrderAction.h
@@ -0,0 +1,11 @@
+#ifndef __ORDERACTION_H__
+#define __ORDERACTION_H__
+
+// order consuming result
+enum OrderAction {
+ // consume success, application could continue to consume next message
+ Success,
+ // consume fail, suspends the current queue
+ Suspend,
+};
+#endif
diff --git a/src/main/cpp/include/OrderConsumer.h b/src/main/cpp/include/OrderConsumer.h
new file mode 100644
index 0000000..cce53b6
--- /dev/null
+++ b/src/main/cpp/include/OrderConsumer.h
@@ -0,0 +1,19 @@
+#ifndef __ORDERCONSUMER_H__
+#define __ORDERCONSUMER_H__
+
+#include "MessageOrderListener.h"
+
+namespace ons {
+
+class ONSCLIENT_API OrderConsumer {
+ public:
+ OrderConsumer() {}
+ virtual ~OrderConsumer() {}
+
+ virtual void start() = 0;
+ virtual void shutdown() = 0;
+ virtual void subscribe(const char* topic, const char* subExpression,
+ MessageOrderListener* listener) = 0;
+};
+}
+#endif
diff --git a/src/main/cpp/include/OrderProducer.h b/src/main/cpp/include/OrderProducer.h
new file mode 100644
index 0000000..8924996
--- /dev/null
+++ b/src/main/cpp/include/OrderProducer.h
@@ -0,0 +1,23 @@
+#ifndef __ORDERLYPRODUCER_H__
+#define __ORDERLYPRODUCER_H__
+
+#include "Message.h"
+#include "SendResultONS.h"
+
+namespace ons {
+
+class ONSCLIENT_API OrderProducer {
+ public:
+ OrderProducer() {}
+ virtual ~OrderProducer() {}
+
+ // before send msg, start must be called to allocate resources.
+ virtual void start() = 0;
+ // before exit ons, shutdown must be called to release all resources allocated
+ // by ons internally.
+ virtual void shutdown() = 0;
+
+ virtual SendResultONS send(Message& msg, std::string shardingKey) = 0;
+};
+}
+#endif
diff --git a/src/main/cpp/include/Producer.h b/src/main/cpp/include/Producer.h
new file mode 100644
index 0000000..2daf759
--- /dev/null
+++ b/src/main/cpp/include/Producer.h
@@ -0,0 +1,35 @@
+#ifndef __PRODUCER_H__
+#define __PRODUCER_H__
+
+#include "Message.h"
+#include "SendResultONS.h"
+#include "ONSClientException.h"
+#include "MessageQueueONS.h"
+#include"ONSCallback.h"
+
+namespace ons {
+
+class ONSCLIENT_API Producer {
+ public:
+ Producer() {}
+ virtual ~Producer() {}
+
+ // before send msg, start must be called to allocate resources.
+ virtual void start() = 0;
+ // before exit ons, shutdown must be called to release all resources allocated
+ // by ons internally.
+ virtual void shutdown() = 0;
+ // retry max 3 times if send failed. if no exception throwed, it sends
+ // success;
+ virtual ons::SendResultONS send(Message& msg) throw(ons::ONSClientException) = 0;
+ virtual ons::SendResultONS send(Message& msg,
+ const MessageQueueONS& mq) throw(ons::ONSClientException) = 0;
+
+ // async send
+ virtual void sendAsync(Message& msg, ons::SendCallbackONS* callback) throw(ons::ONSClientException) = 0;
+
+ // oneway send
+ virtual void sendOneway(Message& msg) throw(ons::ONSClientException) = 0;
+};
+}
+#endif
diff --git a/src/main/cpp/include/PullConsumer.h b/src/main/cpp/include/PullConsumer.h
new file mode 100755
index 0000000..5f94e5a
--- /dev/null
+++ b/src/main/cpp/include/PullConsumer.h
@@ -0,0 +1,38 @@
+#ifndef __PULLCONSUMER_H__
+#define __PULLCONSUMER_H__
+
+#include <string>
+#include <vector>
+#include "MessageQueueONS.h"
+#include "PullResultONS.h"
+
+namespace ons {
+
+class ONSFactoryProperty;
+
+class ONSCLIENT_API PullConsumer {
+ public:
+ PullConsumer() {}
+ virtual ~PullConsumer() {}
+
+ virtual void start() = 0;
+ virtual void shutdown() = 0;
+ virtual void fetchSubscribeMessageQueues(
+ const std::string& topic, std::vector<MessageQueueONS>& mqs) = 0;
+ virtual PullResultONS pull(const MessageQueueONS& mq,
+ const std::string& subExpression, long long offset,
+ int maxNums) = 0;
+ virtual long long searchOffset(const MessageQueueONS& mq, long long timestamp) = 0;
+ virtual long long maxOffset(const MessageQueueONS& mq) = 0;
+ virtual long long minOffset(const MessageQueueONS& mq) = 0;
+ virtual void updateConsumeOffset(const MessageQueueONS& mq,
+ long long offset) = 0;
+ virtual void removeConsumeOffset(const MessageQueueONS& mq) = 0;
+ virtual long long fetchConsumeOffset(const MessageQueueONS& mq,
+ bool fromStore) = 0;
+ virtual void persistConsumerOffset4PullConsumer(const MessageQueueONS& mq)
+ throw(ons::ONSClientException) = 0;
+};
+
+}
+#endif
diff --git a/src/main/cpp/include/PullResultONS.h b/src/main/cpp/include/PullResultONS.h
new file mode 100644
index 0000000..2d939ea
--- /dev/null
+++ b/src/main/cpp/include/PullResultONS.h
@@ -0,0 +1,40 @@
+#ifndef __PULLRESULTONS_H__
+#define __PULLRESULTONS_H__
+
+#include <vector>
+#include "Message.h"
+#include "ONSClient.h"
+
+namespace ons {
+
+enum ONSPullStatus {
+ ONS_FOUND,
+ ONS_NO_NEW_MSG,
+ ONS_NO_MATCHED_MSG,
+ ONS_OFFSET_ILLEGAL,
+ ONS_BROKER_TIMEOUT // indicate pull request timeout or received NULL response
+};
+
+class ONSCLIENT_API PullResultONS {
+ public:
+ PullResultONS(ONSPullStatus status)
+ : pullStatus(status), nextBeginOffset(0), minOffset(0), maxOffset(0) {}
+
+ PullResultONS(ONSPullStatus pullStatus, long long nextBeginOffset,
+ long long minOffset, long long maxOffset)
+ : pullStatus(pullStatus),
+ nextBeginOffset(nextBeginOffset),
+ minOffset(minOffset),
+ maxOffset(maxOffset) {}
+
+ virtual ~PullResultONS() {}
+
+ public:
+ ONSPullStatus pullStatus;
+ long long nextBeginOffset;
+ long long minOffset;
+ long long maxOffset;
+ std::vector<Message> msgFoundList;
+};
+}
+#endif
diff --git a/src/main/cpp/include/PushConsumer.h b/src/main/cpp/include/PushConsumer.h
new file mode 100644
index 0000000..925858c
--- /dev/null
+++ b/src/main/cpp/include/PushConsumer.h
@@ -0,0 +1,20 @@
+#ifndef __PUSHCONSUMER_H__
+#define __PUSHCONSUMER_H__
+
+#include "MessageListener.h"
+
+namespace ons {
+
+class ONSCLIENT_API PushConsumer {
+ public:
+ PushConsumer() {}
+ virtual ~PushConsumer() {}
+
+ virtual void start() = 0;
+ virtual void shutdown() = 0;
+ virtual void subscribe(const char* topic, const char* subExpression,
+ MessageListener* listener) = 0;
+ // virtual void setNamesrvAddr(const std::string& nameSrvAddr) = 0;
+};
+}
+#endif
diff --git a/src/main/cpp/include/SendResultONS.h b/src/main/cpp/include/SendResultONS.h
new file mode 100644
index 0000000..d21ec12
--- /dev/null
+++ b/src/main/cpp/include/SendResultONS.h
@@ -0,0 +1,19 @@
+#ifndef __SENDRESULTONS_H__
+#define __SENDRESULTONS_H__
+#include <string>
+#include "ONSClient.h"
+
+namespace ons {
+
+class ONSCLIENT_API SendResultONS {
+ public:
+ SendResultONS();
+ virtual ~SendResultONS();
+ void setMessageId(const std::string& msgId);
+ const char* getMessageId() const;
+
+ private:
+ std::string messageId;
+};
+}
+#endif
diff --git a/src/main/cpp/include/TransactionProducer.h b/src/main/cpp/include/TransactionProducer.h
new file mode 100644
index 0000000..bceadb0
--- /dev/null
+++ b/src/main/cpp/include/TransactionProducer.h
@@ -0,0 +1,29 @@
+#ifndef __TRANSACTIONPRODUCER_H__
+#define __TRANSACTIONPRODUCER_H__
+
+#include "LocalTransactionExecuter.h"
+#include "ONSClient.h"
+#include "SendResultONS.h"
+
+namespace ons {
+ class ONSCLIENT_API TransactionProducer {
+ public:
+ TransactionProducer() {}
+
+ virtual ~TransactionProducer() {}
+
+ // before send msg, start must be called to allocate resources.
+ virtual void start() = 0;
+
+ // before exit ons, shutdown must be called to release all resources allocated
+ // by ons internally.
+ virtual void shutdown() = 0;
+
+ // retry max 3 times if send failed. if no exception throwed, it sends
+ // success;
+ virtual SendResultONS send(Message &msg,
+ LocalTransactionExecuter *executer) = 0;
+ };
+}
+
+#endif
diff --git a/src/main/cpp/include/TransactionStatus.h b/src/main/cpp/include/TransactionStatus.h
new file mode 100644
index 0000000..c47cd99
--- /dev/null
+++ b/src/main/cpp/include/TransactionStatus.h
@@ -0,0 +1,13 @@
+#ifndef __TRANSACTIONSTATUS_H__
+#define __TRANSACTIONSTATUS_H__
+
+namespace ons {
+
+enum TransactionStatus {
+ CommitTransaction = 0,
+ RollbackTransaction = 1,
+ Unknow = 2,
+};
+}
+
+#endif
diff --git a/src/test/cpp/MessageTest.cpp b/src/test/cpp/MessageTest.cpp
new file mode 100644
index 0000000..8b54067
--- /dev/null
+++ b/src/test/cpp/MessageTest.cpp
@@ -0,0 +1,237 @@
+#include <iostream>
+#include <chrono>
+#include <gtest/gtest.h>
+#include <memory>
+
+#include "Message.h"
+#include "ONSFactory.h"
+
+class MessageTest : public testing::Test {
+protected:
+ void SetUp() override {
+ ubody_ = new unsigned char[10];
+ memset(ubody_, 0, 10);
+ strcpy(reinterpret_cast<char *>(ubody_), "RocketMQ");
+ sbody = std::string("RocketMQ");
+ }
+
+ void TearDown() override {
+ delete ubody_;
+ }
+
+ const char *topic_ = "Topic";
+ const char *tag_ = "Tag";
+ const char *key_ = "Key";
+ const char *body_ = "RocketMQ";
+ const char *user_key_ = "UserKey";
+ const char *user_value_ = "UserValue";
+ const char *sys_key_ = "SysKey";
+ const char *sys_value_ = "SysValue";
+ unsigned char *ubody_;
+ int bodylen = strlen("RocketMQ");
+ std::string sbody;
+};
+
+TEST_F(MessageTest, testMessage_TopicBeingNULL) {
+ EXPECT_THROW(
+ ons::Message msg(NULL, tag_, key_, body_);,
+ ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessageWithoutKey_TopicBeingNULL) {
+ EXPECT_THROW(
+ ons::Message msg(NULL, tag_, body_);,
+ ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessage_TopicLenBeingZero) {
+ EXPECT_THROW(
+ ons::Message msg(topic_, 0, tag_, strlen(tag_), body_, strlen(body_));,
+ ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessage_BodyBeingNULL) {
+ EXPECT_THROW(
+ ons::Message msg(topic_, tag_, key_, NULL);,
+ ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessageWithoutKey_BodyBeingNULL) {
+ EXPECT_THROW(
+ ons::Message msg(topic_, tag_, NULL);,
+ ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessage_BodyLenBeingZero) {
+ EXPECT_THROW(
+ ons::Message msg(topic_, strlen(topic_), tag_, strlen(tag_), body_, 0);,
+ ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessageSysPerperty_KeyBeingNULL) {
+ ons::Message msg(topic_, tag_, key_, body_);
+ EXPECT_THROW(
+ msg.putSystemProperties(NULL, sys_value_);,
+ ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessageSysPerperty_ValueBeingNULL) {
+ ons::Message msg(topic_, tag_, key_, body_);
+ EXPECT_THROW(
+ msg.putSystemProperties(sys_key_, NULL);,
+ ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessageUserPerperty_KeyBeingNULL) {
+ ons::Message msg(topic_, tag_, key_, body_);
+ EXPECT_THROW(
+ msg.putSystemProperties(NULL, user_value_);,
+ ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessageUserPerperty_ValueBeingNULL) {
+ ons::Message msg(topic_, tag_, key_, body_);
+ EXPECT_THROW(
+ msg.putSystemProperties(user_key_, NULL);,
+ ons::ONSClientException);
+}
+
+TEST_F(MessageTest, testMessageGetTopic_ValueBeingNormal) {
+ ons::Message msg(topic_, tag_, key_, body_);
+ ASSERT_STREQ(topic_, msg.getTopic());
+}
+
+TEST_F(MessageTest, testMessageetTopic_ValueBeingNormal) {
+ ons::Message msg("", tag_, key_, body_);
+ msg.setTopic(topic_);
+ ASSERT_STREQ(topic_, msg.getTopic());
+}
+
+TEST_F(MessageTest, testMessageGetTopic_ValueBeingNull) {
+ ons::Message msg("", tag_, key_, body_);
+ ASSERT_STREQ("", msg.getTopic());
+}
+
+TEST_F(MessageTest, testMessageSetTopic_ValueBeingNull) {
+ ons::Message msg("", tag_, key_, body_);
+ msg.setTopic(NULL);
+ ASSERT_STREQ("", msg.getTopic());
+}
+
+TEST_F(MessageTest, testMessageGetTag_ValueBeingNormal) {
+ ons::Message msg(topic_, tag_, key_, body_);
+ ASSERT_STREQ(tag_, msg.getTag());
+}
+
+TEST_F(MessageTest, testMessageSetTag_ValueBeingNormal) {
+ ons::Message msg(topic_, "", key_, body_);
+ msg.setTag(tag_);
+ ASSERT_STREQ(tag_, msg.getTag());
+}
+
+TEST_F(MessageTest, testMessageGetTag_ValueBeingNull) {
+ ons::Message msg(topic_, "", key_, body_);
+ ASSERT_STREQ("", msg.getTag());
+}
+
+TEST_F(MessageTest, testMessageSetTag_ValueBeingNull) {
+ ons::Message msg(topic_, "", key_, body_);
+ msg.setTag(NULL);
+ ASSERT_STREQ("", msg.getTag());
+}
+
+TEST_F(MessageTest, testMessageGetBody_ValueBeingNormal) {
+ ons::Message msg(topic_, tag_, key_, body_);
+ ASSERT_STREQ(body_, msg.getBody());
+}
+
+TEST_F(MessageTest, testMessageSetBody_ValueBeingNormal) {
+ ons::Message msg(topic_, tag_, key_, "");
+ msg.setBody(ubody_, bodylen);
+ ASSERT_STREQ(body_, msg.getBody());
+}
+
+TEST_F(MessageTest, testMessageGetByteBody_ValueBeingNormal) {
+ int len = 0;
+ ons::Message msg(topic_, tag_, key_, body_);
+ const char *strBody = msg.getByteBody(&len);
+ ASSERT_STREQ(body_, strBody);
+ ASSERT_EQ(strlen(body_), len);
+}
+
+TEST_F(MessageTest, testMessageSetBodyWithLen_ValueBeingNormal) {
+ int len = 0;
+ ons::Message msg(topic_, tag_, key_, "");
+ msg.setBody(ubody_, bodylen);
+ const char *strBody = msg.getByteBody(&len);
+ ASSERT_STREQ(body_, strBody);
+ ASSERT_EQ(bodylen, len);
+}
+
+TEST_F(MessageTest, testMessageGetBody_ValueBeingNull) {
+ ons::Message msg(topic_, tag_, key_, "");
+ ASSERT_STREQ("", msg.getBody());
+}
+
+TEST_F(MessageTest, testMessageSetBody_ValueBeingNull) {
+ ons::Message msg(topic_, tag_, key_, "");
+ msg.setBody(NULL, strlen(body_));
+ ASSERT_STREQ("", msg.getBody());
+}
+
+TEST_F(MessageTest, testMessageGetMsgBody_ValueBeingString) {
+ ons::Message msg(topic_, tag_, key_, body_);
+ ASSERT_STREQ(sbody.c_str(), msg.getMsgBody().c_str());
+}
+
+TEST_F(MessageTest, testMessageGetMsgBodyLen_ValueBeingString) {
+ ons::Message msg(topic_, tag_, key_, body_);
+ ASSERT_EQ(sbody.length(), msg.getBodySize());
+}
+
+TEST_F(MessageTest, testMessageSetMsgBody_ValueBeingString) {
+ ons::Message msg(topic_, tag_, key_, "");
+ msg.setMsgBody(sbody);
+ ASSERT_STREQ(sbody.c_str(), msg.getMsgBody().c_str());
+}
+
+TEST_F(MessageTest, testMessageGetUserPerperty_KeyBeingNull) {
+ ons::Message msg(topic_, tag_, key_, "");
+ ASSERT_TRUE(NULL == msg.getUserProperties(NULL));
+}
+
+TEST_F(MessageTest, testMessageGetUserPerperty_KeyNotExsit) {
+ ons::Message msg(topic_, tag_, key_, "");
+ ASSERT_STREQ("", msg.getUserProperties("NONEEXISTKEY"));
+}
+
+TEST_F(MessageTest, testMessageGetSysPerperty_KeyBeingNull) {
+ ons::Message msg(topic_, tag_, key_, "");
+ ASSERT_TRUE(NULL == msg.getSystemProperties(NULL));
+}
+
+TEST_F(MessageTest, testMessageGetSysPerperty_KeyNotExsit) {
+ ons::Message msg(topic_, tag_, key_, "");
+ ASSERT_STREQ("", msg.getUserProperties("NONEEXISTKEY"));
+}
+
+TEST_F(MessageTest, testMessageSetDeliverTime_ValueBeingNormal) {
+ ons::Message msg(topic_, tag_, key_, body_);
+ long long now = 123456789L;
+ msg.setStartDeliverTime(now);
+ ASSERT_EQ(now, msg.getStartDeliverTime());
+}
+
+TEST_F(MessageTest, testMessageSetDeliverTime_ValueBeingZero) {
+ ons::Message msg(topic_, tag_, key_, body_);
+ long long now = 0;
+ msg.setStartDeliverTime(now);
+ ASSERT_EQ(now, msg.getStartDeliverTime());
+}
+
+TEST_F(MessageTest, testMessageSetDeliverTime_ValueBeingNagetive) {
+ ons::Message msg(topic_, tag_, key_, body_);
+ long long now = -10000;
+ msg.setStartDeliverTime(now);
+ ASSERT_EQ(now, msg.getStartDeliverTime());
+}