You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/19 03:52:57 UTC

[rocketmq-client-cpp] branch main updated: BugFix: forward system attributes

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 46444c2  BugFix: forward system attributes
46444c2 is described below

commit 46444c2d8056ff9117a98ff8e792008af243494b
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Tue Oct 19 03:52:47 2021 +0000

    BugFix: forward system attributes
---
 src/main/cpp/rocketmq/ProducerImpl.cpp | 32 ++++++++++++++++++++++++++++++++
 1 file changed, 32 insertions(+)

diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index e14399f..4145e0a 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -17,6 +17,7 @@
 #include "ProducerImpl.h"
 
 #include <atomic>
+#include <chrono>
 #include <limits>
 #include <system_error>
 #include <utility>
@@ -112,6 +113,36 @@ std::string ProducerImpl::wrapSendMessageRequest(const MQMessage& message, SendM
 
   auto system_attribute = request.mutable_message()->mutable_system_attribute();
 
+  // Handle Tag
+  auto&& tag = message.getTags();
+  if (!tag.empty()) {
+    system_attribute->set_tag(tag);
+  }
+
+  // Handle Key
+  const auto& keys = message.getKeys();
+  if (!keys.empty()) {
+    system_attribute->mutable_keys()->Add(keys.begin(), keys.end());
+  }
+
+  // TraceContext
+  const auto& trace_context = message.traceContext();
+  if (!trace_context.empty()) {
+    system_attribute->set_trace_context(trace_context);
+  }
+
+  // Delivery Timestamp
+  auto delivery_timestamp = message.deliveryTimestamp();
+  if (delivery_timestamp.time_since_epoch().count()) {
+    auto duration = delivery_timestamp.time_since_epoch();
+    system_attribute->set_delivery_attempt(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
+  }
+
+  // Delay Level
+  if (message.getDelayTimeLevel()) {
+    system_attribute->set_delay_level(message.getDelayTimeLevel());
+  }
+
   // Born-time
   auto duration = absl::Now() - absl::UnixEpoch();
   int64_t seconds = absl::ToInt64Seconds(duration);
@@ -148,6 +179,7 @@ std::string ProducerImpl::wrapSendMessageRequest(const MQMessage& message, SendM
     system_attribute->set_body_encoding(rmq::Encoding::IDENTITY);
   }
 
+  // Forward user-defined-properties
   for (auto& item : message.getProperties()) {
     request.mutable_message()->mutable_user_attribute()->insert({item.first, item.second});
   }