You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/25 07:57:15 UTC

[rocketmq-client-cpp] branch main updated: Implement trace for fifo consumption (#381)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new fe52d1d  Implement trace for fifo consumption (#381)
fe52d1d is described below

commit fe52d1d93288db3bfd986d5e814f8a96df974cef
Author: aaron ai <ya...@gmail.com>
AuthorDate: Mon Oct 25 15:57:12 2021 +0800

    Implement trace for fifo consumption (#381)
---
 api/rocketmq/MQMessage.h                           |   9 +-
 example/rocketmq/BUILD.bazel                       |  20 ++++
 example/rocketmq/ExampleFifoProducer.cpp           | 108 +++++++++++++++++++++
 example/rocketmq/ExampleFifoPushConsumer.cpp       |  67 +++++++++++++
 src/main/cpp/base/MQMessage.cpp                    |   9 ++
 src/main/cpp/base/include/Protocol.h               |   1 +
 .../cpp/rocketmq/ConsumeFifoMessageService.cpp     |  70 +++++++++++++
 src/main/cpp/rocketmq/ProducerImpl.cpp             |   7 +-
 8 files changed, 281 insertions(+), 10 deletions(-)

diff --git a/api/rocketmq/MQMessage.h b/api/rocketmq/MQMessage.h
index a4e6429..e5d27fa 100644
--- a/api/rocketmq/MQMessage.h
+++ b/api/rocketmq/MQMessage.h
@@ -91,17 +91,13 @@ public:
   void messageType(MessageType message_type);
   MessageType messageType() const;
 
-  void bindMessageGroup(absl::string_view message_group) {
-    message_group_ = std::string(message_group.data(), message_group.length());
-  }
+  void bindMessageGroup(absl::string_view message_group);
 
   void bindMessageQueue(const MQMessageQueue& message_queue) {
     message_queue_ = message_queue;
   }
 
-  const std::string& messageGroup() const {
-    return message_group_;
-  }
+  const std::string& messageGroup() const;
 
   const MQMessageQueue& messageQueue() const {
     return message_queue_;
@@ -113,7 +109,6 @@ protected:
   friend class MessageAccessor;
 
 private:
-  std::string message_group_;
   MQMessageQueue message_queue_;
 };
 
diff --git a/example/rocketmq/BUILD.bazel b/example/rocketmq/BUILD.bazel
index a2b2dd3..9c7508a 100644
--- a/example/rocketmq/BUILD.bazel
+++ b/example/rocketmq/BUILD.bazel
@@ -27,6 +27,16 @@ cc_binary(
 )
 
 cc_binary(
+    name = "example_fifo_producer",
+    srcs = [
+        "ExampleFifoProducer.cpp",
+    ],
+    deps = [
+        "//src/main/cpp/rocketmq:rocketmq_library",
+    ],
+)
+
+cc_binary(
     name = "example_async_producer",
     srcs = [
         "ExampleAsyncProducer.cpp",
@@ -57,6 +67,16 @@ cc_binary(
 )
 
 cc_binary(
+    name = "example_fifo_push_consumer",
+    srcs = [
+        "ExampleFifoPushConsumer.cpp",
+    ],
+    deps = [
+        "//src/main/cpp/rocketmq:rocketmq_library",
+    ],
+)
+
+cc_binary(
     name = "push_consumer_with_custom_executor",
     srcs = [
         "PushConsumerWithCustomExecutor.cpp",
diff --git a/example/rocketmq/ExampleFifoProducer.cpp b/example/rocketmq/ExampleFifoProducer.cpp
new file mode 100644
index 0000000..c4d9e23
--- /dev/null
+++ b/example/rocketmq/ExampleFifoProducer.cpp
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "rocketmq/DefaultMQProducer.h"
+#include <algorithm>
+#include <atomic>
+#include <iostream>
+#include <random>
+
+using namespace rocketmq;
+
+const std::string& alphaNumeric() {
+  static std::string alpha_numeric("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
+  return alpha_numeric;
+}
+
+std::string randomString(std::string::size_type len) {
+  std::string result;
+  result.reserve(len);
+  std::random_device rd;
+  std::mt19937 generator(rd());
+  std::string source(alphaNumeric());
+  std::string::size_type generated = 0;
+  while (generated < len) {
+    std::shuffle(source.begin(), source.end(), generator);
+    std::string::size_type delta = std::min({len - generated, source.length()});
+    result.append(source.substr(0, delta));
+    generated += delta;
+  }
+  return result;
+}
+
+int main(int argc, char* argv[]) {
+  Logger& logger = getLogger();
+  logger.setLevel(Level::Debug);
+  logger.init();
+
+  DefaultMQProducer producer("TestGroup");
+
+  const char* topic = "lingchu_test_order_topic";
+  const char* name_server = "120.25.100.131:8081";
+
+  producer.setNamesrvAddr(name_server);
+  producer.compressBodyThreshold(256);
+  const char* resource_namespace = "MQ_INST_1080056302921134_BXyTLppt";
+  // producer.setRegion("cn-hangzhou-pre");
+  producer.setResourceNamespace(resource_namespace);
+  producer.setCredentialsProvider(std::make_shared<ConfigFileCredentialsProvider>());
+
+  std::atomic_bool stopped;
+  std::atomic_long count(0);
+
+  auto stats_lambda = [&] {
+    while (!stopped.load(std::memory_order_relaxed)) {
+      long cnt = count.load(std::memory_order_relaxed);
+      while (count.compare_exchange_weak(cnt, 0)) {
+        break;
+      }
+      std::this_thread::sleep_for(std::chrono::seconds(1));
+      std::cout << "QPS: " << cnt << std::endl;
+    }
+  };
+
+  std::thread stats_thread(stats_lambda);
+
+  std::string body = randomString(1024 * 4);
+  std::cout << "Message body size: " << body.length() << std::endl;
+
+  try {
+    producer.start();
+    for (int i = 0; i < 16; ++i) {
+      std::string sharding_key = "sharding-key-" + std::to_string(i);
+      MQMessage message;
+      message.setTopic(topic);
+      message.setTags("TagA");
+      message.setKey("Yuck! Why-plural?");
+      message.setBody(body);
+
+      SendResult sendResult = producer.send(message, sharding_key);
+      std::cout << sendResult.getMessageQueue().simpleName() << ": " << sendResult.getMsgId() << std::endl;
+      count++;
+      std::this_thread::sleep_for(std::chrono::seconds(1));
+    }
+  } catch (...) {
+    std::cerr << "Ah...No!!!" << std::endl;
+  }
+
+  stopped.store(true, std::memory_order_relaxed);
+  if (stats_thread.joinable()) {
+    stats_thread.join();
+  }
+
+  producer.shutdown();
+  return EXIT_SUCCESS;
+}
\ No newline at end of file
diff --git a/example/rocketmq/ExampleFifoPushConsumer.cpp b/example/rocketmq/ExampleFifoPushConsumer.cpp
new file mode 100644
index 0000000..df039d7
--- /dev/null
+++ b/example/rocketmq/ExampleFifoPushConsumer.cpp
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <chrono>
+#include <iostream>
+#include <mutex>
+#include <thread>
+
+#include "rocketmq/Logger.h"
+
+#include "rocketmq/MessageListener.h"
+#include "spdlog/spdlog.h"
+
+#include "rocketmq/DefaultMQPushConsumer.h"
+
+using namespace rocketmq;
+
+class SampleMQMessageListener : public FifoMessageListener {
+public:
+  ConsumeMessageResult consumeMessage(const MQMessageExt& message) override {
+    SPDLOG_INFO("Consume message[Topic={}, MessageId={}] OK", message.getTopic(), message.getMsgId());
+    std::cout << "Consume Message[MsgId=" << message.getMsgId() << "] OK. Body Size: " << message.getBody().size()
+              << std::endl;
+    // std::this_thread::sleep_for(std::chrono::seconds(1));
+    return ConsumeMessageResult::SUCCESS;
+  }
+};
+
+int main(int argc, char* argv[]) {
+
+  Logger& logger = getLogger();
+  logger.setLevel(Level::Debug);
+  logger.init();
+
+  const char* group_id = "GID_lingchu_test_order";
+  const char* topic = "lingchu_test_order_topic";
+  const char* resource_namespace = "MQ_INST_1080056302921134_BXyTLppt";
+
+  DefaultMQPushConsumer push_consumer(group_id);
+  push_consumer.setResourceNamespace(resource_namespace);
+  push_consumer.setCredentialsProvider(std::make_shared<ConfigFileCredentialsProvider>());
+  push_consumer.setNamesrvAddr("120.25.100.131:8081");
+  FifoMessageListener* listener = new SampleMQMessageListener();
+  push_consumer.setInstanceName("instance_0");
+  push_consumer.subscribe(topic, "*");
+  push_consumer.registerMessageListener(listener);
+  push_consumer.setConsumeThreadCount(4);
+  push_consumer.start();
+
+  std::this_thread::sleep_for(std::chrono::minutes(30));
+
+  push_consumer.shutdown();
+  return EXIT_SUCCESS;
+}
diff --git a/src/main/cpp/base/MQMessage.cpp b/src/main/cpp/base/MQMessage.cpp
index c5d096e..2015a61 100644
--- a/src/main/cpp/base/MQMessage.cpp
+++ b/src/main/cpp/base/MQMessage.cpp
@@ -178,4 +178,13 @@ MessageType MQMessage::messageType() const {
   return impl_->system_attribute_.message_type;
 }
 
+void MQMessage::bindMessageGroup(absl::string_view message_group) {
+  impl_->system_attribute_.message_group.append(message_group.data(), message_group.length());
+  messageType(MessageType::FIFO);
+}
+
+const std::string& MQMessage::messageGroup() const {
+  return impl_->system_attribute_.message_group;
+}
+
 ROCKETMQ_NAMESPACE_END
diff --git a/src/main/cpp/base/include/Protocol.h b/src/main/cpp/base/include/Protocol.h
index df51ce1..7d52fd6 100644
--- a/src/main/cpp/base/include/Protocol.h
+++ b/src/main/cpp/base/include/Protocol.h
@@ -74,6 +74,7 @@ struct SystemAttribute {
   Resource publisher_group;
   std::string trace_context;
   std::string target_endpoint;
+  std::string message_group;
 };
 
 ROCKETMQ_NAMESPACE_END
diff --git a/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp b/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
index 99d9b16..10519e7 100644
--- a/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
+++ b/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
@@ -19,10 +19,17 @@
 #include <limits>
 #include <system_error>
 
+#include "TracingUtility.h"
+#include "absl/strings/str_join.h"
+
+#include "opencensus/trace/propagation/trace_context.h"
+#include "opencensus/trace/span.h"
+
 #include "ConsumeFifoMessageService.h"
 #include "MessageAccessor.h"
 #include "ProcessQueue.h"
 #include "PushConsumerImpl.h"
+#include "rocketmq/MessageListener.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -120,6 +127,59 @@ void ConsumeFifoMessageService::consumeTask(const ProcessQueueWeakPtr& process_q
     SPDLOG_DEBUG("Rate-limit permit acquired");
   }
 
+  // Record await-consumption-span
+  {
+    auto span_context = opencensus::trace::propagation::FromTraceParentHeader(message.traceContext());
+
+    auto span = opencensus::trace::Span::BlankSpan();
+    std::string span_name = consumer->resourceNamespace() + "/" + message.getTopic() + " " +
+                            MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION;
+    if (span_context.IsValid()) {
+      span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, &Samplers::always());
+    } else {
+      span = opencensus::trace::Span::StartSpan(span_name, nullptr, {&Samplers::always()});
+    }
+    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
+                      MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
+    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
+                      MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
+    TracingUtility::addUniversalSpanAttributes(message, *consumer, span);
+    const auto& keys = message.getKeys();
+    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
+                      absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
+    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP, message.getStoreTimestamp());
+    absl::Time decoded_timestamp = MessageAccessor::decodedTimestamp(message);
+    span.AddAnnotation(
+        MixAll::SPAN_ANNOTATION_AWAIT_CONSUMPTION,
+        {{MixAll::SPAN_ANNOTATION_ATTR_START_TIME,
+          opencensus::trace::AttributeValueRef(absl::ToInt64Milliseconds(decoded_timestamp - absl::UnixEpoch()))}});
+    span.End();
+    MessageAccessor::setTraceContext(const_cast<MQMessageExt&>(message),
+                                     opencensus::trace::propagation::ToTraceParentHeader(span.context()));
+  }
+
+  auto span_context = opencensus::trace::propagation::FromTraceParentHeader(message.traceContext());
+  auto span = opencensus::trace::Span::BlankSpan();
+  std::string span_name = consumer->resourceNamespace() + "/" + message.getTopic() + " " +
+                          MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION;
+  if (span_context.IsValid()) {
+    span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context);
+  } else {
+    span = opencensus::trace::Span::StartSpan(span_name);
+  }
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
+                    MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION);
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
+                    MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION);
+  TracingUtility::addUniversalSpanAttributes(message, *consumer, span);
+  const auto& keys = message.getKeys();
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
+                    absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, message.getDeliveryAttempt());
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP, message.getStoreTimestamp());
+  MessageAccessor::setTraceContext(const_cast<MQMessageExt&>(message),
+                                   opencensus::trace::propagation::ToTraceParentHeader(span.context()));
+
   auto steady_start = std::chrono::steady_clock::now();
 
   try {
@@ -132,6 +192,16 @@ void ConsumeFifoMessageService::consumeTask(const ProcessQueueWeakPtr& process_q
     SPDLOG_ERROR("Business FIFO callback raised an exception when consumeMessage");
   }
 
+  switch (result) {
+    case ConsumeMessageResult::SUCCESS:
+      span.SetStatus(opencensus::trace::StatusCode::OK);
+      break;
+    case ConsumeMessageResult::FAILURE:
+      span.SetStatus(opencensus::trace::StatusCode::UNKNOWN);
+      break;
+  }
+  span.End();
+
   auto duration = std::chrono::steady_clock::now() - steady_start;
 
   // Log client consume-time costs
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index 4780220..d22f6dc 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -180,13 +180,14 @@ void ProducerImpl::wrapSendMessageRequest(const MQMessage& message, SendMessageR
     system_attribute->set_body_encoding(rmq::Encoding::IDENTITY);
   }
 
+  system_attribute->set_message_group(message.messageGroup());
+  system_attribute->set_message_id(message.getMsgId());
+  system_attribute->set_partition_id(message_queue.getQueueId());
+
   // Forward user-defined-properties
   for (auto& item : message.getProperties()) {
     request.mutable_message()->mutable_user_attribute()->insert({item.first, item.second});
   }
-
-  system_attribute->set_message_id(message.getMsgId());
-  system_attribute->set_partition_id(message_queue.getQueueId());
   SPDLOG_TRACE("SendMessageRequest: {}", request.DebugString());
 }