You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/19 03:52:57 UTC
[rocketmq-client-cpp] branch main updated: BugFix: forward system attributes
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 46444c2 BugFix: forward system attributes
46444c2 is described below
commit 46444c2d8056ff9117a98ff8e792008af243494b
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Tue Oct 19 03:52:47 2021 +0000
BugFix: forward system attributes
---
src/main/cpp/rocketmq/ProducerImpl.cpp | 32 ++++++++++++++++++++++++++++++++
1 file changed, 32 insertions(+)
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index e14399f..4145e0a 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -17,6 +17,7 @@
#include "ProducerImpl.h"
#include <atomic>
+#include <chrono>
#include <limits>
#include <system_error>
#include <utility>
@@ -112,6 +113,36 @@ std::string ProducerImpl::wrapSendMessageRequest(const MQMessage& message, SendM
auto system_attribute = request.mutable_message()->mutable_system_attribute();
+ // Handle Tag
+ auto&& tag = message.getTags();
+ if (!tag.empty()) {
+ system_attribute->set_tag(tag);
+ }
+
+ // Handle Key
+ const auto& keys = message.getKeys();
+ if (!keys.empty()) {
+ system_attribute->mutable_keys()->Add(keys.begin(), keys.end());
+ }
+
+ // TraceContext
+ const auto& trace_context = message.traceContext();
+ if (!trace_context.empty()) {
+ system_attribute->set_trace_context(trace_context);
+ }
+
+ // Delivery Timestamp
+ auto delivery_timestamp = message.deliveryTimestamp();
+ if (delivery_timestamp.time_since_epoch().count()) {
+ auto duration = delivery_timestamp.time_since_epoch();
+ system_attribute->set_delivery_attempt(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
+ }
+
+ // Delay Level
+ if (message.getDelayTimeLevel()) {
+ system_attribute->set_delay_level(message.getDelayTimeLevel());
+ }
+
// Born-time
auto duration = absl::Now() - absl::UnixEpoch();
int64_t seconds = absl::ToInt64Seconds(duration);
@@ -148,6 +179,7 @@ std::string ProducerImpl::wrapSendMessageRequest(const MQMessage& message, SendM
system_attribute->set_body_encoding(rmq::Encoding::IDENTITY);
}
+ // Forward user-defined-properties
for (auto& item : message.getProperties()) {
request.mutable_message()->mutable_user_attribute()->insert({item.first, item.second});
}