You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/05/30 07:04:38 UTC
[rocketmq-client-cpp] branch main updated: Instrument with opencensus to collect various metrics (#415)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new e7d188a Instrument with opencensus to collect various metrics (#415)
e7d188a is described below
commit e7d188a3dc3ec00e11e0c00aba45b5e135b1185b
Author: Zhanhui Li <li...@gmail.com>
AuthorDate: Mon May 30 15:04:33 2022 +0800
Instrument with opencensus to collect various metrics (#415)
* Sync with protobuf files
* Add more tags
* Collect send success, failure and latency statistics data
* Collect metrics for tx commit/rollback success/failure
* Extract tag definitions to Tag
* Define measures and metrics for consumer
* Collect metrics for PushConsumer
* Collect more metrics
* Use latest opencensus to configure exporter interval
* WIP
* WIP
* WIP
* Remove metrics of committing/rolling-back transactional messages
* Fix trace
---
.vscode/settings.json | 4 +
api/rocketmq/Message.h | 4 +
bazel/rocketmq_deps.bzl | 11 ++
proto/apache/rocketmq/v2/definition.proto | 9 +-
proto/apache/rocketmq/v2/service.proto | 25 ++-
src/main/cpp/client/ClientManagerImpl.cpp | 8 +-
src/main/cpp/client/TelemetryBidiReactor.cpp | 8 +
src/main/cpp/client/include/ClientConfig.h | 6 +
.../cpp/rocketmq/ConsumeMessageServiceImpl.cpp | 29 +++-
src/main/cpp/rocketmq/ConsumeTask.cpp | 15 ++
src/main/cpp/rocketmq/ProducerImpl.cpp | 63 ++++---
src/main/cpp/rocketmq/SendContext.cpp | 48 +++++-
.../cpp/rocketmq/include/ConsumeMessageService.h | 2 +
.../rocketmq/include/ConsumeMessageServiceImpl.h | 2 +
src/main/cpp/rocketmq/include/ProducerImpl.h | 8 +-
src/main/cpp/rocketmq/include/PushConsumerImpl.h | 7 +
src/main/cpp/rocketmq/include/SendContext.h | 2 +
src/main/cpp/stats/BUILD.bazel | 2 +
src/main/cpp/stats/ConsumeStats.cpp | 148 ++++++++++++++++
src/main/cpp/stats/MetricBidiReactor.cpp | 116 +++++++++++++
src/main/cpp/stats/OpencensusExporter.cpp | 190 +++++++++++++++++++++
src/main/cpp/stats/PublishStats.cpp | 20 +--
.../cpp/stats/{include/PublishStats.h => Tag.cpp} | 45 ++---
src/main/cpp/stats/include/ConsumeStats.h | 87 ++++++++++
.../stats/include/{PublishStats.h => Exporter.h} | 26 +--
src/main/cpp/stats/include/MetricBidiReactor.h | 78 +++++++++
src/main/cpp/stats/include/OpencensusExporter.h | 53 ++++++
src/main/cpp/stats/include/PublishStats.h | 9 +-
.../cpp/stats/include/{PublishStats.h => Tag.h} | 25 +--
src/main/cpp/stats/tests/PublishStatsTest.cpp | 71 +++++++-
30 files changed, 983 insertions(+), 138 deletions(-)
diff --git a/.vscode/settings.json b/.vscode/settings.json
index cb613f9..9d42abd 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -9,5 +9,9 @@
"--header-insertion=never",
"--compile-commands-dir=${workspaceFolder}/",
"--query-driver=/**/*",
+ ],
+ "cSpell.words": [
+ "Opencensus",
+ "SPDLOG"
]
}
\ No newline at end of file
diff --git a/api/rocketmq/Message.h b/api/rocketmq/Message.h
index 0537fa4..0c2ef02 100644
--- a/api/rocketmq/Message.h
+++ b/api/rocketmq/Message.h
@@ -78,6 +78,10 @@ public:
return absl::make_optional(trace_context_);
}
+ void traceContext(std::string &&trace_context) {
+ trace_context_ = std::move(trace_context);
+ }
+
const std::string& bornHost() const {
return born_host_;
}
diff --git a/bazel/rocketmq_deps.bzl b/bazel/rocketmq_deps.bzl
index 7282dd7..39f3642 100644
--- a/bazel/rocketmq_deps.bzl
+++ b/bazel/rocketmq_deps.bzl
@@ -85,6 +85,17 @@ def rocketmq_deps():
],
)
+ maybe(
+ http_archive,
+ name = "io_opencensus_cpp",
+ sha256 = "317f2bfdaba469561c7e64b1a55282b87e677c109c9d8877097940e6d5cbca08",
+ urls = [
+ "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/opencensus-cpp/opencensus-cpp-0.4.1.tar.gz",
+ "https://github.com/lizhanhui/opencensus-cpp/archive/refs/tags/v0.4.1.tar.gz",
+ ],
+ strip_prefix = "opencensus-cpp-0.4.1",
+ )
+
if "com_google_absl" not in native.existing_rules():
http_archive(
name = "com_google_absl",
diff --git a/proto/apache/rocketmq/v2/definition.proto b/proto/apache/rocketmq/v2/definition.proto
index 6565a21..58bf06d 100644
--- a/proto/apache/rocketmq/v2/definition.proto
+++ b/proto/apache/rocketmq/v2/definition.proto
@@ -291,12 +291,6 @@ message Assignment {
MessageQueue message_queue = 1;
}
-message SendReceipt {
- string message_id = 1;
- string transaction_id = 2;
- int64 offset = 3;
-}
-
enum Code {
// Success.
OK = 0;
@@ -395,6 +389,9 @@ enum Code {
// Client type could not be recognized.
UNRECOGNIZED_CLIENT_TYPE = 32;
+ // Return different results for entries in composite request.
+ MULTIPLE_RESULTS = 33;
+
// Code indicates that the server encountered an unexpected condition
// that prevented it from fulfilling the request.
// This error response is a generic "catch-all" response.
diff --git a/proto/apache/rocketmq/v2/service.proto b/proto/apache/rocketmq/v2/service.proto
index aa688b4..fa2a2f0 100644
--- a/proto/apache/rocketmq/v2/service.proto
+++ b/proto/apache/rocketmq/v2/service.proto
@@ -60,9 +60,19 @@ message SendMessageRequest {
repeated Message messages = 1;
}
+message SendResultEntry {
+ Status status = 1;
+ string message_id = 2;
+ string transaction_id = 3;
+ int64 offset = 4;
+}
+
message SendMessageResponse {
Status status = 1;
- repeated SendReceipt receipts = 2;
+
+ // Some implementation may have partial failure issues. Client SDK developers are expected to inspect
+ // each entry for best certainty.
+ repeated SendResultEntry entries = 2;
}
message QueryAssignmentRequest {
@@ -169,7 +179,8 @@ message ThreadStackTrace {
message VerifyMessageCommand {
string nonce = 1;
- Message message = 2;
+ MessageQueue message_queue = 2;
+ Message message = 3;
}
message VerifyMessageResult {
@@ -230,6 +241,14 @@ message Subscription {
optional google.protobuf.Duration long_polling_timeout = 5;
}
+message Metric {
+ // Indicates that if client should export local metrics to server.
+ bool on = 1;
+
+ // The endpoint that client metrics should be exported to, which is required if the switch is on.
+ optional Endpoints endpoints = 2;
+}
+
message Settings {
// Configurations for all clients.
optional ClientType client_type = 1;
@@ -259,6 +278,8 @@ message Settings {
// User agent details
UA user_agent = 7;
+
+ Metric metric = 8;
}
message TelemetryCommand {
diff --git a/src/main/cpp/client/ClientManagerImpl.cpp b/src/main/cpp/client/ClientManagerImpl.cpp
index a5fe96a..0c6e155 100644
--- a/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/src/main/cpp/client/ClientManagerImpl.cpp
@@ -316,7 +316,7 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met
auto&& status = invocation_context->response.status();
switch (invocation_context->response.status().code()) {
case rmq::Code::OK: {
- send_receipt.message_id = invocation_context->response.receipts().begin()->message_id();
+ send_receipt.message_id = invocation_context->response.entries().begin()->message_id();
break;
}
case rmq::Code::TOPIC_NOT_FOUND: {
@@ -416,9 +416,9 @@ SendReceipt ClientManagerImpl::processSendResponse(const rmq::MessageQueue& mess
switch (response.status().code()) {
case rmq::Code::OK: {
- assert(response.receipts_size() > 0);
- send_receipt.message_id = response.receipts().begin()->message_id();
- send_receipt.transaction_id = response.receipts().begin()->transaction_id();
+ assert(response.entries_size() > 0);
+ send_receipt.message_id = response.entries().begin()->message_id();
+ send_receipt.transaction_id = response.entries().begin()->transaction_id();
return send_receipt;
}
case rmq::Code::ILLEGAL_TOPIC: {
diff --git a/src/main/cpp/client/TelemetryBidiReactor.cpp b/src/main/cpp/client/TelemetryBidiReactor.cpp
index f39cdec..cf251e4 100644
--- a/src/main/cpp/client/TelemetryBidiReactor.cpp
+++ b/src/main/cpp/client/TelemetryBidiReactor.cpp
@@ -194,6 +194,14 @@ void TelemetryBidiReactor::applySettings(const rmq::Settings& settings) {
applyBackoffPolicy(settings, ptr);
+ // Sync metrics collector configuration
+ if (settings.has_metric()) {
+ const auto& metric = settings.metric();
+ ptr->config().metric.on = metric.on();
+ ptr->config().metric.endpoints.set_scheme(metric.endpoints().scheme());
+ ptr->config().metric.endpoints.mutable_addresses()->CopyFrom(metric.endpoints().addresses());
+ }
+
switch (settings.pub_sub_case()) {
case rmq::Settings::PubSubCase::kPublishing: {
applyPublishingConfig(settings, ptr);
diff --git a/src/main/cpp/client/include/ClientConfig.h b/src/main/cpp/client/include/ClientConfig.h
index 458ca5f..66ca230 100644
--- a/src/main/cpp/client/include/ClientConfig.h
+++ b/src/main/cpp/client/include/ClientConfig.h
@@ -44,6 +44,11 @@ struct SubscriberConfig {
absl::Duration polling_timeout{absl::Seconds(30)};
};
+struct Metric {
+ bool on{false};
+ rmq::Endpoints endpoints;
+};
+
struct ClientConfig {
std::string client_id;
rmq::ClientType client_type{rmq::ClientType::CLIENT_TYPE_UNSPECIFIED};
@@ -55,6 +60,7 @@ struct ClientConfig {
std::shared_ptr<CredentialsProvider> credentials_provider;
PublisherConfig publisher;
SubscriberConfig subscriber;
+ Metric metric;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp b/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
index 9c9ad49..74d8176 100644
--- a/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
+++ b/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
@@ -16,9 +16,11 @@
*/
#include "ConsumeMessageServiceImpl.h"
+#include "ConsumeStats.h"
#include "ConsumeTask.h"
#include "LoggerImpl.h"
#include "PushConsumerImpl.h"
+#include "Tag.h"
#include "ThreadPoolImpl.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -81,7 +83,24 @@ void ConsumeMessageServiceImpl::ack(const Message& message, std::function<void(c
return;
}
- consumer->ack(message, cb);
+ std::weak_ptr<PushConsumerImpl> client(consumer_);
+ const auto& topic = message.topic();
+ // Collect metrics
+ opencensus::stats::Record({{consumer->stats().processSuccess(), 1}},
+ {{Tag::topicTag(), topic}, {Tag::clientIdTag(), consumer->config().client_id}});
+ auto callback = [cb, client, topic](const std::error_code& ec) {
+ auto consumer = client.lock();
+ if (ec) {
+ opencensus::stats::Record({{consumer->stats().ackFailure(), 1}},
+ {{Tag::topicTag(), topic}, {Tag::clientIdTag(), consumer->config().client_id}});
+ } else {
+ opencensus::stats::Record({{consumer->stats().ackSuccess(), 1}},
+ {{Tag::topicTag(), topic}, {Tag::clientIdTag(), consumer->config().client_id}});
+ }
+ cb(ec);
+ };
+
+ consumer->ack(message, callback);
}
void ConsumeMessageServiceImpl::nack(const Message& message, std::function<void(const std::error_code&)> cb) {
@@ -89,7 +108,9 @@ void ConsumeMessageServiceImpl::nack(const Message& message, std::function<void(
if (!consumer) {
return;
}
-
+ // Collect metrics
+ opencensus::stats::Record({{consumer->stats().processFailure(), 1}},
+ {{Tag::topicTag(), message.topic()}, {Tag::clientIdTag(), consumer->config().client_id}});
consumer->nack(message, cb);
}
@@ -114,6 +135,10 @@ std::size_t ConsumeMessageServiceImpl::maxDeliveryAttempt() {
return consumer->maxDeliveryAttempts();
}
+std::weak_ptr<PushConsumerImpl> ConsumeMessageServiceImpl::consumer() {
+ return consumer_;
+}
+
bool ConsumeMessageServiceImpl::preHandle(const Message& message) {
return true;
}
diff --git a/src/main/cpp/rocketmq/ConsumeTask.cpp b/src/main/cpp/rocketmq/ConsumeTask.cpp
index dede4a4..69d299a 100644
--- a/src/main/cpp/rocketmq/ConsumeTask.cpp
+++ b/src/main/cpp/rocketmq/ConsumeTask.cpp
@@ -17,7 +17,10 @@
#include "ConsumeTask.h"
+#include "ConsumeStats.h"
#include "LoggerImpl.h"
+#include "PushConsumerImpl.h"
+#include "Tag.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -115,6 +118,8 @@ void ConsumeTask::process() {
return;
}
+ std::shared_ptr<PushConsumerImpl> consumer = svc->consumer().lock();
+
auto self = shared_from_this();
switch (next_step_) {
@@ -123,7 +128,17 @@ void ConsumeTask::process() {
auto it = messages_.begin();
SPDLOG_DEBUG("Start to process message[message-id={}]", (*it)->id());
svc->preHandle(**it);
+ auto await_time = std::chrono::system_clock::now() - (*it)->extension().decode_time;
+ opencensus::stats::Record(
+ {{consumer->stats().awaitTime(), MixAll::millisecondsOf(await_time)}},
+ {{Tag::topicTag(), (*it)->topic()}, {Tag::clientIdTag(), consumer->config().client_id}});
+
+ std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
auto result = listener(**it);
+ auto duration = std::chrono::steady_clock::now() - start;
+ opencensus::stats::Record(
+ {{consumer->stats().processTime(), MixAll::millisecondsOf(duration)}},
+ {{Tag::topicTag(), (*it)->topic()}, {Tag::clientIdTag(), consumer->config().client_id}});
svc->postHandle(**it, result);
switch (result) {
case ConsumeResult::SUCCESS: {
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index 4c203d8..4b6c3f0 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -34,6 +34,7 @@
#include "SendContext.h"
#include "SendMessageContext.h"
#include "Signature.h"
+#include "Tag.h"
#include "TracingUtility.h"
#include "TransactionImpl.h"
#include "UniqueIdGenerator.h"
@@ -287,32 +288,35 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext> context) {
return;
}
- // {
- // Trace Send RPC
- // auto span_context =
- // opencensus::trace::propagation::FromTraceParentHeader(context->message_.traceContext().value()); auto span =
- // opencensus::trace::Span::BlankSpan(); std::string span_name = resourceNamespace() + "/" + context->message_.topic()
- // + " " +
- // MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION;
- // if (span_context.IsValid()) {
- // span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {&Samplers::always()});
- // } else {
- // span = opencensus::trace::Span::StartSpan(span_name, nullptr, {&Samplers::always()});
- // }
- // span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
- // MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION);
- // span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
- // MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_SEND_OPERATION);
- // TracingUtility::addUniversalSpanAttributes(context->message_, *this, span);
- // Note: attempt-time is 0-based
- // span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, 1 + callback->attemptTime());
- // if (message.deliveryTimestamp() != absl::ToChronoTime(absl::UnixEpoch())) {
- // span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP,
- // absl::FormatTime(absl::FromChrono(message.deliveryTimestamp())));
- // }
- // callback->message().traceContext(opencensus::trace::propagation::ToTraceParentHeader(span.context()));
- // callback->span() = span;
- // }
+ {
+ // Trace Send RPC
+ if (context->message_->traceContext().has_value()) {
+ auto span_context =
+ opencensus::trace::propagation::FromTraceParentHeader(context->message_->traceContext().value());
+ auto span = opencensus::trace::Span::BlankSpan();
+ std::string span_name = resourceNamespace() + "/" + context->message_->topic() + " " +
+ MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION;
+ if (span_context.IsValid()) {
+ span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {traceSampler()});
+ } else {
+ span = opencensus::trace::Span::StartSpan(span_name, nullptr, {traceSampler()});
+ }
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
+ MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION);
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
+ MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_SEND_OPERATION);
+ TracingUtility::addUniversalSpanAttributes(*context->message_, config(), span);
+ // Note: attempt-time is 0-based
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, 1 + context->attempt_times_);
+ if (context->message_->deliveryTimestamp().has_value()) {
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP,
+ absl::FormatTime(absl::FromChrono(context->message_->deliveryTimestamp().value())));
+ }
+ auto ptr = const_cast<Message*>(context->message_.get());
+ ptr->traceContext(opencensus::trace::propagation::ToTraceParentHeader(span.context()));
+ context->span_ = span;
+ }
+ }
SendMessageRequest request;
wrapSendMessageRequest(*context->message_, request, context->messageQueue());
@@ -354,7 +358,8 @@ void ProducerImpl::send0(MessageConstPtr message, SendCallback callback, std::ve
bool ProducerImpl::endTransaction0(const Transaction& transaction, TransactionState resolution) {
EndTransactionRequest request;
- request.mutable_topic()->set_name(transaction.topic());
+ const std::string& topic = transaction.topic();
+ request.mutable_topic()->set_name(topic);
request.mutable_topic()->set_resource_namespace(resourceNamespace());
request.set_message_id(transaction.messageId());
request.set_transaction_id(transaction.messageId());
@@ -396,7 +401,9 @@ bool ProducerImpl::endTransaction0(const Transaction& transaction, TransactionSt
auto mtx = std::make_shared<absl::Mutex>();
auto cv = std::make_shared<absl::CondVar>();
const auto& endpoint = transaction.endpoint();
- auto cb = [&, span, endpoint, mtx, cv](const std::error_code& ec, const EndTransactionResponse& response) {
+ std::weak_ptr<ProducerImpl> publisher(shared_from_this());
+
+ auto cb = [&, span, endpoint, mtx, cv, topic](const std::error_code& ec, const EndTransactionResponse& response) {
if (ec) {
{
span.SetStatus(opencensus::trace::StatusCode::ABORTED);
diff --git a/src/main/cpp/rocketmq/SendContext.cpp b/src/main/cpp/rocketmq/SendContext.cpp
index 73db9a1..9dee0e8 100644
--- a/src/main/cpp/rocketmq/SendContext.cpp
+++ b/src/main/cpp/rocketmq/SendContext.cpp
@@ -18,14 +18,15 @@
#include <system_error>
-#include "opencensus/trace/propagation/trace_context.h"
-#include "opencensus/trace/span.h"
-#include "spdlog/spdlog.h"
-
#include "ProducerImpl.h"
+#include "PublishStats.h"
+#include "Tag.h"
#include "TransactionImpl.h"
+#include "opencensus/trace/propagation/trace_context.h"
+#include "opencensus/trace/span.h"
#include "rocketmq/Logger.h"
#include "rocketmq/SendReceipt.h"
+#include "spdlog/spdlog.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -35,6 +36,28 @@ void SendContext::onSuccess(const SendReceipt& send_receipt) noexcept {
span_.SetStatus(opencensus::trace::StatusCode::OK);
span_.End();
}
+
+ auto publisher = producer_.lock();
+ if (!publisher) {
+ return;
+ }
+
+ // Collect metrics
+ {
+ auto duration = std::chrono::steady_clock::now() - request_time_;
+ opencensus::stats::Record({{publisher->stats().latency(), MixAll::millisecondsOf(duration)}},
+ {
+ {Tag::topicTag(), message_->topic()},
+ {Tag::clientIdTag(), publisher->config().client_id},
+ });
+
+ opencensus::stats::Record({{publisher->stats().success(), 1}},
+ {
+ {Tag::topicTag(), message_->topic()},
+ {Tag::clientIdTag(), publisher->config().client_id},
+ });
+ }
+
// send_receipt.traceContext(opencensus::trace::propagation::ToTraceParentHeader(span_.context()));
std::error_code ec;
callback_(ec, send_receipt);
@@ -52,6 +75,22 @@ void SendContext::onFailure(const std::error_code& ec) noexcept {
return;
}
+ // Collect metrics
+ {
+ auto duration = std::chrono::steady_clock::now() - request_time_;
+ opencensus::stats::Record({{publisher->stats().latency(), MixAll::millisecondsOf(duration)}},
+ {
+ {Tag::topicTag(), message_->topic()},
+ {Tag::clientIdTag(), publisher->config().client_id},
+ });
+
+ opencensus::stats::Record({{publisher->stats().failure(), 1}},
+ {
+ {Tag::topicTag(), message_->topic()},
+ {Tag::clientIdTag(), publisher->config().client_id},
+ });
+ }
+
if (++attempt_times_ >= publisher->maxAttemptTimes()) {
SPDLOG_WARN("Retried {} times, which exceeds the limit: {}", attempt_times_, publisher->maxAttemptTimes());
callback_(ec, {});
@@ -72,6 +111,7 @@ void SendContext::onFailure(const std::error_code& ec) noexcept {
}
auto message_queue = candidates_[attempt_times_ % candidates_.size()];
+ request_time_ = std::chrono::steady_clock::now();
producer->sendImpl(shared_from_this());
}
diff --git a/src/main/cpp/rocketmq/include/ConsumeMessageService.h b/src/main/cpp/rocketmq/include/ConsumeMessageService.h
index 39d9d8e..0f8b49a 100644
--- a/src/main/cpp/rocketmq/include/ConsumeMessageService.h
+++ b/src/main/cpp/rocketmq/include/ConsumeMessageService.h
@@ -61,6 +61,8 @@ public:
virtual void schedule(std::shared_ptr<ConsumeTask> task, std::chrono::milliseconds delay) = 0;
virtual std::size_t maxDeliveryAttempt() = 0;
+
+ virtual std::weak_ptr<PushConsumerImpl> consumer() = 0;
};
using ConsumeMessageServiceWeakPtr = std::weak_ptr<ConsumeMessageService>;
diff --git a/src/main/cpp/rocketmq/include/ConsumeMessageServiceImpl.h b/src/main/cpp/rocketmq/include/ConsumeMessageServiceImpl.h
index 7d6c34a..632a388 100644
--- a/src/main/cpp/rocketmq/include/ConsumeMessageServiceImpl.h
+++ b/src/main/cpp/rocketmq/include/ConsumeMessageServiceImpl.h
@@ -72,6 +72,8 @@ public:
std::size_t maxDeliveryAttempt() override;
+ std::weak_ptr<PushConsumerImpl> consumer() override;
+
protected:
std::atomic<State> state_;
diff --git a/src/main/cpp/rocketmq/include/ProducerImpl.h b/src/main/cpp/rocketmq/include/ProducerImpl.h
index b765fa6..a50c6ef 100644
--- a/src/main/cpp/rocketmq/include/ProducerImpl.h
+++ b/src/main/cpp/rocketmq/include/ProducerImpl.h
@@ -37,6 +37,7 @@
#include "rocketmq/SendReceipt.h"
#include "rocketmq/State.h"
#include "rocketmq/TransactionChecker.h"
+#include "PublishStats.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -105,7 +106,10 @@ public:
void buildClientSettings(rmq::Settings& settings) override;
- void topicsOfInterest(std::vector<std::string> topics) LOCKS_EXCLUDED(topics_mtx_);
+ void topicsOfInterest(std::vector<std::string> topics)
+ LOCKS_EXCLUDED(topics_mtx_);
+
+ const PublishStats& stats() const { return stats_; }
protected:
std::shared_ptr<ClientImpl> self() override {
@@ -126,6 +130,8 @@ private:
std::vector<std::string> topics_ GUARDED_BY(topics_mtx_);
absl::Mutex topics_mtx_;
+ PublishStats stats_;
+
/**
* @brief Acquire PublishInfo for the given topic.
* Generally speaking, it first checks presence of the desired info in local cache, aka, topic_publish_table_;
diff --git a/src/main/cpp/rocketmq/include/PushConsumerImpl.h b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
index cf5fe50..0067009 100644
--- a/src/main/cpp/rocketmq/include/PushConsumerImpl.h
+++ b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
@@ -25,6 +25,7 @@
#include "ClientImpl.h"
#include "ClientManagerImpl.h"
#include "ConsumeMessageService.h"
+#include "ConsumeStats.h"
#include "ProcessQueue.h"
#include "Scheduler.h"
#include "TopicAssignmentInfo.h"
@@ -162,6 +163,10 @@ public:
return client_config_.subscriber.group.name();
}
+ const ConsumeStats& stats() const {
+ return stats_;
+ }
+
protected:
std::shared_ptr<ClientImpl> self() override {
return shared_from_this();
@@ -206,6 +211,8 @@ private:
int32_t max_delivery_attempts_{MixAll::DEFAULT_MAX_DELIVERY_ATTEMPTS};
+ ConsumeStats stats_;
+
void fetchRoutes() LOCKS_EXCLUDED(topic_filter_expression_table_mtx_);
std::chrono::milliseconds invisibleDuration(std::size_t attempt);
diff --git a/src/main/cpp/rocketmq/include/SendContext.h b/src/main/cpp/rocketmq/include/SendContext.h
index bba83c8..4c05ceb 100644
--- a/src/main/cpp/rocketmq/include/SendContext.h
+++ b/src/main/cpp/rocketmq/include/SendContext.h
@@ -72,6 +72,8 @@ public:
* @brief The on-going span. Should be terminated in the callback functions.
*/
opencensus::trace::Span span_;
+
+ std::chrono::steady_clock::time_point request_time_{std::chrono::steady_clock::now()};
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/stats/BUILD.bazel b/src/main/cpp/stats/BUILD.bazel
index f31a081..fbf6af2 100644
--- a/src/main/cpp/stats/BUILD.bazel
+++ b/src/main/cpp/stats/BUILD.bazel
@@ -22,7 +22,9 @@ cc_library(
strip_include_prefix = "//src/main/cpp/stats/include",
deps = [
"//api:rocketmq_interface",
+ "//src/main/cpp/client:client_library",
"@io_opencensus_cpp//opencensus/stats",
+ "@opencensus_proto//opencensus/proto/agent/metrics/v1:metrics_service_grpc_cc",
],
visibility = ["//visibility:public"],
)
\ No newline at end of file
diff --git a/src/main/cpp/stats/ConsumeStats.cpp b/src/main/cpp/stats/ConsumeStats.cpp
new file mode 100644
index 0000000..e563c8c
--- /dev/null
+++ b/src/main/cpp/stats/ConsumeStats.cpp
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeStats.h"
+
+#include "Tag.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+ConsumeStats::ConsumeStats()
+ : process_success_(
+ opencensus::stats::MeasureInt64::Register("process_success", "Number of messages processed", "1")),
+ process_failure_(opencensus::stats::MeasureInt64::Register(
+ "process_failure", "Number of failures when processing messages", "1")),
+ ack_success_(opencensus::stats::MeasureInt64::Register("ack_success", "Number of messages acknowledged", "1")),
+ ack_failure_(opencensus::stats::MeasureInt64::Register(
+ "ack_failure", "Number of failures when acknowledging messages", "1")),
+ change_invisible_time_success_(opencensus::stats::MeasureInt64::Register(
+ "change_invisible_time_success", "Number of change-invisible-time performed", "1")),
+ change_invisible_time_failure_(opencensus::stats::MeasureInt64::Register(
+ "change_invisible_time_failure", "Number of failures when changing message invisible time", "1")),
+ cached_message_quantity_(opencensus::stats::MeasureInt64::Register(
+ "cached_message_quantity", "Number of locally cached messages", "1")),
+ cached_message_bytes_(opencensus::stats::MeasureInt64::Register(
+ "cached_message_bytes", "Number of locally cached messages in bytes", "1")),
+ delivery_latency_(opencensus::stats::MeasureInt64::Register(
+ "delivery_latency", "Time spent delivering messages from servers to clients", "1")),
+ await_time_(opencensus::stats::MeasureInt64::Register(
+ "await_time", "Client side queuing time of messages before getting processed", "1")),
+ process_time_(opencensus::stats::MeasureInt64::Register("process_time", "Process message time", "1")) {
+ opencensus::stats::ViewDescriptor()
+ .set_name("rocketmq_process_success_total")
+ .set_description("Number of messages processed")
+ .set_measure("process_success")
+ .set_aggregation(opencensus::stats::Aggregation::Sum())
+ .add_column(Tag::topicTag())
+ .add_column(Tag::clientIdTag())
+ .RegisterForExport();
+
+ opencensus::stats::ViewDescriptor()
+ .set_name("rocketmq_process_failure_total")
+ .set_description("Number of failures on processing messages")
+ .set_measure("process_failure")
+ .set_aggregation(opencensus::stats::Aggregation::Sum())
+ .add_column(Tag::topicTag())
+ .add_column(Tag::clientIdTag())
+ .RegisterForExport();
+
+ opencensus::stats::ViewDescriptor()
+ .set_name("rocketmq_ack_success_total")
+ .set_description("Number of messages acknowledged")
+ .set_measure("ack_success")
+ .set_aggregation(opencensus::stats::Aggregation::Sum())
+ .add_column(Tag::topicTag())
+ .add_column(Tag::clientIdTag())
+ .RegisterForExport();
+
+ opencensus::stats::ViewDescriptor()
+ .set_name("rocketmq_ack_failure_total")
+ .set_description("Number of failures on acknowledging messages")
+ .set_measure("ack_failure")
+ .set_aggregation(opencensus::stats::Aggregation::Sum())
+ .add_column(Tag::topicTag())
+ .add_column(Tag::clientIdTag())
+ .RegisterForExport();
+
+ opencensus::stats::ViewDescriptor()
+ .set_name("rocketmq_change_invisible_time_success_total")
+ .set_description("Number of change-invisible-time operations")
+ .set_measure("change_invisible_time_success")
+ .set_aggregation(opencensus::stats::Aggregation::Sum())
+ .add_column(Tag::topicTag())
+ .add_column(Tag::clientIdTag())
+ .RegisterForExport();
+
+ opencensus::stats::ViewDescriptor()
+ .set_name("rocketmq_change_invisible_time_failure_total")
+ .set_description("Number of failed change-invisible-time operations")
+ .set_measure("change_invisible_time_failure")
+ .set_aggregation(opencensus::stats::Aggregation::Sum())
+ .add_column(Tag::topicTag())
+ .add_column(Tag::clientIdTag())
+ .RegisterForExport();
+
+ opencensus::stats::ViewDescriptor()
+ .set_name("rocketmq_consumer_cached_messages")
+ .set_description("Number of messages locally cached")
+ .set_measure("cached_message_quantity")
+ .set_aggregation(opencensus::stats::Aggregation::LastValue())
+ .add_column(Tag::topicTag())
+ .add_column(Tag::clientIdTag())
+ .RegisterForExport();
+
+ opencensus::stats::ViewDescriptor()
+ .set_name("rocketmq_consumer_cached_bytes")
+ .set_description("Number of locally cached messages in bytes")
+ .set_measure("cached_message_bytes")
+ .set_aggregation(opencensus::stats::Aggregation::LastValue())
+ .add_column(Tag::topicTag())
+ .add_column(Tag::clientIdTag())
+ .RegisterForExport();
+
+ opencensus::stats::ViewDescriptor()
+ .set_name("rocketmq_delivery_latency")
+ .set_description("Message delivery latency")
+ .set_measure("delivery_latency")
+ .set_aggregation(opencensus::stats::Aggregation::Distribution(
+ opencensus::stats::BucketBoundaries::Explicit({5, 10, 20, 50, 500})))
+ .add_column(Tag::topicTag())
+ .add_column(Tag::clientIdTag())
+ .RegisterForExport();
+
+ opencensus::stats::ViewDescriptor()
+ .set_name("rocketmq_await_time")
+ .set_description("Message await time")
+ .set_measure("await_time")
+ .set_aggregation(opencensus::stats::Aggregation::Distribution(
+ opencensus::stats::BucketBoundaries::Explicit({1, 1000, 60000, 900000})))
+ .add_column(Tag::topicTag())
+ .add_column(Tag::clientIdTag())
+ .RegisterForExport();
+
+ opencensus::stats::ViewDescriptor()
+ .set_name("rocketmq_process_time")
+ .set_description("Process time")
+ .set_measure("process_time")
+ .set_aggregation(opencensus::stats::Aggregation::Distribution(
+ opencensus::stats::BucketBoundaries::Explicit({100, 1000, 60000, 900000})))
+ .add_column(Tag::topicTag())
+ .add_column(Tag::clientIdTag())
+ .RegisterForExport();
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/stats/MetricBidiReactor.cpp b/src/main/cpp/stats/MetricBidiReactor.cpp
new file mode 100644
index 0000000..62983ee
--- /dev/null
+++ b/src/main/cpp/stats/MetricBidiReactor.cpp
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MetricBidiReactor.h"
+
+#include "LoggerImpl.h"
+#include "OpencensusExporter.h"
+#include "Signature.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+MetricBidiReactor::MetricBidiReactor(std::weak_ptr<Client> client, std::weak_ptr<OpencensusExporter> exporter)
+ : client_(client), exporter_(exporter) {
+ grpc::ClientContext context;
+ auto ptr = client_.lock();
+
+ Metadata metadata;
+ Signature::sign(ptr->config(), metadata);
+
+ for (const auto& entry : metadata) {
+ context.AddMetadata(entry.first, entry.second);
+ }
+ context.set_deadline(std::chrono::system_clock::now() + absl::ToChronoMilliseconds(ptr->config().request_timeout));
+
+ auto exporter_ptr = exporter_.lock();
+ if (!exporter_ptr) {
+ return;
+ }
+
+ exporter_ptr->stub()->async()->Export(&context, this);
+ StartCall();
+}
+
+void MetricBidiReactor::OnReadDone(bool ok) {
+ if (!ok) {
+ SPDLOG_WARN("Failed to read response");
+ return;
+ }
+
+ StartRead(&response_);
+}
+
+void MetricBidiReactor::OnWriteDone(bool ok) {
+ if (!ok) {
+ SPDLOG_WARN("Failed to report metrics");
+ return;
+ }
+
+ bool expected = true;
+ if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
+ fireWrite();
+ }
+}
+
+void MetricBidiReactor::OnDone(const grpc::Status& s) {
+ auto client = client_.lock();
+ if (!client) {
+ return;
+ }
+
+ if (s.ok()) {
+ SPDLOG_DEBUG("Bi-directional stream ended. status.code={}, status.message={}", s.error_code(), s.error_message());
+ } else {
+ SPDLOG_WARN("Bi-directional stream ended. status.code={}, status.message={}", s.error_code(), s.error_message());
+ }
+}
+
+void MetricBidiReactor::write(ExportMetricsServiceRequest request) {
+ {
+ absl::MutexLock lk(&requests_mtx_);
+ requests_.emplace_back(std::move(request));
+ }
+
+ fireWrite();
+}
+
+void MetricBidiReactor::fireWrite() {
+ {
+ absl::MutexLock lk(&requests_mtx_);
+ if (requests_.empty()) {
+ SPDLOG_DEBUG("No more metric data to write");
+ return;
+ }
+ }
+
+ bool expected = false;
+ if (inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) {
+ absl::MutexLock lk(&requests_mtx_);
+ request_.CopyFrom(requests_[0]);
+ requests_.erase(requests_.begin());
+ StartWrite(&request_);
+ fireRead();
+ }
+}
+
+void MetricBidiReactor::fireRead() {
+ bool expected = false;
+ if (read_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) {
+ StartRead(&response_);
+ }
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/stats/OpencensusExporter.cpp b/src/main/cpp/stats/OpencensusExporter.cpp
new file mode 100644
index 0000000..0bf13a4
--- /dev/null
+++ b/src/main/cpp/stats/OpencensusExporter.cpp
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpencensusExporter.h"
+
+#include "MetricBidiReactor.h"
+#include "google/protobuf/util/time_util.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+namespace opencensus_proto = opencensus::proto::metrics::v1;
+
+OpencensusExporter::OpencensusExporter(std::string endpoints, std::weak_ptr<Client> client) : client_(client) {
+}
+
+void OpencensusExporter::wrap(const MetricData& data, ExportMetricsServiceRequest& request) {
+ auto metrics = request.mutable_metrics();
+
+ for (const auto& entry : data) {
+ const auto& view_descriptor = entry.first;
+
+ auto metric = new opencensus::proto::metrics::v1::Metric();
+ auto descriptor = metric->mutable_metric_descriptor();
+ descriptor->set_name(view_descriptor.name());
+ descriptor->set_description(view_descriptor.description());
+ descriptor->set_unit(view_descriptor.measure_descriptor().units());
+ switch (view_descriptor.aggregation().type()) {
+ case opencensus::stats::Aggregation::Type::kCount: {
+ descriptor->set_type(opencensus_proto::MetricDescriptor_Type::MetricDescriptor_Type_CUMULATIVE_INT64);
+ break;
+ }
+
+ case opencensus::stats::Aggregation::Type::kSum: {
+ descriptor->set_type(opencensus_proto::MetricDescriptor_Type::MetricDescriptor_Type_CUMULATIVE_INT64);
+ break;
+ }
+
+ case opencensus::stats::Aggregation::Type::kLastValue: {
+ descriptor->set_type(opencensus_proto::MetricDescriptor_Type::MetricDescriptor_Type_GAUGE_INT64);
+ break;
+ }
+
+ case opencensus::stats::Aggregation::Type::kDistribution: {
+ descriptor->set_type(opencensus_proto::MetricDescriptor_Type::MetricDescriptor_Type_GAUGE_DISTRIBUTION);
+ break;
+ }
+ }
+
+ auto label_keys = descriptor->mutable_label_keys();
+ for (const auto& column : view_descriptor.columns()) {
+ auto label_key = new opencensus::proto::metrics::v1::LabelKey;
+ label_key->set_key(column.name());
+ label_keys->AddAllocated(label_key);
+ }
+
+ auto time_series = metric->mutable_timeseries();
+ const auto& view_data = entry.second;
+
+ auto start_times = view_data.start_times();
+
+ switch (view_data.type()) {
+ case opencensus::stats::ViewData::Type::kInt64: {
+ for (const auto& entry : view_data.int_data()) {
+ auto time_series_element = new opencensus::proto::metrics::v1::TimeSeries();
+
+ auto search = start_times.find(entry.first);
+ if (search != start_times.end()) {
+ absl::Time time = search->second;
+ time_series_element->mutable_start_timestamp()->CopyFrom(
+ google::protobuf::util::TimeUtil::TimeTToTimestamp(absl::ToTimeT(time)));
+ }
+
+ auto label_values = time_series_element->mutable_label_values();
+ for (const auto& value : entry.first) {
+ auto label_value = new opencensus::proto::metrics::v1::LabelValue;
+ label_value->set_value(value);
+ label_value->set_has_value(true);
+ label_values->AddAllocated(label_value);
+ }
+
+ auto point = new opencensus::proto::metrics::v1::Point();
+ point->set_int64_value(entry.second);
+ time_series_element->mutable_points()->AddAllocated(point);
+
+ time_series->AddAllocated(time_series_element);
+ }
+ break;
+ }
+ case opencensus::stats::ViewData::Type::kDouble: {
+ for (const auto& entry : view_data.double_data()) {
+ auto time_series_element = new opencensus::proto::metrics::v1::TimeSeries();
+ auto search = start_times.find(entry.first);
+ if (search != start_times.end()) {
+ absl::Time time = search->second;
+ time_series_element->mutable_start_timestamp()->CopyFrom(
+ google::protobuf::util::TimeUtil::TimeTToTimestamp(absl::ToTimeT(time)));
+ }
+
+ auto label_values = time_series_element->mutable_label_values();
+ for (const auto& value : entry.first) {
+ auto label_value = new opencensus::proto::metrics::v1::LabelValue;
+ label_value->set_value(value);
+ label_value->set_has_value(true);
+ label_values->AddAllocated(label_value);
+ }
+
+ auto point = new opencensus::proto::metrics::v1::Point();
+ point->set_double_value(entry.second);
+ time_series_element->mutable_points()->AddAllocated(point);
+
+ time_series->AddAllocated(time_series_element);
+ }
+ break;
+ }
+ case opencensus::stats::ViewData::Type::kDistribution: {
+ for (const auto& entry : view_data.distribution_data()) {
+ auto time_series_element = new opencensus::proto::metrics::v1::TimeSeries();
+ auto search = start_times.find(entry.first);
+ if (search != start_times.end()) {
+ absl::Time time = search->second;
+ time_series_element->mutable_start_timestamp()->CopyFrom(
+ google::protobuf::util::TimeUtil::TimeTToTimestamp(absl::ToTimeT(time)));
+ }
+
+ auto label_values = time_series_element->mutable_label_values();
+ for (const auto& value : entry.first) {
+ auto label_value = new opencensus::proto::metrics::v1::LabelValue;
+ label_value->set_value(value);
+ label_value->set_has_value(true);
+ label_values->AddAllocated(label_value);
+ }
+ auto point = new opencensus::proto::metrics::v1::Point();
+ auto distribution_value = new opencensus::proto::metrics::v1::DistributionValue;
+ point->set_allocated_distribution_value(distribution_value);
+ time_series_element->mutable_points()->AddAllocated(point);
+ distribution_value->set_count(entry.second.count());
+ distribution_value->set_sum_of_squared_deviation(entry.second.sum_of_squared_deviation());
+ for (const auto& cnt : entry.second.bucket_counts()) {
+ auto bucket = new opencensus::proto::metrics::v1::DistributionValue::Bucket();
+ bucket->set_count(cnt);
+ distribution_value->mutable_buckets()->AddAllocated(bucket);
+ }
+
+ auto bucket_options = distribution_value->mutable_bucket_options();
+
+ for (const auto& boundary : entry.second.bucket_boundaries().lower_boundaries()) {
+ bucket_options->mutable_explicit_()->mutable_bounds()->Add(boundary);
+ }
+
+ time_series->AddAllocated(time_series_element);
+ }
+ break;
+ }
+ }
+
+ metrics->AddAllocated(metric);
+ }
+}
+
+void OpencensusExporter::exportMetrics(
+ const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) {
+ opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest request;
+ wrap(data, request);
+ std::weak_ptr<OpencensusExporter> exporter{shared_from_this()};
+ if (!bidi_reactor_) {
+ bidi_reactor_ = absl::make_unique<MetricBidiReactor>(client_, exporter);
+ }
+ bidi_reactor_->write(request);
+}
+
+void OpencensusExporter::resetStream() {
+ std::weak_ptr<OpencensusExporter> exporter{shared_from_this()};
+ bidi_reactor_.reset(new MetricBidiReactor(client_, exporter));
+}
+
+ROCKETMQ_NAMESPACE_END
diff --git a/src/main/cpp/stats/PublishStats.cpp b/src/main/cpp/stats/PublishStats.cpp
index 9356f98..37ce613 100644
--- a/src/main/cpp/stats/PublishStats.cpp
+++ b/src/main/cpp/stats/PublishStats.cpp
@@ -16,6 +16,8 @@
*/
#include "PublishStats.h"
+#include "Tag.h"
+
ROCKETMQ_NAMESPACE_BEGIN
PublishStats::PublishStats()
@@ -27,15 +29,17 @@ PublishStats::PublishStats()
.set_description("Number of messages published")
.set_measure("publish_success")
.set_aggregation(opencensus::stats::Aggregation::Sum())
- .add_column(topicTag())
+ .add_column(Tag::topicTag())
+ .add_column(Tag::clientIdTag())
.RegisterForExport();
opencensus::stats::ViewDescriptor()
.set_name("rocketmq_send_failure_total")
.set_description("Number of publish failures")
- .set_measure("pubish_failure")
+ .set_measure("publish_failure")
.set_aggregation(opencensus::stats::Aggregation::Sum())
- .add_column(topicTag())
+ .add_column(Tag::topicTag())
+ .add_column(Tag::clientIdTag())
.RegisterForExport();
opencensus::stats::ViewDescriptor()
@@ -43,14 +47,10 @@ PublishStats::PublishStats()
.set_description("Publish latency")
.set_measure("publish_latency")
.set_aggregation(opencensus::stats::Aggregation::Distribution(
- opencensus::stats::BucketBoundaries::Explicit({1, 10, 100, 1000})))
- .add_column(topicTag())
+ opencensus::stats::BucketBoundaries::Explicit({5, 10, 20, 50, 500})))
+ .add_column(Tag::topicTag())
+ .add_column(Tag::clientIdTag())
.RegisterForExport();
}
-opencensus::tags::TagKey& PublishStats::topicTag() {
- static opencensus::tags::TagKey topic_tag = opencensus::tags::TagKey::Register("topic");
- return topic_tag;
-}
-
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/stats/include/PublishStats.h b/src/main/cpp/stats/Tag.cpp
similarity index 54%
copy from src/main/cpp/stats/include/PublishStats.h
copy to src/main/cpp/stats/Tag.cpp
index 46305d0..9599233 100644
--- a/src/main/cpp/stats/include/PublishStats.h
+++ b/src/main/cpp/stats/Tag.cpp
@@ -14,37 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#pragma once
-
-#include <string>
-
-#include "opencensus/stats/stats.h"
-#include "rocketmq/RocketMQ.h"
+#include "Tag.h"
ROCKETMQ_NAMESPACE_BEGIN
-class PublishStats {
-public:
- PublishStats();
-
- opencensus::stats::MeasureInt64& success() {
- return success_;
- }
-
- opencensus::stats::MeasureInt64& failure() {
- return failure_;
- }
+opencensus::tags::TagKey& Tag::topicTag() {
+ static opencensus::tags::TagKey topic_tag = opencensus::tags::TagKey::Register("topic");
+ return topic_tag;
+}
- opencensus::stats::MeasureInt64& latency() {
- return latency_;
- }
+opencensus::tags::TagKey& Tag::clientIdTag() {
+ static opencensus::tags::TagKey client_id_tag = opencensus::tags::TagKey::Register("client_id");
+ return client_id_tag;
+}
- static opencensus::tags::TagKey& topicTag();
+opencensus::tags::TagKey& Tag::userIdTag() {
+ static opencensus::tags::TagKey uid_tag = opencensus::tags::TagKey::Register("uid");
+ return uid_tag;
+}
-private:
- opencensus::stats::MeasureInt64 success_;
- opencensus::stats::MeasureInt64 failure_;
- opencensus::stats::MeasureInt64 latency_;
-};
+opencensus::tags::TagKey& Tag::deploymentTag() {
+ static opencensus::tags::TagKey deployment_tag = opencensus::tags::TagKey::Register("deployment");
+ return deployment_tag;
+}
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
+ROCKETMQ_NAMESPACE_END
diff --git a/src/main/cpp/stats/include/ConsumeStats.h b/src/main/cpp/stats/include/ConsumeStats.h
new file mode 100644
index 0000000..a70f324
--- /dev/null
+++ b/src/main/cpp/stats/include/ConsumeStats.h
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "opencensus/stats/stats.h"
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class ConsumeStats {
+public:
+ ConsumeStats();
+
+ const opencensus::stats::MeasureInt64& processSuccess() const {
+ return process_success_;
+ }
+
+ const opencensus::stats::MeasureInt64& processFailure() const {
+ return process_failure_;
+ }
+
+ const opencensus::stats::MeasureInt64& ackSuccess() const {
+ return ack_success_;
+ }
+
+ const opencensus::stats::MeasureInt64& ackFailure() const {
+ return ack_failure_;
+ }
+
+ const opencensus::stats::MeasureInt64& changeInvisibleTimeSuccess() const {
+ return change_invisible_time_success_;
+ }
+
+ const opencensus::stats::MeasureInt64& changeInvisibleTimeFailure() const {
+ return change_invisible_time_failure_;
+ }
+
+ const opencensus::stats::MeasureInt64& cachedMessageQuantity() const {
+ return cached_message_quantity_;
+ }
+
+ const opencensus::stats::MeasureInt64& cachedMessageBytes() const {
+ return cached_message_bytes_;
+ }
+
+ const opencensus::stats::MeasureInt64& deliveryLatency() const {
+ return delivery_latency_;
+ }
+
+ const opencensus::stats::MeasureInt64& awaitTime() const {
+ return await_time_;
+ }
+
+ const opencensus::stats::MeasureInt64& processTime() const {
+ return process_time_;
+ }
+
+private:
+ opencensus::stats::MeasureInt64 process_success_;
+ opencensus::stats::MeasureInt64 process_failure_;
+ opencensus::stats::MeasureInt64 ack_success_;
+ opencensus::stats::MeasureInt64 ack_failure_;
+ opencensus::stats::MeasureInt64 change_invisible_time_success_;
+ opencensus::stats::MeasureInt64 change_invisible_time_failure_;
+ opencensus::stats::MeasureInt64 cached_message_quantity_;
+ opencensus::stats::MeasureInt64 cached_message_bytes_;
+
+ opencensus::stats::MeasureInt64 delivery_latency_;
+ opencensus::stats::MeasureInt64 await_time_;
+ opencensus::stats::MeasureInt64 process_time_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/stats/include/PublishStats.h b/src/main/cpp/stats/include/Exporter.h
similarity index 65%
copy from src/main/cpp/stats/include/PublishStats.h
copy to src/main/cpp/stats/include/Exporter.h
index 46305d0..7f2f24a 100644
--- a/src/main/cpp/stats/include/PublishStats.h
+++ b/src/main/cpp/stats/include/Exporter.h
@@ -16,35 +16,15 @@
*/
#pragma once
-#include <string>
-
#include "opencensus/stats/stats.h"
#include "rocketmq/RocketMQ.h"
ROCKETMQ_NAMESPACE_BEGIN
-class PublishStats {
+class Exporter {
public:
- PublishStats();
-
- opencensus::stats::MeasureInt64& success() {
- return success_;
- }
-
- opencensus::stats::MeasureInt64& failure() {
- return failure_;
- }
-
- opencensus::stats::MeasureInt64& latency() {
- return latency_;
- }
-
- static opencensus::tags::TagKey& topicTag();
-
-private:
- opencensus::stats::MeasureInt64 success_;
- opencensus::stats::MeasureInt64 failure_;
- opencensus::stats::MeasureInt64 latency_;
+ virtual void exportMetrics(
+ const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) = 0;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/stats/include/MetricBidiReactor.h b/src/main/cpp/stats/include/MetricBidiReactor.h
new file mode 100644
index 0000000..cc08c1d
--- /dev/null
+++ b/src/main/cpp/stats/include/MetricBidiReactor.h
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "Client.h"
+#include "grpcpp/grpcpp.h"
+#include "grpcpp/impl/codegen/client_callback.h"
+#include "opencensus/proto/agent/metrics/v1/metrics_service.grpc.pb.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class OpencensusExporter;
+
+using ExportMetricsServiceRequest = opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest;
+using ExportMetricsServiceResponse = opencensus::proto::agent::metrics::v1::ExportMetricsServiceResponse;
+
+class MetricBidiReactor : public grpc::ClientBidiReactor<ExportMetricsServiceRequest, ExportMetricsServiceResponse> {
+public:
+ MetricBidiReactor(std::weak_ptr<Client> client, std::weak_ptr<OpencensusExporter> exporter);
+
+ /// Notifies the application that a StartRead operation completed.
+ ///
+ /// \param[in] ok Was it successful? If false, no new read/write operation
+ /// will succeed, and any further Start* should not be called.
+ void OnReadDone(bool /*ok*/) override;
+
+ /// Notifies the application that a StartWrite or StartWriteLast operation
+ /// completed.
+ ///
+ /// \param[in] ok Was it successful? If false, no new read/write operation
+ /// will succeed, and any further Start* should not be called.
+ void OnWriteDone(bool /*ok*/) override;
+
+ /// Notifies the application that all operations associated with this RPC
+ /// have completed and all Holds have been removed. OnDone provides the RPC
+ /// status outcome for both successful and failed RPCs and will be called in
+ /// all cases. If it is not called, it indicates an application-level problem
+ /// (like failure to remove a hold).
+ ///
+ /// \param[in] s The status outcome of this RPC
+ void OnDone(const grpc::Status& /*s*/) override;
+
+ void write(ExportMetricsServiceRequest request) LOCKS_EXCLUDED(requests_mtx_);
+
+private:
+ std::weak_ptr<Client> client_;
+ std::weak_ptr<OpencensusExporter> exporter_;
+
+ ExportMetricsServiceRequest request_;
+
+ std::vector<ExportMetricsServiceRequest> requests_ GUARDED_BY(requests_mtx_);
+ absl::Mutex requests_mtx_;
+
+ std::atomic_bool inflight_{false};
+ std::atomic_bool read_{false};
+
+ ExportMetricsServiceResponse response_;
+
+ void fireWrite();
+
+ void fireRead();
+};
+
+ROCKETMQ_NAMESPACE_END
diff --git a/src/main/cpp/stats/include/OpencensusExporter.h b/src/main/cpp/stats/include/OpencensusExporter.h
new file mode 100644
index 0000000..7920ff5
--- /dev/null
+++ b/src/main/cpp/stats/include/OpencensusExporter.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "Client.h"
+#include "Exporter.h"
+#include "grpcpp/grpcpp.h"
+#include "opencensus/proto/agent/metrics/v1/metrics_service.grpc.pb.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class MetricBidiReactor;
+
+using Stub = opencensus::proto::agent::metrics::v1::MetricsService::Stub;
+using StubPtr = std::unique_ptr<Stub>;
+using MetricData = std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>;
+using ExportMetricsServiceRequest = opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest;
+
+class OpencensusExporter : public Exporter, public std::enable_shared_from_this<OpencensusExporter> {
+public:
+ OpencensusExporter(std::string endpoints, std::weak_ptr<Client> client);
+
+ void exportMetrics(const MetricData& data) override;
+
+ static void wrap(const MetricData& data, ExportMetricsServiceRequest& request);
+
+ void resetStream();
+
+ Stub* stub() {
+ return stub_.get();
+ }
+
+private:
+ std::weak_ptr<Client> client_;
+ StubPtr stub_;
+ std::unique_ptr<MetricBidiReactor> bidi_reactor_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/stats/include/PublishStats.h b/src/main/cpp/stats/include/PublishStats.h
index 46305d0..d9d750a 100644
--- a/src/main/cpp/stats/include/PublishStats.h
+++ b/src/main/cpp/stats/include/PublishStats.h
@@ -18,6 +18,7 @@
#include <string>
+#include "Tag.h"
#include "opencensus/stats/stats.h"
#include "rocketmq/RocketMQ.h"
@@ -27,20 +28,18 @@ class PublishStats {
public:
PublishStats();
- opencensus::stats::MeasureInt64& success() {
+ const opencensus::stats::MeasureInt64& success() const {
return success_;
}
- opencensus::stats::MeasureInt64& failure() {
+ const opencensus::stats::MeasureInt64& failure() const {
return failure_;
}
- opencensus::stats::MeasureInt64& latency() {
+ const opencensus::stats::MeasureInt64& latency() const {
return latency_;
}
- static opencensus::tags::TagKey& topicTag();
-
private:
opencensus::stats::MeasureInt64 success_;
opencensus::stats::MeasureInt64 failure_;
diff --git a/src/main/cpp/stats/include/PublishStats.h b/src/main/cpp/stats/include/Tag.h
similarity index 68%
copy from src/main/cpp/stats/include/PublishStats.h
copy to src/main/cpp/stats/include/Tag.h
index 46305d0..ae69fcb 100644
--- a/src/main/cpp/stats/include/PublishStats.h
+++ b/src/main/cpp/stats/include/Tag.h
@@ -16,35 +16,20 @@
*/
#pragma once
-#include <string>
-
#include "opencensus/stats/stats.h"
#include "rocketmq/RocketMQ.h"
ROCKETMQ_NAMESPACE_BEGIN
-class PublishStats {
+class Tag {
public:
- PublishStats();
-
- opencensus::stats::MeasureInt64& success() {
- return success_;
- }
-
- opencensus::stats::MeasureInt64& failure() {
- return failure_;
- }
+ static opencensus::tags::TagKey& topicTag();
- opencensus::stats::MeasureInt64& latency() {
- return latency_;
- }
+ static opencensus::tags::TagKey& clientIdTag();
- static opencensus::tags::TagKey& topicTag();
+ static opencensus::tags::TagKey& userIdTag();
-private:
- opencensus::stats::MeasureInt64 success_;
- opencensus::stats::MeasureInt64 failure_;
- opencensus::stats::MeasureInt64 latency_;
+ static opencensus::tags::TagKey& deploymentTag();
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/stats/tests/PublishStatsTest.cpp b/src/main/cpp/stats/tests/PublishStatsTest.cpp
index 609e634..b6fb7d3 100644
--- a/src/main/cpp/stats/tests/PublishStatsTest.cpp
+++ b/src/main/cpp/stats/tests/PublishStatsTest.cpp
@@ -17,6 +17,7 @@
#include <chrono>
#include <iostream>
+#include <mutex>
#include <thread>
#include "PublishStats.h"
@@ -28,11 +29,33 @@ class Handler : public opencensus::stats::StatsExporter::Handler {
public:
void ExportViewData(
const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) override {
+ std::cout << "================================================================================" << std::endl;
for (const auto& datum : data) {
const auto& view_data = datum.second;
+ const auto& descriptor = datum.first;
+ auto start_times = view_data.start_times();
+ auto columns = descriptor.columns();
+
switch (view_data.type()) {
case opencensus::stats::ViewData::Type::kInt64: {
- exportDatum(datum.first, view_data.start_time(), view_data.end_time(), view_data.int_data());
+ auto data_map = view_data.int_data();
+ for (const auto& entry : data_map) {
+ absl::Time time = start_times[entry.first];
+ std::string line;
+ line.append(absl::FormatTime(time)).append(" ");
+ line.append(descriptor.name());
+ line.append("{");
+ for (std::size_t i = 0; i < columns.size(); i++) {
+ line.append(columns[i].name()).append("=").append(entry.first[i]);
+ if (i < columns.size() - 1) {
+ line.append(", ");
+ } else {
+ line.append("} ==> ");
+ }
+ }
+ line.append(std::to_string(entry.second));
+ println(line);
+ }
break;
}
case opencensus::stats::ViewData::Type::kDouble: {
@@ -40,7 +63,22 @@ public:
break;
}
case opencensus::stats::ViewData::Type::kDistribution: {
- exportDatum(datum.first, view_data.start_time(), view_data.end_time(), view_data.distribution_data());
+ for (const auto& entry : view_data.distribution_data()) {
+ std::string line(descriptor.name());
+ line.append("{");
+ for (std::size_t i = 0; i < columns.size(); i++) {
+ line.append(columns[i].name()).append("=").append(entry.first[i]);
+ if (i < columns.size() - 1) {
+ line.append(", ");
+ } else {
+ line.append("} ==> ");
+ }
+ }
+ line.append(entry.second.DebugString());
+ println(line);
+
+ println(absl::StrJoin(entry.second.bucket_boundaries().lower_boundaries(), ","));
+ }
break;
}
}
@@ -53,7 +91,7 @@ public:
absl::Time end_time,
const opencensus::stats::ViewData::DataMap<T>& data) {
if (data.empty()) {
- std::cout << "No data for " << descriptor.name() << std::endl;
+ // std::cout << "No data for " << descriptor.name() << std::endl;
return;
}
@@ -65,6 +103,12 @@ public:
}
}
+ std::mutex console_mtx;
+ void println(const std::string& line) {
+ std::lock_guard<std::mutex> lk(console_mtx);
+ std::cout << line << std::endl;
+ }
+
// Functions to format data for different aggregation types.
std::string DataToString(double data) {
return absl::StrCat(": ", data, "\n");
@@ -87,12 +131,27 @@ TEST(StatsTest, testBasics) {
std::string t1("T1");
std::string t2("T2");
PublishStats metrics;
+ opencensus::stats::StatsExporter::SetInterval(absl::Seconds(5));
opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<Handler>());
- opencensus::stats::Record({{metrics.success(), 1}}, {{PublishStats::topicTag(), t1}});
- opencensus::stats::Record({{metrics.success(), 100}}, {{PublishStats::topicTag(), t2}});
- opencensus::stats::Record({{metrics.latency(), 100}}, {{PublishStats::topicTag(), t1}});
+ std::atomic_bool stopped{false};
+ auto generator = [&]() {
+ while (!stopped) {
+ opencensus::stats::Record({{metrics.success(), 1}}, {{Tag::topicTag(), t1}, {Tag::clientIdTag(), "client-0"}});
+ opencensus::stats::Record({{metrics.success(), 100}}, {{Tag::topicTag(), t2}, {Tag::clientIdTag(), "client-0"}});
+ for (std::size_t i = 0; i < 10; i++) {
+ opencensus::stats::Record({{metrics.latency(), i * 10}},
+ {{Tag::topicTag(), t1}, {Tag::clientIdTag(), "client-0"}});
+ }
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ }
+ };
+ std::thread feeder(generator);
std::this_thread::sleep_for(std::chrono::seconds(10));
+ stopped.store(true);
+ if (feeder.joinable()) {
+ feeder.join();
+ }
}
ROCKETMQ_NAMESPACE_END