You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/25 07:57:15 UTC
[rocketmq-client-cpp] branch main updated: Implement trace for fifo
consumption (#381)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new fe52d1d Implement trace for fifo consumption (#381)
fe52d1d is described below
commit fe52d1d93288db3bfd986d5e814f8a96df974cef
Author: aaron ai <ya...@gmail.com>
AuthorDate: Mon Oct 25 15:57:12 2021 +0800
Implement trace for fifo consumption (#381)
---
api/rocketmq/MQMessage.h | 9 +-
example/rocketmq/BUILD.bazel | 20 ++++
example/rocketmq/ExampleFifoProducer.cpp | 108 +++++++++++++++++++++
example/rocketmq/ExampleFifoPushConsumer.cpp | 67 +++++++++++++
src/main/cpp/base/MQMessage.cpp | 9 ++
src/main/cpp/base/include/Protocol.h | 1 +
.../cpp/rocketmq/ConsumeFifoMessageService.cpp | 70 +++++++++++++
src/main/cpp/rocketmq/ProducerImpl.cpp | 7 +-
8 files changed, 281 insertions(+), 10 deletions(-)
diff --git a/api/rocketmq/MQMessage.h b/api/rocketmq/MQMessage.h
index a4e6429..e5d27fa 100644
--- a/api/rocketmq/MQMessage.h
+++ b/api/rocketmq/MQMessage.h
@@ -91,17 +91,13 @@ public:
void messageType(MessageType message_type);
MessageType messageType() const;
- void bindMessageGroup(absl::string_view message_group) {
- message_group_ = std::string(message_group.data(), message_group.length());
- }
+ void bindMessageGroup(absl::string_view message_group);
void bindMessageQueue(const MQMessageQueue& message_queue) {
message_queue_ = message_queue;
}
- const std::string& messageGroup() const {
- return message_group_;
- }
+ const std::string& messageGroup() const;
const MQMessageQueue& messageQueue() const {
return message_queue_;
@@ -113,7 +109,6 @@ protected:
friend class MessageAccessor;
private:
- std::string message_group_;
MQMessageQueue message_queue_;
};
diff --git a/example/rocketmq/BUILD.bazel b/example/rocketmq/BUILD.bazel
index a2b2dd3..9c7508a 100644
--- a/example/rocketmq/BUILD.bazel
+++ b/example/rocketmq/BUILD.bazel
@@ -27,6 +27,16 @@ cc_binary(
)
cc_binary(
+ name = "example_fifo_producer",
+ srcs = [
+ "ExampleFifoProducer.cpp",
+ ],
+ deps = [
+ "//src/main/cpp/rocketmq:rocketmq_library",
+ ],
+)
+
+cc_binary(
name = "example_async_producer",
srcs = [
"ExampleAsyncProducer.cpp",
@@ -57,6 +67,16 @@ cc_binary(
)
cc_binary(
+ name = "example_fifo_push_consumer",
+ srcs = [
+ "ExampleFifoPushConsumer.cpp",
+ ],
+ deps = [
+ "//src/main/cpp/rocketmq:rocketmq_library",
+ ],
+)
+
+cc_binary(
name = "push_consumer_with_custom_executor",
srcs = [
"PushConsumerWithCustomExecutor.cpp",
diff --git a/example/rocketmq/ExampleFifoProducer.cpp b/example/rocketmq/ExampleFifoProducer.cpp
new file mode 100644
index 0000000..c4d9e23
--- /dev/null
+++ b/example/rocketmq/ExampleFifoProducer.cpp
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "rocketmq/DefaultMQProducer.h"
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <cstdlib>
+#include <exception>
+#include <iostream>
+#include <memory>
+#include <random>
+#include <string>
+#include <thread>
+
+using namespace rocketmq;
+
+// Gives access to the character pool (digits, then lowercase, then uppercase letters) from which
+// random payloads are drawn. The pool is built once, on first call, and shared by every caller.
+const std::string& alphaNumeric() {
+  static std::string pool = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+  return pool;
+}
+
+// Builds a random string of exactly `len` characters taken from alphaNumeric().
+//
+// The pool is reshuffled before every chunk is copied, so a result longer than the pool is made of
+// several independently shuffled permutations laid end to end.
+std::string randomString(std::string::size_type len) {
+  std::random_device seed_source;
+  std::mt19937 engine(seed_source());
+  std::string pool(alphaNumeric());
+
+  std::string output;
+  output.reserve(len);
+  for (std::string::size_type remaining = len; remaining > 0;) {
+    std::shuffle(pool.begin(), pool.end(), engine);
+    // Copy a whole permutation, or only what is still missing on the final round.
+    const std::string::size_type chunk = std::min(remaining, pool.length());
+    output.append(pool, 0, chunk);
+    remaining -= chunk;
+  }
+  return output;
+}
+
+/**
+ * Example entry point for a FIFO producer.
+ *
+ * Sends 16 messages, one per second, each with its own sharding key, while a background thread
+ * prints how many sends completed since its previous report.
+ *
+ * @return EXIT_SUCCESS when every send completed, EXIT_FAILURE when starting the producer or
+ *         sending a message threw.
+ */
+int main(int argc, char* argv[]) {
+  Logger& logger = getLogger();
+  logger.setLevel(Level::Debug);
+  logger.init();
+
+  DefaultMQProducer producer("TestGroup");
+
+  const char* topic = "lingchu_test_order_topic";
+  const char* name_server = "120.25.100.131:8081";
+
+  producer.setNamesrvAddr(name_server);
+  producer.compressBodyThreshold(256);
+  const char* resource_namespace = "MQ_INST_1080056302921134_BXyTLppt";
+  // producer.setRegion("cn-hangzhou-pre");
+  producer.setResourceNamespace(resource_namespace);
+  producer.setCredentialsProvider(std::make_shared<ConfigFileCredentialsProvider>());
+
+  // Build the payload before the statistics thread exists, so that a failure here cannot unwind
+  // past a joinable std::thread (which would call std::terminate).
+  std::string body = randomString(1024 * 4);
+  std::cout << "Message body size: " << body.length() << std::endl;
+
+  // Explicitly initialised: before C++20 a default-constructed std::atomic holds an indeterminate
+  // value, and the statistics thread reads this flag right away.
+  std::atomic_bool stopped(false);
+  std::atomic_long count(0);
+
+  auto stats_lambda = [&] {
+    while (!stopped.load(std::memory_order_relaxed)) {
+      std::this_thread::sleep_for(std::chrono::seconds(1));
+      // exchange() reads and resets the counter in one atomic step, so no send is lost or
+      // reported twice.
+      const long cnt = count.exchange(0, std::memory_order_relaxed);
+      std::cout << "QPS: " << cnt << std::endl;
+    }
+  };
+
+  std::thread stats_thread(stats_lambda);
+
+  int exit_code = EXIT_SUCCESS;
+  try {
+    producer.start();
+    for (int i = 0; i < 16; ++i) {
+      std::string sharding_key = "sharding-key-" + std::to_string(i);
+      MQMessage message;
+      message.setTopic(topic);
+      message.setTags("TagA");
+      message.setKey("Yuck! Why-plural?");
+      message.setBody(body);
+
+      SendResult sendResult = producer.send(message, sharding_key);
+      std::cout << sendResult.getMessageQueue().simpleName() << ": " << sendResult.getMsgId() << std::endl;
+      count++;
+      std::this_thread::sleep_for(std::chrono::seconds(1));
+    }
+  } catch (const std::exception& e) {
+    // Surface the reason instead of discarding it.
+    std::cerr << "Ah...No!!! " << e.what() << std::endl;
+    exit_code = EXIT_FAILURE;
+  } catch (...) {
+    std::cerr << "Ah...No!!!" << std::endl;
+    exit_code = EXIT_FAILURE;
+  }
+
+  stopped.store(true, std::memory_order_relaxed);
+  if (stats_thread.joinable()) {
+    stats_thread.join();
+  }
+
+  producer.shutdown();
+  return exit_code;
+}
\ No newline at end of file
diff --git a/example/rocketmq/ExampleFifoPushConsumer.cpp b/example/rocketmq/ExampleFifoPushConsumer.cpp
new file mode 100644
index 0000000..df039d7
--- /dev/null
+++ b/example/rocketmq/ExampleFifoPushConsumer.cpp
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <chrono>
+#include <iostream>
+#include <mutex>
+#include <thread>
+
+#include "rocketmq/Logger.h"
+
+#include "rocketmq/MessageListener.h"
+#include "spdlog/spdlog.h"
+
+#include "rocketmq/DefaultMQPushConsumer.h"
+
+using namespace rocketmq;
+
+// Listener used by this example: it reports every message it is handed, both through the logger and
+// on standard output, and always answers SUCCESS.
+class SampleMQMessageListener : public FifoMessageListener {
+public:
+  ConsumeMessageResult consumeMessage(const MQMessageExt& message) override {
+    const auto& message_id = message.getMsgId();
+    SPDLOG_INFO("Consume message[Topic={}, MessageId={}] OK", message.getTopic(), message_id);
+
+    const auto body_size = message.getBody().size();
+    std::cout << "Consume Message[MsgId=" << message_id << "] OK. Body Size: " << body_size << std::endl;
+
+    // To simulate slow business logic, sleep here, e.g.:
+    // std::this_thread::sleep_for(std::chrono::seconds(1));
+    return ConsumeMessageResult::SUCCESS;
+  }
+};
+
+// Example entry point for a FIFO push consumer: configures the consumer, subscribes to every tag of
+// the test topic, lets it run for 30 minutes and then shuts it down.
+int main(int argc, char* argv[]) {
+  Logger& log = getLogger();
+  log.setLevel(Level::Debug);
+  log.init();
+
+  const char* resource_namespace = "MQ_INST_1080056302921134_BXyTLppt";
+  const char* topic = "lingchu_test_order_topic";
+  const char* group_id = "GID_lingchu_test_order";
+  const char* name_server = "120.25.100.131:8081";
+
+  DefaultMQPushConsumer consumer(group_id);
+  consumer.setResourceNamespace(resource_namespace);
+  consumer.setCredentialsProvider(std::make_shared<ConfigFileCredentialsProvider>());
+  consumer.setNamesrvAddr(name_server);
+
+  // NOTE(review): the listener is never deleted in this function; confirm whether the consumer
+  // takes ownership of the pointer it is given.
+  FifoMessageListener* fifo_listener = new SampleMQMessageListener();
+  consumer.setInstanceName("instance_0");
+  consumer.subscribe(topic, "*");
+  consumer.registerMessageListener(fifo_listener);
+  consumer.setConsumeThreadCount(4);
+  consumer.start();
+
+  // Keep the process alive while messages are delivered to the listener.
+  const auto run_time = std::chrono::minutes(30);
+  std::this_thread::sleep_for(run_time);
+
+  consumer.shutdown();
+  return EXIT_SUCCESS;
+}
diff --git a/src/main/cpp/base/MQMessage.cpp b/src/main/cpp/base/MQMessage.cpp
index c5d096e..2015a61 100644
--- a/src/main/cpp/base/MQMessage.cpp
+++ b/src/main/cpp/base/MQMessage.cpp
@@ -178,4 +178,13 @@ MessageType MQMessage::messageType() const {
return impl_->system_attribute_.message_type;
}
+/**
+ * Binds this message to a message group and marks it as a FIFO message.
+ *
+ * Any group bound earlier is replaced. assign() is used rather than append() so that binding a
+ * second time does not concatenate the two group names.
+ *
+ * @param message_group Name of the group; its characters are copied into the message.
+ */
+void MQMessage::bindMessageGroup(absl::string_view message_group) {
+  impl_->system_attribute_.message_group.assign(message_group.data(), message_group.length());
+  messageType(MessageType::FIFO);
+}
+
+// Returns the message group stored in this message's system attributes.
+const std::string& MQMessage::messageGroup() const {
+  const auto& attributes = impl_->system_attribute_;
+  return attributes.message_group;
+}
+
ROCKETMQ_NAMESPACE_END
diff --git a/src/main/cpp/base/include/Protocol.h b/src/main/cpp/base/include/Protocol.h
index df51ce1..7d52fd6 100644
--- a/src/main/cpp/base/include/Protocol.h
+++ b/src/main/cpp/base/include/Protocol.h
@@ -74,6 +74,7 @@ struct SystemAttribute {
Resource publisher_group;
std::string trace_context;
std::string target_endpoint;
+ std::string message_group;
};
ROCKETMQ_NAMESPACE_END
diff --git a/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp b/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
index 99d9b16..10519e7 100644
--- a/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
+++ b/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
@@ -19,10 +19,17 @@
#include <limits>
#include <system_error>
+#include "TracingUtility.h"
+#include "absl/strings/str_join.h"
+
+#include "opencensus/trace/propagation/trace_context.h"
+#include "opencensus/trace/span.h"
+
#include "ConsumeFifoMessageService.h"
#include "MessageAccessor.h"
#include "ProcessQueue.h"
#include "PushConsumerImpl.h"
+#include "rocketmq/MessageListener.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -120,6 +127,59 @@ void ConsumeFifoMessageService::consumeTask(const ProcessQueueWeakPtr& process_q
SPDLOG_DEBUG("Rate-limit permit acquired");
}
+ // Record await-consumption-span
+ {
+ auto span_context = opencensus::trace::propagation::FromTraceParentHeader(message.traceContext());
+
+ auto span = opencensus::trace::Span::BlankSpan();
+ std::string span_name = consumer->resourceNamespace() + "/" + message.getTopic() + " " +
+ MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION;
+ if (span_context.IsValid()) {
+ span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, &Samplers::always());
+ } else {
+ span = opencensus::trace::Span::StartSpan(span_name, nullptr, {&Samplers::always()});
+ }
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
+ MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
+ MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
+ TracingUtility::addUniversalSpanAttributes(message, *consumer, span);
+ const auto& keys = message.getKeys();
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
+ absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP, message.getStoreTimestamp());
+ absl::Time decoded_timestamp = MessageAccessor::decodedTimestamp(message);
+ span.AddAnnotation(
+ MixAll::SPAN_ANNOTATION_AWAIT_CONSUMPTION,
+ {{MixAll::SPAN_ANNOTATION_ATTR_START_TIME,
+ opencensus::trace::AttributeValueRef(absl::ToInt64Milliseconds(decoded_timestamp - absl::UnixEpoch()))}});
+ span.End();
+ MessageAccessor::setTraceContext(const_cast<MQMessageExt&>(message),
+ opencensus::trace::propagation::ToTraceParentHeader(span.context()));
+ }
+
+ auto span_context = opencensus::trace::propagation::FromTraceParentHeader(message.traceContext());
+ auto span = opencensus::trace::Span::BlankSpan();
+ std::string span_name = consumer->resourceNamespace() + "/" + message.getTopic() + " " +
+ MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION;
+ if (span_context.IsValid()) {
+ span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context);
+ } else {
+ span = opencensus::trace::Span::StartSpan(span_name);
+ }
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
+ MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION);
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
+ MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION);
+ TracingUtility::addUniversalSpanAttributes(message, *consumer, span);
+ const auto& keys = message.getKeys();
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
+ absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, message.getDeliveryAttempt());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP, message.getStoreTimestamp());
+ MessageAccessor::setTraceContext(const_cast<MQMessageExt&>(message),
+ opencensus::trace::propagation::ToTraceParentHeader(span.context()));
+
auto steady_start = std::chrono::steady_clock::now();
try {
@@ -132,6 +192,16 @@ void ConsumeFifoMessageService::consumeTask(const ProcessQueueWeakPtr& process_q
SPDLOG_ERROR("Business FIFO callback raised an exception when consumeMessage");
}
+ switch (result) {
+ case ConsumeMessageResult::SUCCESS:
+ span.SetStatus(opencensus::trace::StatusCode::OK);
+ break;
+ case ConsumeMessageResult::FAILURE:
+ span.SetStatus(opencensus::trace::StatusCode::UNKNOWN);
+ break;
+ }
+ span.End();
+
auto duration = std::chrono::steady_clock::now() - steady_start;
// Log client consume-time costs
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index 4780220..d22f6dc 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -180,13 +180,14 @@ void ProducerImpl::wrapSendMessageRequest(const MQMessage& message, SendMessageR
system_attribute->set_body_encoding(rmq::Encoding::IDENTITY);
}
+ system_attribute->set_message_group(message.messageGroup());
+ system_attribute->set_message_id(message.getMsgId());
+ system_attribute->set_partition_id(message_queue.getQueueId());
+
// Forward user-defined-properties
for (auto& item : message.getProperties()) {
request.mutable_message()->mutable_user_attribute()->insert({item.first, item.second});
}
-
- system_attribute->set_message_id(message.getMsgId());
- system_attribute->set_partition_id(message_queue.getQueueId());
SPDLOG_TRACE("SendMessageRequest: {}", request.DebugString());
}