You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/13 06:41:19 UTC
[rocketmq-clients] branch master updated: Support client metric exporting (#39)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new c8ed73d Support client metric exporting (#39)
c8ed73d is described below
commit c8ed73d3ef7e37e3fcd11fa6fa66126ffe1ac8ce
Author: Zhanhui Li <li...@gmail.com>
AuthorDate: Wed Jul 13 14:41:15 2022 +0800
Support client metric exporting (#39)
---
cpp/proto/apache/rocketmq/v2/definition.proto | 14 ++-
cpp/proto/apache/rocketmq/v2/service.proto | 19 +---
cpp/src/main/cpp/base/ErrorCategory.cpp | 53 +++++++++-
cpp/src/main/cpp/base/MixAll.cpp | 25 ++---
cpp/src/main/cpp/base/include/MixAll.h | 3 +-
cpp/src/main/cpp/base/tests/MixAllTest.cpp | 11 ++
cpp/src/main/cpp/client/BUILD.bazel | 1 +
cpp/src/main/cpp/client/ClientManagerImpl.cpp | 7 +-
cpp/src/main/cpp/client/LogInterceptor.cpp | 4 +-
cpp/src/main/cpp/client/TelemetryBidiReactor.cpp | 1 -
cpp/src/main/cpp/client/include/ClientConfig.h | 3 +-
cpp/src/main/cpp/client/include/ClientManager.h | 6 ++
cpp/src/main/cpp/rocketmq/ClientImpl.cpp | 47 +++++++++
.../cpp/rocketmq/ConsumeMessageServiceImpl.cpp | 13 ---
cpp/src/main/cpp/rocketmq/ConsumeTask.cpp | 32 +++++-
cpp/src/main/cpp/rocketmq/ProducerImpl.cpp | 16 ++-
cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp | 6 +-
cpp/src/main/cpp/rocketmq/SendContext.cpp | 14 +--
cpp/src/main/cpp/rocketmq/SimpleConsumerImpl.cpp | 2 +-
cpp/src/main/cpp/rocketmq/include/ClientImpl.h | 3 +-
cpp/src/main/cpp/rocketmq/include/ProducerImpl.h | 5 +-
.../main/cpp/rocketmq/include/PushConsumerImpl.h | 2 +-
.../main/cpp/rocketmq/include/SimpleConsumerImpl.h | 2 +-
cpp/src/main/cpp/stats/ConsumeStats.cpp | 67 +-----------
cpp/src/main/cpp/stats/MetricBidiReactor.cpp | 25 +++--
cpp/src/main/cpp/stats/OpencensusExporter.cpp | 114 ++++++++++-----------
.../stats/{include/Tag.h => OpencensusHandler.cpp} | 20 ++--
cpp/src/main/cpp/stats/PublishStats.cpp | 24 +----
cpp/src/main/cpp/stats/StdoutHandler.cpp | 81 +++++++++++++++
cpp/src/main/cpp/stats/Tag.cpp | 11 +-
cpp/src/main/cpp/stats/include/ConsumeStats.h | 30 ------
cpp/src/main/cpp/stats/include/MetricBidiReactor.h | 1 +
.../main/cpp/stats/include/OpencensusExporter.h | 7 +-
.../include/{Exporter.h => OpencensusHandler.h} | 12 ++-
cpp/src/main/cpp/stats/include/PublishStats.h | 12 +--
cpp/src/main/cpp/stats/include/StdoutHandler.h | 76 ++++++++++++++
cpp/src/main/cpp/stats/include/Tag.h | 4 +-
cpp/src/main/cpp/stats/tests/PublishStatsTest.cpp | 2 -
38 files changed, 459 insertions(+), 316 deletions(-)
diff --git a/cpp/proto/apache/rocketmq/v2/definition.proto b/cpp/proto/apache/rocketmq/v2/definition.proto
index 67520fe..86bb3dd 100644
--- a/cpp/proto/apache/rocketmq/v2/definition.proto
+++ b/cpp/proto/apache/rocketmq/v2/definition.proto
@@ -322,16 +322,20 @@ enum Code {
ILLEGAL_MESSAGE_ID = 40009;
// Format of filter expression is illegal.
ILLEGAL_FILTER_EXPRESSION = 40010;
+ // The invisible time of request is invalid.
+ ILLEGAL_INVISIBLE_TIME = 40011;
+ // The delivery timestamp of message is invalid.
+ ILLEGAL_DELIVERY_TIME = 40012;
// Receipt handle of message is invalid.
- INVALID_RECEIPT_HANDLE = 40011;
+ INVALID_RECEIPT_HANDLE = 40013;
// Message property conflicts with its type.
- MESSAGE_PROPERTY_CONFLICT_WITH_TYPE = 40012;
+ MESSAGE_PROPERTY_CONFLICT_WITH_TYPE = 40014;
// Client type could not be recognized.
- UNRECOGNIZED_CLIENT_TYPE = 40013;
+ UNRECOGNIZED_CLIENT_TYPE = 40015;
// Message is corrupted.
- MESSAGE_CORRUPTED = 40014;
+ MESSAGE_CORRUPTED = 40016;
// Request is rejected due to missing of x-mq-client-id header.
- CLIENT_ID_REQUIRED = 40015;
+ CLIENT_ID_REQUIRED = 40017;
// Generic code indicates that the client request lacks valid authentication
// credentials for the requested resource.
diff --git a/cpp/proto/apache/rocketmq/v2/service.proto b/cpp/proto/apache/rocketmq/v2/service.proto
index c5d4cce..42bf332 100644
--- a/cpp/proto/apache/rocketmq/v2/service.proto
+++ b/cpp/proto/apache/rocketmq/v2/service.proto
@@ -182,8 +182,7 @@ message ThreadStackTrace {
message VerifyMessageCommand {
string nonce = 1;
- MessageQueue message_queue = 2;
- Message message = 3;
+ Message message = 2;
}
message VerifyMessageResult {
@@ -191,9 +190,8 @@ message VerifyMessageResult {
}
message RecoverOrphanedTransactionCommand {
- MessageQueue message_queue = 1;
- Message orphaned_transactional_message = 2;
- string transaction_id = 3;
+ Message message = 1;
+ string transaction_id = 2;
}
message Publishing {
@@ -203,21 +201,14 @@ message Publishing {
// List of topics to which messages will publish to.
repeated Resource topics = 1;
- // Publishing settings below here are from server, it is essential for
- // server to push.
- //
- // Body of message will be deflated if its size in bytes exceeds the
- // threshold.
- int32 compress_body_threshold = 2;
-
// If the message body size exceeds `max_body_size`, broker servers would
// reject the request. As a result, it is advisable that Producer performs
// client-side check validation.
- int32 max_body_size = 3;
+ int32 max_body_size = 2;
// When the `validate_message_type` flag is set to `false`, there is no need to validate the message's type
// against the messageQueue's `accept_message_types` before publishing.
- bool validate_message_type = 4;
+ bool validate_message_type = 3;
}
message Subscription {
diff --git a/cpp/src/main/cpp/base/ErrorCategory.cpp b/cpp/src/main/cpp/base/ErrorCategory.cpp
index 5ca2528..28390b8 100644
--- a/cpp/src/main/cpp/base/ErrorCategory.cpp
+++ b/cpp/src/main/cpp/base/ErrorCategory.cpp
@@ -68,7 +68,7 @@ std::string ErrorCategory::message(int code) const {
return "State of dependent procedure is not right";
case ErrorCode::TooManyRequests:
- return "Quota exchausted. The user has sent too many requests in a given "
+ return "Quota exhausted. The user has sent too many requests in a given "
"amount of time.";
case ErrorCode::UnavailableForLegalReasons:
@@ -127,6 +127,57 @@ std::string ErrorCategory::message(int code) const {
case ErrorCode::IllegalMessageTag: {
return "Format of message tag is illegal.";
}
+ case ErrorCode::InternalClientError: {
+ return "Internal client error";
+ }
+ case ErrorCode::IllegalMessageKey: {
+ return "Message key is not legal";
+ }
+ case ErrorCode::IllegalMessageProperty: {
+ return "One or multiple message properties is not legal";
+ }
+ case ErrorCode::IllegalMessageGroup: {
+ return "Message group is invalid";
+ }
+ case ErrorCode::InvalidTransactionId: {
+ return "Transaction ID is invalid";
+ }
+ case ErrorCode::IllegalMessageId: {
+ return "Message ID is invalid";
+ }
+ case ErrorCode::IllegalFilterExpression: {
+ return "Filter expression is malformed";
+ }
+ case ErrorCode::InvalidReceiptHandle: {
+ return "Receipt handle is invalid";
+ }
+ case ErrorCode::MessagePropertyConflictWithType: {
+ return "Message property conflicts with message type";
+ }
+ case ErrorCode::UnsupportedClientType: {
+ return "Server does not support the client type claimed";
+ }
+ case ErrorCode::MessageCorrupted: {
+ return "Message is corrupted";
+ }
+ case ErrorCode::ClientIdRequired: {
+ return "x-mq-client-id header meta-data is required";
+ }
+ case ErrorCode::MessageNotFound: {
+ return "No new messages are available at the moment";
+ }
+ case ErrorCode::MessageBodyTooLarge: {
+ return "Size of message body exceeds limits";
+ }
+ case ErrorCode::MessagePropertiesTooLarge: {
+ return "Size of message properties exceeds limits";
+ }
+ case ErrorCode::NotSupported: {
+ return "Action is not supported";
+ }
+ case ErrorCode::VerifyFifoMessageUnsupported: {
+ return "Verify FIFO message is not supported";
+ }
}
return "";
diff --git a/cpp/src/main/cpp/base/MixAll.cpp b/cpp/src/main/cpp/base/MixAll.cpp
index 4d244ce..0514c71 100644
--- a/cpp/src/main/cpp/base/MixAll.cpp
+++ b/cpp/src/main/cpp/base/MixAll.cpp
@@ -19,12 +19,15 @@
#include <chrono>
#include <cstdint>
#include <cstdlib>
+#include <system_error>
+#include "LoggerImpl.h"
#include "absl/random/random.h"
#include "absl/strings/str_split.h"
#include "fmt/format.h"
#include "openssl/md5.h"
#include "openssl/sha.h"
+#include "rocketmq/ErrorCode.h"
#include "zlib.h"
#ifdef _WIN32
@@ -49,7 +52,7 @@ const uint32_t MixAll::DEFAULT_CONSUME_THREAD_POOL_SIZE = 20;
const uint32_t MixAll::DEFAULT_CONSUME_MESSAGE_BATCH_SIZE = 1;
const int32_t MixAll::DEFAULT_MAX_DELIVERY_ATTEMPTS = 16;
-const RE2 MixAll::TOPIC_REGEX("[a-zA-Z0-9\\-_]{3,64}");
+const RE2 MixAll::TOPIC_REGEX("[a-zA-Z0-9\\-_%]{1,64}");
const RE2 MixAll::IP_REGEX("\\d+\\.\\d+\\.\\d+\\.\\d+");
const std::chrono::duration<long long> MixAll::DEFAULT_INVISIBLE_TIME_ = std::chrono::seconds(30);
@@ -138,26 +141,20 @@ const char* MixAll::SPAN_ANNOTATION_AWAIT_CONSUMPTION = "__await_consumption";
const char* MixAll::SPAN_ANNOTATION_MESSAGE_KEYS = "__message_keys";
const char* MixAll::SPAN_ANNOTATION_ATTR_START_TIME = "__start_time";
-bool MixAll::validate(const Message& message) {
+void MixAll::validate(const Message& message, std::error_code& ec) {
if (message.topic().empty()) {
- return false;
+ SPDLOG_WARN("Topic of the message to publish is empty");
+ ec = ErrorCode::IllegalTopic;
+ return;
}
+
const std::string& topic = message.topic();
- // Topic should not start with "CID" or "GID" which are reserved prefix
- if (absl::StartsWith(topic, "CID") || absl::StartsWith(topic, "GID")) {
- return false;
- }
// Legal topic characters are a-z, A-Z, 0-9, hyphen('-'), underscore('_') and percent('%'); length must be 1 to 64
if (!RE2::FullMatch(topic, TOPIC_REGEX)) {
- return false;
+ SPDLOG_WARN("Topic of the message to publish does not match [a-zA-Z0-9\\-_%]{1,64}");
+ ec = ErrorCode::IllegalTopic;
}
-
- uint32_t body_length = message.body().length();
- if (!body_length || body_length > MAX_MESSAGE_BODY_SIZE) {
- return false;
- }
- return true;
}
uint32_t MixAll::random(uint32_t left, uint32_t right) {
diff --git a/cpp/src/main/cpp/base/include/MixAll.h b/cpp/src/main/cpp/base/include/MixAll.h
index 1483737..f30494f 100644
--- a/cpp/src/main/cpp/base/include/MixAll.h
+++ b/cpp/src/main/cpp/base/include/MixAll.h
@@ -19,6 +19,7 @@
#include <chrono>
#include <cstdint>
#include <string>
+#include <system_error>
#include "absl/strings/string_view.h"
#include "re2/re2.h"
@@ -144,7 +145,7 @@ public:
* @param message
* @param ec Set to ErrorCode::IllegalTopic when the topic is empty or does not match TOPIC_REGEX.
*/
- static bool validate(const Message& message);
+ static void validate(const Message& message, std::error_code& ec);
static uint32_t random(uint32_t left, uint32_t right);
diff --git a/cpp/src/main/cpp/base/tests/MixAllTest.cpp b/cpp/src/main/cpp/base/tests/MixAllTest.cpp
index e66030e..53b019a 100644
--- a/cpp/src/main/cpp/base/tests/MixAllTest.cpp
+++ b/cpp/src/main/cpp/base/tests/MixAllTest.cpp
@@ -14,7 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+#include <system_error>
+
#include "MixAll.h"
+#include "absl/strings/str_split.h"
#include "gtest/gtest.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -24,4 +27,12 @@ TEST(MixAllTest, testOsName) {
std::cout << os_name << std::endl;
}
+TEST(MixAllTest, testValidate) {
+ auto message = Message::newBuilder().withTopic("").build();
+ std::error_code ec;
+ MixAll::validate(*message, ec);
+ ASSERT_EQ(true, (bool)ec);
+ ASSERT_TRUE(absl::StrContains(ec.message(), "illegal"));
+}
+
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/client/BUILD.bazel b/cpp/src/main/cpp/client/BUILD.bazel
index c56f279..0af283e 100644
--- a/cpp/src/main/cpp/client/BUILD.bazel
+++ b/cpp/src/main/cpp/client/BUILD.bazel
@@ -37,5 +37,6 @@ cc_library(
"//external:gtest",
],
defines = [
+ "DEBUG_METRIC_EXPORTING",
],
)
\ No newline at end of file
diff --git a/cpp/src/main/cpp/client/ClientManagerImpl.cpp b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
index 1f9eec6..78b13f7 100644
--- a/cpp/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
@@ -331,7 +331,7 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met
}
if (State::STARTED != client_manager_ptr->state()) {
- // TODO: Would this leak some memroy?
+ // TODO: Would this leak some memory?
return;
}
@@ -501,10 +501,7 @@ RpcClientSharedPtr ClientManagerImpl::getRpcClient(const std::string& target_hos
} else if (!search->second->ok()) {
SPDLOG_INFO("Prior RPC client to {} is not OK. Re-create one", target_host);
}
- std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_factories;
- interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>());
- auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
- target_host, channel_credential_, channel_arguments_, std::move(interceptor_factories));
+ auto channel = createChannel(target_host);
std::weak_ptr<ClientManager> client_manager(shared_from_this());
client = std::make_shared<RpcClientImpl>(client_manager, channel, target_host, need_heartbeat);
rpc_clients_.insert_or_assign(target_host, client);
diff --git a/cpp/src/main/cpp/client/LogInterceptor.cpp b/cpp/src/main/cpp/client/LogInterceptor.cpp
index 09dbc9f..7702864 100644
--- a/cpp/src/main/cpp/client/LogInterceptor.cpp
+++ b/cpp/src/main/cpp/client/LogInterceptor.cpp
@@ -15,13 +15,15 @@
* limitations under the License.
*/
#include "LogInterceptor.h"
+
+#include <cstddef>
+
#include "InterceptorContinuation.h"
#include "absl/container/flat_hash_map.h"
#include "absl/strings/str_join.h"
#include "google/protobuf/message.h"
#include "rocketmq/Logger.h"
#include "spdlog/spdlog.h"
-#include <cstddef>
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/cpp/src/main/cpp/client/TelemetryBidiReactor.cpp b/cpp/src/main/cpp/client/TelemetryBidiReactor.cpp
index 7479ba2..e88eb13 100644
--- a/cpp/src/main/cpp/client/TelemetryBidiReactor.cpp
+++ b/cpp/src/main/cpp/client/TelemetryBidiReactor.cpp
@@ -244,7 +244,6 @@ void TelemetryBidiReactor::applyBackoffPolicy(const rmq::Settings& settings, std
void TelemetryBidiReactor::applyPublishingConfig(const rmq::Settings& settings, std::shared_ptr<Client> client) {
client->config().publisher.max_body_size = settings.publishing().max_body_size();
- client->config().publisher.compress_body_threshold = settings.publishing().compress_body_threshold();
}
void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings& settings, std::shared_ptr<Client> client) {
diff --git a/cpp/src/main/cpp/client/include/ClientConfig.h b/cpp/src/main/cpp/client/include/ClientConfig.h
index bade9a4..58cd1fe 100644
--- a/cpp/src/main/cpp/client/include/ClientConfig.h
+++ b/cpp/src/main/cpp/client/include/ClientConfig.h
@@ -32,7 +32,6 @@ ROCKETMQ_NAMESPACE_BEGIN
struct PublisherConfig {
std::vector<rmq::Resource> topics;
- std::uint32_t compress_body_threshold{4096};
std::uint32_t max_body_size{4194304};
};
@@ -54,7 +53,7 @@ struct ClientConfig {
rmq::ClientType client_type{rmq::ClientType::CLIENT_TYPE_UNSPECIFIED};
RetryPolicy backoff_policy;
// TODO: use std::chrono::milliseconds
- absl::Duration request_timeout;
+ absl::Duration request_timeout{absl::Seconds(3)};
std::string region{"cn-hangzhou"};
std::string resource_namespace;
std::shared_ptr<CredentialsProvider> credentials_provider;
diff --git a/cpp/src/main/cpp/client/include/ClientManager.h b/cpp/src/main/cpp/client/include/ClientManager.h
index 99ec6ad..2ec5b2e 100644
--- a/cpp/src/main/cpp/client/include/ClientManager.h
+++ b/cpp/src/main/cpp/client/include/ClientManager.h
@@ -50,6 +50,12 @@ public:
virtual std::shared_ptr<RpcClient> getRpcClient(const std::string& target_host, bool need_heartbeat) = 0;
+ /**
+ * @brief Create a Channel object
+ *
+ * @param target_host gRPC naming targets, following https://github.com/grpc/grpc/blob/master/doc/naming.md
+ * @return std::shared_ptr<grpc::Channel>
+ */
virtual std::shared_ptr<grpc::Channel> createChannel(const std::string& target_host) = 0;
virtual void resolveRoute(const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request,
diff --git a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
index 8275922..61e6ed9 100644
--- a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
@@ -16,6 +16,8 @@
*/
#include "ClientImpl.h"
+#include <apache/rocketmq/v2/definition.pb.h>
+
#include <algorithm>
#include <atomic>
#include <chrono>
@@ -37,10 +39,12 @@
#include "NamingScheme.h"
#include "SessionImpl.h"
#include "Signature.h"
+#include "StdoutHandler.h"
#include "UtilAll.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
+#include "opencensus/stats/stats.h"
#include "rocketmq/Message.h"
#include "rocketmq/MessageListener.h"
@@ -165,6 +169,49 @@ void ClientImpl::start() {
route_update_handle_ = client_manager_->getScheduler()->schedule(route_update_functor, UPDATE_ROUTE_TASK_NAME,
std::chrono::seconds(10), std::chrono::seconds(30));
+
+ auto endpoints = client_config_.metric.endpoints;
+ std::string target;
+ switch (endpoints.scheme()) {
+ case rmq::AddressScheme::IPv4: {
+ target.append("ipv4:");
+ break;
+ }
+ case rmq::AddressScheme::IPv6: {
+ target.append("ipv6:");
+ break;
+ }
+ case rmq::AddressScheme::DOMAIN_NAME: {
+ target.append("dns:");
+ break;
+ }
+ default: {
+ SPDLOG_ERROR("Unknown metric address scheme");
+ }
+ }
+
+ bool first = true;
+ for (const auto& address : endpoints.addresses()) {
+ if (!first) {
+ target.push_back(',');
+ } else {
+ first = false;
+ }
+ target.append(address.host());
+ target.push_back(':');
+ target.append(std::to_string(address.port()));
+ }
+
+ std::weak_ptr<Client> client_weak_ptr(self());
+
+#ifdef DEBUG_METRIC_EXPORTING
+ opencensus::stats::StatsExporter::SetInterval(absl::Seconds(1));
+ opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<StdoutHandler>());
+#else
+ opencensus::stats::StatsExporter::SetInterval(absl::Minutes(1));
+#endif
+ SPDLOG_INFO("Export client metrics to {}", target);
+ opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<OpencensusHandler>(target, client_weak_ptr));
}
void ClientImpl::shutdown() {
diff --git a/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp b/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
index 74d8176..bb56e71 100644
--- a/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
@@ -85,18 +85,8 @@ void ConsumeMessageServiceImpl::ack(const Message& message, std::function<void(c
std::weak_ptr<PushConsumerImpl> client(consumer_);
const auto& topic = message.topic();
- // Collect metrics
- opencensus::stats::Record({{consumer->stats().processSuccess(), 1}},
- {{Tag::topicTag(), topic}, {Tag::clientIdTag(), consumer->config().client_id}});
auto callback = [cb, client, topic](const std::error_code& ec) {
auto consumer = client.lock();
- if (ec) {
- opencensus::stats::Record({{consumer->stats().ackFailure(), 1}},
- {{Tag::topicTag(), topic}, {Tag::clientIdTag(), consumer->config().client_id}});
- } else {
- opencensus::stats::Record({{consumer->stats().ackSuccess(), 1}},
- {{Tag::topicTag(), topic}, {Tag::clientIdTag(), consumer->config().client_id}});
- }
cb(ec);
};
@@ -108,9 +98,6 @@ void ConsumeMessageServiceImpl::nack(const Message& message, std::function<void(
if (!consumer) {
return;
}
- // Collect metrics
- opencensus::stats::Record({{consumer->stats().processFailure(), 1}},
- {{Tag::topicTag(), message.topic()}, {Tag::clientIdTag(), consumer->config().client_id}});
consumer->nack(message, cb);
}
diff --git a/cpp/src/main/cpp/rocketmq/ConsumeTask.cpp b/cpp/src/main/cpp/rocketmq/ConsumeTask.cpp
index 69d299a..7af1b28 100644
--- a/cpp/src/main/cpp/rocketmq/ConsumeTask.cpp
+++ b/cpp/src/main/cpp/rocketmq/ConsumeTask.cpp
@@ -21,6 +21,7 @@
#include "LoggerImpl.h"
#include "PushConsumerImpl.h"
#include "Tag.h"
+#include "rocketmq/ConsumeResult.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -128,18 +129,43 @@ void ConsumeTask::process() {
auto it = messages_.begin();
SPDLOG_DEBUG("Start to process message[message-id={}]", (*it)->id());
svc->preHandle(**it);
+
+ // Collect metrics of await_time
auto await_time = std::chrono::system_clock::now() - (*it)->extension().decode_time;
opencensus::stats::Record(
{{consumer->stats().awaitTime(), MixAll::millisecondsOf(await_time)}},
{{Tag::topicTag(), (*it)->topic()}, {Tag::clientIdTag(), consumer->config().client_id}});
std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
+
+ // Invoke user-defined-callback
auto result = listener(**it);
+
+ // Collect metrics of process_time
auto duration = std::chrono::steady_clock::now() - start;
- opencensus::stats::Record(
- {{consumer->stats().processTime(), MixAll::millisecondsOf(duration)}},
- {{Tag::topicTag(), (*it)->topic()}, {Tag::clientIdTag(), consumer->config().client_id}});
+ switch (result) {
+ case ConsumeResult::SUCCESS: {
+ opencensus::stats::Record({{consumer->stats().processTime(), MixAll::millisecondsOf(duration)}},
+ {
+ {Tag::topicTag(), (*it)->topic()},
+ {Tag::clientIdTag(), consumer->config().client_id},
+ {Tag::invocationStatus(), "success"},
+ });
+ break;
+ }
+ case ConsumeResult::FAILURE: {
+ opencensus::stats::Record({{consumer->stats().processTime(), MixAll::millisecondsOf(duration)}},
+ {
+ {Tag::topicTag(), (*it)->topic()},
+ {Tag::clientIdTag(), consumer->config().client_id},
+ {Tag::invocationStatus(), "failure"},
+ });
+ break;
+ }
+ }
+
svc->postHandle(**it, result);
+
switch (result) {
case ConsumeResult::SUCCESS: {
auto callback = std::bind(&ConsumeTask::onAck, self, std::placeholders::_1);
diff --git a/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp b/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp
index 65cc5ba..2cd4399 100644
--- a/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -101,8 +101,15 @@ void ProducerImpl::ensureRunning(std::error_code& ec) const noexcept {
}
}
-bool ProducerImpl::validate(const Message& message) {
- return MixAll::validate(message);
+void ProducerImpl::validate(const Message& message, std::error_code& ec) {
+ MixAll::validate(message, ec);
+
+ if (!ec) {
+ if (message.body().length() > client_config_.publisher.max_body_size) {
+ SPDLOG_WARN("Body of the message to send is too large");
+ ec = ErrorCode::PayloadTooLarge;
+ }
+ }
}
void ProducerImpl::wrapSendMessageRequest(const Message& message, SendMessageRequest& request,
@@ -338,9 +345,8 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext> context) {
void ProducerImpl::send0(MessageConstPtr message, SendCallback callback, std::vector<rmq::MessageQueue> list) {
SendReceipt send_receipt;
std::error_code ec;
-
- if (!validate(*message)) {
- ec = ErrorCode::BadRequest;
+ validate(*message, ec);
+ if (ec) {
callback(ec, send_receipt);
return;
}
diff --git a/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp b/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp
index 2acff23..a1bfe90 100644
--- a/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp
@@ -44,7 +44,7 @@ PushConsumerImpl::~PushConsumerImpl() {
shutdown();
}
-void PushConsumerImpl::topicsOfInterest(std::vector<std::string>& topics) {
+void PushConsumerImpl::topicsOfInterest(std::vector<std::string> topics) {
absl::MutexLock lk(&topic_filter_expression_table_mtx_);
for (const auto& entry : topic_filter_expression_table_) {
topics.push_back(entry.first);
@@ -214,7 +214,7 @@ void PushConsumerImpl::queryAssignment(
absl::flat_hash_map<std::string, std::string> metadata;
Signature::sign(client_config_, metadata);
auto assignment_callback = [this, cb, topic, broker_host](const std::error_code& ec,
- const QueryAssignmentResponse& response) {
+ const QueryAssignmentResponse& response) {
if (ec) {
SPDLOG_WARN("Failed to acquire queue assignment of topic={} from brokerAddress={}", topic, broker_host);
cb(ec, nullptr);
@@ -559,4 +559,4 @@ void PushConsumerImpl::onVerifyMessage(MessageConstSharedPtr message, std::funct
}
}
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
+ROCKETMQ_NAMESPACE_END
diff --git a/cpp/src/main/cpp/rocketmq/SendContext.cpp b/cpp/src/main/cpp/rocketmq/SendContext.cpp
index 9dee0e8..f73a30d 100644
--- a/cpp/src/main/cpp/rocketmq/SendContext.cpp
+++ b/cpp/src/main/cpp/rocketmq/SendContext.cpp
@@ -49,12 +49,7 @@ void SendContext::onSuccess(const SendReceipt& send_receipt) noexcept {
{
{Tag::topicTag(), message_->topic()},
{Tag::clientIdTag(), publisher->config().client_id},
- });
-
- opencensus::stats::Record({{publisher->stats().success(), 1}},
- {
- {Tag::topicTag(), message_->topic()},
- {Tag::clientIdTag(), publisher->config().client_id},
+ {Tag::invocationStatus(), "success"},
});
}
@@ -82,12 +77,7 @@ void SendContext::onFailure(const std::error_code& ec) noexcept {
{
{Tag::topicTag(), message_->topic()},
{Tag::clientIdTag(), publisher->config().client_id},
- });
-
- opencensus::stats::Record({{publisher->stats().failure(), 1}},
- {
- {Tag::topicTag(), message_->topic()},
- {Tag::clientIdTag(), publisher->config().client_id},
+ {Tag::invocationStatus(), "failure"},
});
}
diff --git a/cpp/src/main/cpp/rocketmq/SimpleConsumerImpl.cpp b/cpp/src/main/cpp/rocketmq/SimpleConsumerImpl.cpp
index 82e6b42..6a2dc75 100644
--- a/cpp/src/main/cpp/rocketmq/SimpleConsumerImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/SimpleConsumerImpl.cpp
@@ -67,7 +67,7 @@ void SimpleConsumerImpl::buildClientSettings(rmq::Settings& settings) {
}
}
-void SimpleConsumerImpl::topicsOfInterest(std::vector<std::string>& topics) {
+void SimpleConsumerImpl::topicsOfInterest(std::vector<std::string> topics) {
absl::MutexLock lk(&subscriptions_mtx_);
for (const auto& entry : subscriptions_) {
if (std::find(topics.begin(), topics.end(), entry.first) == topics.end()) {
diff --git a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
index c136cc9..c095c56 100644
--- a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
+++ b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
@@ -30,6 +30,7 @@
#include "InvocationContext.h"
#include "MessageExt.h"
#include "NameServerResolver.h"
+#include "OpencensusHandler.h"
#include "RpcClient.h"
#include "Session.h"
#include "TelemetryBidiReactor.h"
@@ -144,7 +145,7 @@ protected:
absl::flat_hash_map<std::string, std::unique_ptr<Session>> session_map_ GUARDED_BY(session_map_mtx_);
absl::Mutex session_map_mtx_;
- virtual void topicsOfInterest(std::vector<std::string>& topics) {
+ virtual void topicsOfInterest(std::vector<std::string> topics) {
}
void updateRouteInfo() LOCKS_EXCLUDED(topic_route_table_mtx_);
diff --git a/cpp/src/main/cpp/rocketmq/include/ProducerImpl.h b/cpp/src/main/cpp/rocketmq/include/ProducerImpl.h
index a50c6ef..d3d865c 100644
--- a/cpp/src/main/cpp/rocketmq/include/ProducerImpl.h
+++ b/cpp/src/main/cpp/rocketmq/include/ProducerImpl.h
@@ -106,8 +106,7 @@ public:
void buildClientSettings(rmq::Settings& settings) override;
- void topicsOfInterest(std::vector<std::string> topics)
- LOCKS_EXCLUDED(topics_mtx_);
+ void topicsOfInterest(std::vector<std::string> topics) override LOCKS_EXCLUDED(topics_mtx_);
const PublishStats& stats() const { return stats_; }
@@ -152,7 +151,7 @@ private:
void ensureRunning(std::error_code& ec) const noexcept;
- bool validate(const Message& message);
+ void validate(const Message& message, std::error_code& ec);
void send0(MessageConstPtr message, SendCallback callback, std::vector<rmq::MessageQueue> list);
diff --git a/cpp/src/main/cpp/rocketmq/include/PushConsumerImpl.h b/cpp/src/main/cpp/rocketmq/include/PushConsumerImpl.h
index 0067009..95a07f2 100644
--- a/cpp/src/main/cpp/rocketmq/include/PushConsumerImpl.h
+++ b/cpp/src/main/cpp/rocketmq/include/PushConsumerImpl.h
@@ -52,7 +52,7 @@ public:
void prepareHeartbeatData(HeartbeatRequest& request) override;
- void topicsOfInterest(std::vector<std::string>& topics) override LOCKS_EXCLUDED(topic_filter_expression_table_mtx_);
+ void topicsOfInterest(std::vector<std::string> topics) override LOCKS_EXCLUDED(topic_filter_expression_table_mtx_);
void start() override;
diff --git a/cpp/src/main/cpp/rocketmq/include/SimpleConsumerImpl.h b/cpp/src/main/cpp/rocketmq/include/SimpleConsumerImpl.h
index 5a5a1b9..9fac1bb 100644
--- a/cpp/src/main/cpp/rocketmq/include/SimpleConsumerImpl.h
+++ b/cpp/src/main/cpp/rocketmq/include/SimpleConsumerImpl.h
@@ -56,7 +56,7 @@ public:
ChangeInvisibleDurationCallback callback);
protected:
- void topicsOfInterest(std::vector<std::string>& topics) override;
+ void topicsOfInterest(std::vector<std::string> topics) override;
private:
absl::flat_hash_map<std::string, FilterExpression> subscriptions_ GUARDED_BY(subscriptions_mtx_);
diff --git a/cpp/src/main/cpp/stats/ConsumeStats.cpp b/cpp/src/main/cpp/stats/ConsumeStats.cpp
index c95ad2e..df4ee7e 100644
--- a/cpp/src/main/cpp/stats/ConsumeStats.cpp
+++ b/cpp/src/main/cpp/stats/ConsumeStats.cpp
@@ -22,18 +22,7 @@
ROCKETMQ_NAMESPACE_BEGIN
ConsumeStats::ConsumeStats()
- : process_success_(
- opencensus::stats::MeasureInt64::Register("process_success", "Number of messages processed", "1")),
- process_failure_(opencensus::stats::MeasureInt64::Register(
- "process_failure", "Number of failures when processing messages", "1")),
- ack_success_(opencensus::stats::MeasureInt64::Register("ack_success", "Number of messages acknowledged", "1")),
- ack_failure_(opencensus::stats::MeasureInt64::Register(
- "ack_failure", "Number of failures when acknowledging messages", "1")),
- change_invisible_time_success_(opencensus::stats::MeasureInt64::Register(
- "change_invisible_time_success", "Number of change-invisible-time performed", "1")),
- change_invisible_time_failure_(opencensus::stats::MeasureInt64::Register(
- "change_invisible_time_failure", "Number of failures when changing message invisible time", "1")),
- cached_message_quantity_(opencensus::stats::MeasureInt64::Register(
+ : cached_message_quantity_(opencensus::stats::MeasureInt64::Register(
"cached_message_quantity", "Number of locally cached messages", "1")),
cached_message_bytes_(opencensus::stats::MeasureInt64::Register(
"cached_message_bytes", "Number of locally cached messages in bytes", "1")),
@@ -42,60 +31,6 @@ ConsumeStats::ConsumeStats()
await_time_(opencensus::stats::MeasureInt64::Register(
"await_time", "Client side queuing time of messages before getting processed", "1")),
process_time_(opencensus::stats::MeasureInt64::Register("process_time", "Process message time", "1")) {
- opencensus::stats::ViewDescriptor()
- .set_name("rocketmq_process_success_total")
- .set_description("Number of messages processed")
- .set_measure("process_success")
- .set_aggregation(opencensus::stats::Aggregation::Sum())
- .add_column(Tag::topicTag())
- .add_column(Tag::clientIdTag())
- .RegisterForExport();
-
- opencensus::stats::ViewDescriptor()
- .set_name("rocketmq_process_failure_total")
- .set_description("Number of failures on processing messages")
- .set_measure("process_failure")
- .set_aggregation(opencensus::stats::Aggregation::Sum())
- .add_column(Tag::topicTag())
- .add_column(Tag::clientIdTag())
- .RegisterForExport();
-
- opencensus::stats::ViewDescriptor()
- .set_name("rocketmq_ack_success_total")
- .set_description("Number of messages acknowledged")
- .set_measure("ack_success")
- .set_aggregation(opencensus::stats::Aggregation::Sum())
- .add_column(Tag::topicTag())
- .add_column(Tag::clientIdTag())
- .RegisterForExport();
-
- opencensus::stats::ViewDescriptor()
- .set_name("rocketmq_ack_failure_total")
- .set_description("Number of failures on acknowledging messages")
- .set_measure("ack_failure")
- .set_aggregation(opencensus::stats::Aggregation::Sum())
- .add_column(Tag::topicTag())
- .add_column(Tag::clientIdTag())
- .RegisterForExport();
-
- opencensus::stats::ViewDescriptor()
- .set_name("rocketmq_change_invisible_time_success_total")
- .set_description("Number of change-invisible-time operations")
- .set_measure("change_invisible_time_success")
- .set_aggregation(opencensus::stats::Aggregation::Sum())
- .add_column(Tag::topicTag())
- .add_column(Tag::clientIdTag())
- .RegisterForExport();
-
- opencensus::stats::ViewDescriptor()
- .set_name("rocketmq_change_invisible_time_failure_total")
- .set_description("Number of failed change-invisible-time operations")
- .set_measure("change_invisible_time_failure")
- .set_aggregation(opencensus::stats::Aggregation::Sum())
- .add_column(Tag::topicTag())
- .add_column(Tag::clientIdTag())
- .RegisterForExport();
-
opencensus::stats::ViewDescriptor()
.set_name("rocketmq_consumer_cached_messages")
.set_description("Number of messages locally cached")
diff --git a/cpp/src/main/cpp/stats/MetricBidiReactor.cpp b/cpp/src/main/cpp/stats/MetricBidiReactor.cpp
index 62983ee..43e69ed 100644
--- a/cpp/src/main/cpp/stats/MetricBidiReactor.cpp
+++ b/cpp/src/main/cpp/stats/MetricBidiReactor.cpp
@@ -16,6 +16,8 @@
*/
#include "MetricBidiReactor.h"
+#include <chrono>
+
#include "LoggerImpl.h"
#include "OpencensusExporter.h"
#include "Signature.h"
@@ -24,23 +26,22 @@ ROCKETMQ_NAMESPACE_BEGIN
MetricBidiReactor::MetricBidiReactor(std::weak_ptr<Client> client, std::weak_ptr<OpencensusExporter> exporter)  // Signs metadata, sets a deadline and, if the exporter is still alive, starts the Export call.
: client_(client), exporter_(exporter) {
- grpc::ClientContext context;
auto ptr = client_.lock();  // NOTE(review): ptr is dereferenced two lines below without a null check — confirm the client cannot have expired here.
Metadata metadata;
Signature::sign(ptr->config(), metadata);
for (const auto& entry : metadata) {  // Copy every signed metadata entry onto the call context.
- context.AddMetadata(entry.first, entry.second);
+ context_.AddMetadata(entry.first, entry.second);  // Uses context_ in place of the constructor-local ClientContext removed above.
}
- context.set_deadline(std::chrono::system_clock::now() + absl::ToChronoMilliseconds(ptr->config().request_timeout));
+ context_.set_deadline(std::chrono::system_clock::now() + std::chrono::hours(24));  // Deadline is now a fixed 24 hours from construction instead of the configured request_timeout.
auto exporter_ptr = exporter_.lock();
if (!exporter_ptr) {  // Exporter already gone: log and leave without starting the call.
+ SPDLOG_WARN("Exporter has already been destructed");
return;
}
-
- exporter_ptr->stub()->async()->Export(&context, this);
+ exporter_ptr->stub()->async()->Export(&context_, this);  // Passes this object as the reactor for the Export call.
StartCall();
}
@@ -49,7 +50,7 @@ void MetricBidiReactor::OnReadDone(bool ok) {
SPDLOG_WARN("Failed to read response");
return;
}
-
+ SPDLOG_DEBUG("OnReadDone OK");
StartRead(&response_);
}
@@ -58,7 +59,8 @@ void MetricBidiReactor::OnWriteDone(bool ok) {
SPDLOG_WARN("Failed to report metrics");
return;
}
-
+ SPDLOG_DEBUG("OnWriteDone OK");
+ fireRead();
bool expected = true;
if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
fireWrite();
@@ -75,10 +77,15 @@ void MetricBidiReactor::OnDone(const grpc::Status& s) {
SPDLOG_DEBUG("Bi-directional stream ended. status.code={}, status.message={}", s.error_code(), s.error_message());
} else {
SPDLOG_WARN("Bi-directional stream ended. status.code={}, status.message={}", s.error_code(), s.error_message());
+ auto exporter = exporter_.lock();
+ if (exporter) {
+ exporter->resetStream();
+ }
}
}
void MetricBidiReactor::write(ExportMetricsServiceRequest request) {
+ SPDLOG_DEBUG("Append ExportMetricsServiceRequest to buffer");
{
absl::MutexLock lk(&requests_mtx_);
requests_.emplace_back(std::move(request));
@@ -99,10 +106,10 @@ void MetricBidiReactor::fireWrite() {
bool expected = false;
if (inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) {
absl::MutexLock lk(&requests_mtx_);
- request_.CopyFrom(requests_[0]);
+ request_ = std::move(*requests_.begin());
requests_.erase(requests_.begin());
+ SPDLOG_DEBUG("MetricBidiReactor#StartWrite");
StartWrite(&request_);
- fireRead();
}
}
diff --git a/cpp/src/main/cpp/stats/OpencensusExporter.cpp b/cpp/src/main/cpp/stats/OpencensusExporter.cpp
index 0bf13a4..effe512 100644
--- a/cpp/src/main/cpp/stats/OpencensusExporter.cpp
+++ b/cpp/src/main/cpp/stats/OpencensusExporter.cpp
@@ -17,6 +17,7 @@
#include "OpencensusExporter.h"
+#include "ClientManager.h"
#include "MetricBidiReactor.h"
#include "google/protobuf/util/time_util.h"
@@ -25,6 +26,13 @@ ROCKETMQ_NAMESPACE_BEGIN
namespace opencensus_proto = opencensus::proto::metrics::v1;
OpencensusExporter::OpencensusExporter(std::string endpoints, std::weak_ptr<Client> client) : client_(client) {  // Builds the MetricsService stub when the client is still alive.
+ auto client_shared_ptr = client.lock();  // Locks the constructor argument; client_ was already initialised from it.
+ if (client_shared_ptr) {
+ auto channel = client_shared_ptr->manager()->createChannel(endpoints);  // Channel for the given endpoints is obtained from the client's manager.
+ stub_ = opencensus::proto::agent::metrics::v1::MetricsService::NewStub(channel);
+ } else {  // Client already expired: stub_ is not assigned in this constructor.
+ SPDLOG_ERROR("Failed to initialize OpencensusExporter. weak_ptr to Client is nullptr");
+ }
}
void OpencensusExporter::wrap(const MetricData& data, ExportMetricsServiceRequest& request) {
@@ -33,7 +41,7 @@ void OpencensusExporter::wrap(const MetricData& data, ExportMetricsServiceReques
for (const auto& entry : data) {
const auto& view_descriptor = entry.first;
- auto metric = new opencensus::proto::metrics::v1::Metric();
+ auto metric = absl::make_unique<opencensus::proto::metrics::v1::Metric>();
auto descriptor = metric->mutable_metric_descriptor();
descriptor->set_name(view_descriptor.name());
descriptor->set_description(view_descriptor.description());
@@ -62,116 +70,100 @@ void OpencensusExporter::wrap(const MetricData& data, ExportMetricsServiceReques
auto label_keys = descriptor->mutable_label_keys();
for (const auto& column : view_descriptor.columns()) {
- auto label_key = new opencensus::proto::metrics::v1::LabelKey;
+ auto label_key = absl::make_unique<opencensus::proto::metrics::v1::LabelKey>();
label_key->set_key(column.name());
- label_keys->AddAllocated(label_key);
+ label_keys->AddAllocated(label_key.release());
}
auto time_series = metric->mutable_timeseries();
const auto& view_data = entry.second;
- auto start_times = view_data.start_times();
-
+ // TODO: Opencensus provides end-timestamp of the statistics conducted whilst OpenTelemetry requires
+ // start-timestamp. Let us ignore the difference for now.
+ auto stats_time = google::protobuf::util::TimeUtil::TimeTToTimestamp(absl::ToTimeT(view_data.end_time()));
switch (view_data.type()) {
case opencensus::stats::ViewData::Type::kInt64: {
for (const auto& entry : view_data.int_data()) {
- auto time_series_element = new opencensus::proto::metrics::v1::TimeSeries();
-
- auto search = start_times.find(entry.first);
- if (search != start_times.end()) {
- absl::Time time = search->second;
- time_series_element->mutable_start_timestamp()->CopyFrom(
- google::protobuf::util::TimeUtil::TimeTToTimestamp(absl::ToTimeT(time)));
- }
-
+ auto time_series_element = absl::make_unique<opencensus::proto::metrics::v1::TimeSeries>();
+ time_series_element->mutable_start_timestamp()->CopyFrom(stats_time);
auto label_values = time_series_element->mutable_label_values();
for (const auto& value : entry.first) {
- auto label_value = new opencensus::proto::metrics::v1::LabelValue;
+ auto label_value = absl::make_unique<opencensus::proto::metrics::v1::LabelValue>();
label_value->set_value(value);
label_value->set_has_value(true);
- label_values->AddAllocated(label_value);
+ label_values->AddAllocated(label_value.release());
}
-
- auto point = new opencensus::proto::metrics::v1::Point();
+ auto point = absl::make_unique<opencensus::proto::metrics::v1::Point>();
+ point->mutable_timestamp()->CopyFrom(stats_time);
point->set_int64_value(entry.second);
- time_series_element->mutable_points()->AddAllocated(point);
-
- time_series->AddAllocated(time_series_element);
+ time_series_element->mutable_points()->AddAllocated(point.release());
+ time_series->AddAllocated(time_series_element.release());
}
break;
}
case opencensus::stats::ViewData::Type::kDouble: {
for (const auto& entry : view_data.double_data()) {
- auto time_series_element = new opencensus::proto::metrics::v1::TimeSeries();
- auto search = start_times.find(entry.first);
- if (search != start_times.end()) {
- absl::Time time = search->second;
- time_series_element->mutable_start_timestamp()->CopyFrom(
- google::protobuf::util::TimeUtil::TimeTToTimestamp(absl::ToTimeT(time)));
- }
-
+ auto time_series_element = absl::make_unique<opencensus::proto::metrics::v1::TimeSeries>();
+ time_series_element->mutable_start_timestamp()->CopyFrom(stats_time);
auto label_values = time_series_element->mutable_label_values();
for (const auto& value : entry.first) {
- auto label_value = new opencensus::proto::metrics::v1::LabelValue;
+ auto label_value = absl::make_unique<opencensus::proto::metrics::v1::LabelValue>();
label_value->set_value(value);
label_value->set_has_value(true);
- label_values->AddAllocated(label_value);
+ label_values->AddAllocated(label_value.release());
}
-
- auto point = new opencensus::proto::metrics::v1::Point();
+ auto point = absl::make_unique<opencensus::proto::metrics::v1::Point>();
+ point->mutable_timestamp()->CopyFrom(stats_time);
point->set_double_value(entry.second);
- time_series_element->mutable_points()->AddAllocated(point);
-
- time_series->AddAllocated(time_series_element);
+ time_series_element->mutable_points()->AddAllocated(point.release());
+ time_series->AddAllocated(time_series_element.release());
}
break;
}
case opencensus::stats::ViewData::Type::kDistribution: {
for (const auto& entry : view_data.distribution_data()) {
- auto time_series_element = new opencensus::proto::metrics::v1::TimeSeries();
- auto search = start_times.find(entry.first);
- if (search != start_times.end()) {
- absl::Time time = search->second;
- time_series_element->mutable_start_timestamp()->CopyFrom(
- google::protobuf::util::TimeUtil::TimeTToTimestamp(absl::ToTimeT(time)));
- }
-
+ auto time_series_element = absl::make_unique<opencensus::proto::metrics::v1::TimeSeries>();
+ time_series_element->mutable_start_timestamp()->CopyFrom(stats_time);
auto label_values = time_series_element->mutable_label_values();
for (const auto& value : entry.first) {
- auto label_value = new opencensus::proto::metrics::v1::LabelValue;
+ auto label_value = absl::make_unique<opencensus::proto::metrics::v1::LabelValue>();
label_value->set_value(value);
label_value->set_has_value(true);
- label_values->AddAllocated(label_value);
+ label_values->AddAllocated(label_value.release());
}
- auto point = new opencensus::proto::metrics::v1::Point();
- auto distribution_value = new opencensus::proto::metrics::v1::DistributionValue;
- point->set_allocated_distribution_value(distribution_value);
- time_series_element->mutable_points()->AddAllocated(point);
+ auto point = absl::make_unique<opencensus::proto::metrics::v1::Point>();
+ point->mutable_timestamp()->CopyFrom(stats_time);
+ auto distribution_value = absl::make_unique<opencensus::proto::metrics::v1::DistributionValue>();
distribution_value->set_count(entry.second.count());
distribution_value->set_sum_of_squared_deviation(entry.second.sum_of_squared_deviation());
+ distribution_value->set_sum(entry.second.count() * entry.second.mean());
for (const auto& cnt : entry.second.bucket_counts()) {
- auto bucket = new opencensus::proto::metrics::v1::DistributionValue::Bucket();
+ auto bucket = absl::make_unique<opencensus::proto::metrics::v1::DistributionValue::Bucket>();
bucket->set_count(cnt);
- distribution_value->mutable_buckets()->AddAllocated(bucket);
+ distribution_value->mutable_buckets()->AddAllocated(bucket.release());
}
-
auto bucket_options = distribution_value->mutable_bucket_options();
-
for (const auto& boundary : entry.second.bucket_boundaries().lower_boundaries()) {
bucket_options->mutable_explicit_()->mutable_bounds()->Add(boundary);
}
+ point->set_allocated_distribution_value(distribution_value.release());
+ time_series_element->mutable_points()->AddAllocated(point.release());
- time_series->AddAllocated(time_series_element);
+ time_series->AddAllocated(time_series_element.release());
}
break;
}
}
- metrics->AddAllocated(metric);
+ if (time_series->empty()) {
+ continue;
+ }
+
+ metrics->AddAllocated(metric.release());
}
}
-void OpencensusExporter::exportMetrics(
+void OpencensusExporter::ExportViewData(
const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) {
opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest request;
wrap(data, request);
@@ -179,7 +171,13 @@ void OpencensusExporter::exportMetrics(
if (!bidi_reactor_) {
bidi_reactor_ = absl::make_unique<MetricBidiReactor>(client_, exporter);
}
- bidi_reactor_->write(request);
+
+ if (request.metrics_size()) {
+ SPDLOG_DEBUG("ExportMetricRequest: {}", request.DebugString());
+ bidi_reactor_->write(request);
+ } else {
+ SPDLOG_DEBUG("ExportMetricRequest contains no valid metric");
+ }
}
void OpencensusExporter::resetStream() {
diff --git a/cpp/src/main/cpp/stats/include/Tag.h b/cpp/src/main/cpp/stats/OpencensusHandler.cpp
similarity index 69%
copy from cpp/src/main/cpp/stats/include/Tag.h
copy to cpp/src/main/cpp/stats/OpencensusHandler.cpp
index ae69fcb..422108d 100644
--- a/cpp/src/main/cpp/stats/include/Tag.h
+++ b/cpp/src/main/cpp/stats/OpencensusHandler.cpp
@@ -14,22 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#pragma once
+#include "OpencensusHandler.h"
-#include "opencensus/stats/stats.h"
-#include "rocketmq/RocketMQ.h"
+#include "MetricBidiReactor.h"
ROCKETMQ_NAMESPACE_BEGIN
-class Tag {
-public:
- static opencensus::tags::TagKey& topicTag();
+OpencensusHandler::OpencensusHandler(std::string endpoints, std::weak_ptr<Client> client)  // Creates the OpencensusExporter this handler holds.
+ : exporter_(std::make_shared<OpencensusExporter>(std::move(endpoints), std::move(client))) {  // Both arguments are moved straight into the exporter's constructor.
+}
- static opencensus::tags::TagKey& clientIdTag();
-
- static opencensus::tags::TagKey& userIdTag();
-
- static opencensus::tags::TagKey& deploymentTag();
-};
+void OpencensusHandler::ExportViewData(const MetricData& data) {  // Handler entry point for exported view data.
+ exporter_->ExportViewData(data);  // Pure delegation to the held exporter.
+}
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/stats/PublishStats.cpp b/cpp/src/main/cpp/stats/PublishStats.cpp
index a012515..0338efe 100644
--- a/cpp/src/main/cpp/stats/PublishStats.cpp
+++ b/cpp/src/main/cpp/stats/PublishStats.cpp
@@ -21,33 +21,13 @@
ROCKETMQ_NAMESPACE_BEGIN
PublishStats::PublishStats()
- : success_(opencensus::stats::MeasureInt64::Register("publish_success", "Number of message published", "1")),
- failure_(opencensus::stats::MeasureInt64::Register("publish_failure", "Number of publish failures", "1")),
- latency_(opencensus::stats::MeasureInt64::Register("publish_latency", "Publish latency in milliseconds", "ms")) {
- opencensus::stats::ViewDescriptor()
- .set_name("rocketmq_send_success_total")
- .set_description("Number of messages published")
- .set_measure("publish_success")
- .set_aggregation(opencensus::stats::Aggregation::Sum())
- .add_column(Tag::topicTag())
- .add_column(Tag::clientIdTag())
- .RegisterForExport();
-
- opencensus::stats::ViewDescriptor()
- .set_name("rocketmq_send_failure_total")
- .set_description("Number of publish failures")
- .set_measure("publish_failure")
- .set_aggregation(opencensus::stats::Aggregation::Sum())
- .add_column(Tag::topicTag())
- .add_column(Tag::clientIdTag())
- .RegisterForExport();
-
+ : latency_(opencensus::stats::MeasureInt64::Register("publish_latency", "Publish latency in milliseconds", "ms")) {
opencensus::stats::ViewDescriptor()
.set_name("rocketmq_send_cost_time")
.set_description("Publish latency")
.set_measure("publish_latency")
.set_aggregation(opencensus::stats::Aggregation::Distribution(
- opencensus::stats::BucketBoundaries::Explicit({5, 10, 20, 50, 500})))
+ opencensus::stats::BucketBoundaries::Explicit({1, 5, 10, 20, 50, 200, 500})))
.add_column(Tag::topicTag())
.add_column(Tag::clientIdTag())
.RegisterForExport();
diff --git a/cpp/src/main/cpp/stats/StdoutHandler.cpp b/cpp/src/main/cpp/stats/StdoutHandler.cpp
new file mode 100644
index 0000000..9d18d78
--- /dev/null
+++ b/cpp/src/main/cpp/stats/StdoutHandler.cpp
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StdoutHandler.h"
+
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+void StdoutHandler::ExportViewData(  // Formats the exported view data as text and prints it.
+ const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) {
+ for (const auto& datum : data) {  // Each datum pairs a view descriptor (first) with its data (second).
+ const auto& view_data = datum.second;
+ const auto& descriptor = datum.first;
+ auto start_times = view_data.start_times();  // Plain auto: a local object, so the operator[] lookup below cannot modify view_data.
+ auto columns = descriptor.columns();
+
+ switch (view_data.type()) {  // Output format differs per value type.
+ case opencensus::stats::ViewData::Type::kInt64: {
+ auto data_map = view_data.int_data();
+ for (const auto& entry : data_map) {  // entry.first holds the column values, entry.second the number.
+ absl::Time time = start_times[entry.first];  // NOTE(review): operator[] presumably default-inserts when a row has no start time — confirm.
+ std::string line;
+ line.append(absl::FormatTime(time)).append(" ");
+ line.append(descriptor.name());
+ line.append("{");
+ for (std::size_t i = 0; i < columns.size(); i++) {  // Appends "name=value" for each column.
+ line.append(columns[i].name()).append("=").append(entry.first[i]);
+ if (i < columns.size() - 1) {
+ line.append(", ");
+ } else {
+ line.append("} ==> ");  // Only reached on the last column, so a view with no columns never gets the closing brace.
+ }
+ }
+ line.append(std::to_string(entry.second));
+ println(line);
+ }
+ break;
+ }
+ case opencensus::stats::ViewData::Type::kDouble: {
+ exportDatum(datum.first, view_data.start_time(), view_data.end_time(), view_data.double_data());  // Double data goes through exportDatum rather than building a line for println.
+ break;
+ }
+ case opencensus::stats::ViewData::Type::kDistribution: {
+ for (const auto& entry : view_data.distribution_data()) {
+ std::string line(descriptor.name());  // No timestamp prefix here, unlike the kInt64 branch.
+ line.append("{");
+ for (std::size_t i = 0; i < columns.size(); i++) {  // Same "name=value" list as in the kInt64 branch.
+ line.append(columns[i].name()).append("=").append(entry.first[i]);
+ if (i < columns.size() - 1) {
+ line.append(", ");
+ } else {
+ line.append("} ==> ");
+ }
+ }
+ line.append(entry.second.DebugString());
+ println(line);
+
+ println(absl::StrJoin(entry.second.bucket_boundaries().lower_boundaries(), ","));  // Second line: comma-joined lower bucket boundaries.
+ }
+ break;
+ }
+ }
+ }
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/stats/Tag.cpp b/cpp/src/main/cpp/stats/Tag.cpp
index 9599233..62f2076 100644
--- a/cpp/src/main/cpp/stats/Tag.cpp
+++ b/cpp/src/main/cpp/stats/Tag.cpp
@@ -28,14 +28,9 @@ opencensus::tags::TagKey& Tag::clientIdTag() {
return client_id_tag;
}
-opencensus::tags::TagKey& Tag::userIdTag() {
- static opencensus::tags::TagKey uid_tag = opencensus::tags::TagKey::Register("uid");
- return uid_tag;
-}
-
-opencensus::tags::TagKey& Tag::deploymentTag() {
- static opencensus::tags::TagKey deployment_tag = opencensus::tags::TagKey::Register("deployment");
- return deployment_tag;
+opencensus::tags::TagKey& Tag::invocationStatus() {  // Returns the tag key registered under "invocation_status".
+ static opencensus::tags::TagKey invocation_status = opencensus::tags::TagKey::Register("invocation_status");  // Function-local static: initialised on the first call and reused afterwards.
+ return invocation_status;
}
ROCKETMQ_NAMESPACE_END
diff --git a/cpp/src/main/cpp/stats/include/ConsumeStats.h b/cpp/src/main/cpp/stats/include/ConsumeStats.h
index a70f324..470a12d 100644
--- a/cpp/src/main/cpp/stats/include/ConsumeStats.h
+++ b/cpp/src/main/cpp/stats/include/ConsumeStats.h
@@ -25,30 +25,6 @@ class ConsumeStats {
public:
ConsumeStats();
- const opencensus::stats::MeasureInt64& processSuccess() const {
- return process_success_;
- }
-
- const opencensus::stats::MeasureInt64& processFailure() const {
- return process_failure_;
- }
-
- const opencensus::stats::MeasureInt64& ackSuccess() const {
- return ack_success_;
- }
-
- const opencensus::stats::MeasureInt64& ackFailure() const {
- return ack_failure_;
- }
-
- const opencensus::stats::MeasureInt64& changeInvisibleTimeSuccess() const {
- return change_invisible_time_success_;
- }
-
- const opencensus::stats::MeasureInt64& changeInvisibleTimeFailure() const {
- return change_invisible_time_failure_;
- }
-
const opencensus::stats::MeasureInt64& cachedMessageQuantity() const {
return cached_message_quantity_;
}
@@ -70,12 +46,6 @@ public:
}
private:
- opencensus::stats::MeasureInt64 process_success_;
- opencensus::stats::MeasureInt64 process_failure_;
- opencensus::stats::MeasureInt64 ack_success_;
- opencensus::stats::MeasureInt64 ack_failure_;
- opencensus::stats::MeasureInt64 change_invisible_time_success_;
- opencensus::stats::MeasureInt64 change_invisible_time_failure_;
opencensus::stats::MeasureInt64 cached_message_quantity_;
opencensus::stats::MeasureInt64 cached_message_bytes_;
diff --git a/cpp/src/main/cpp/stats/include/MetricBidiReactor.h b/cpp/src/main/cpp/stats/include/MetricBidiReactor.h
index cc08c1d..0a10cd8 100644
--- a/cpp/src/main/cpp/stats/include/MetricBidiReactor.h
+++ b/cpp/src/main/cpp/stats/include/MetricBidiReactor.h
@@ -59,6 +59,7 @@ public:
private:
std::weak_ptr<Client> client_;
std::weak_ptr<OpencensusExporter> exporter_;
+ grpc::ClientContext context_;
ExportMetricsServiceRequest request_;
diff --git a/cpp/src/main/cpp/stats/include/OpencensusExporter.h b/cpp/src/main/cpp/stats/include/OpencensusExporter.h
index 7920ff5..161843c 100644
--- a/cpp/src/main/cpp/stats/include/OpencensusExporter.h
+++ b/cpp/src/main/cpp/stats/include/OpencensusExporter.h
@@ -17,9 +17,9 @@
#pragma once
#include "Client.h"
-#include "Exporter.h"
#include "grpcpp/grpcpp.h"
#include "opencensus/proto/agent/metrics/v1/metrics_service.grpc.pb.h"
+#include "opencensus/stats/stats.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -30,11 +30,12 @@ using StubPtr = std::unique_ptr<Stub>;
using MetricData = std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>;
using ExportMetricsServiceRequest = opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest;
-class OpencensusExporter : public Exporter, public std::enable_shared_from_this<OpencensusExporter> {
+class OpencensusExporter : public opencensus::stats::StatsExporter::Handler,
+ public std::enable_shared_from_this<OpencensusExporter> {
public:
OpencensusExporter(std::string endpoints, std::weak_ptr<Client> client);
- void exportMetrics(const MetricData& data) override;
+ void ExportViewData(const MetricData& data) override;
static void wrap(const MetricData& data, ExportMetricsServiceRequest& request);
diff --git a/cpp/src/main/cpp/stats/include/Exporter.h b/cpp/src/main/cpp/stats/include/OpencensusHandler.h
similarity index 73%
rename from cpp/src/main/cpp/stats/include/Exporter.h
rename to cpp/src/main/cpp/stats/include/OpencensusHandler.h
index 7f2f24a..5c6ec1f 100644
--- a/cpp/src/main/cpp/stats/include/Exporter.h
+++ b/cpp/src/main/cpp/stats/include/OpencensusHandler.h
@@ -16,15 +16,17 @@
*/
#pragma once
-#include "opencensus/stats/stats.h"
-#include "rocketmq/RocketMQ.h"
+#include "OpencensusExporter.h"
ROCKETMQ_NAMESPACE_BEGIN
-class Exporter {
+class OpencensusHandler : public opencensus::stats::StatsExporter::Handler {  // Stats handler that holds a shared OpencensusExporter.
public:
- virtual void exportMetrics(
- const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) = 0;
+ OpencensusHandler(std::string endpoints, std::weak_ptr<Client> client);  // Both parameters are taken by value.
+
+ void ExportViewData(const MetricData& data) override;  // Overrides the Handler callback.
+
+ std::shared_ptr<OpencensusExporter> exporter_;  // NOTE(review): declared in the public section — confirm outside access is intended.
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/stats/include/PublishStats.h b/cpp/src/main/cpp/stats/include/PublishStats.h
index d9d750a..3872808 100644
--- a/cpp/src/main/cpp/stats/include/PublishStats.h
+++ b/cpp/src/main/cpp/stats/include/PublishStats.h
@@ -27,22 +27,12 @@ ROCKETMQ_NAMESPACE_BEGIN
class PublishStats {  // After this change only the publish latency measure remains.
public:
PublishStats();
-
- const opencensus::stats::MeasureInt64& success() const {
- return success_;
- }
-
- const opencensus::stats::MeasureInt64& failure() const {
- return failure_;
- }
-
+
const opencensus::stats::MeasureInt64& latency() const {  // Read-only access to the latency measure.
return latency_;
}
private:
- opencensus::stats::MeasureInt64 success_;
- opencensus::stats::MeasureInt64 failure_;
opencensus::stats::MeasureInt64 latency_;
};
diff --git a/cpp/src/main/cpp/stats/include/StdoutHandler.h b/cpp/src/main/cpp/stats/include/StdoutHandler.h
new file mode 100644
index 0000000..1bfc3dc
--- /dev/null
+++ b/cpp/src/main/cpp/stats/include/StdoutHandler.h
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <mutex>
+#include <string>
+
+#include "opencensus/stats/stats.h"
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class StdoutHandler : public opencensus::stats::StatsExporter::Handler {  // Stats handler that writes exported view data to std::cout.
+public:
+ void ExportViewData(  // Handler override; only declared in this header.
+ const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) override;
+
+private:
+ template <typename T>  // T is the value type stored in the DataMap.
+ void exportDatum(const opencensus::stats::ViewDescriptor& descriptor,  // Streams every row of data to std::cout.
+ absl::Time start_time,  // Not referenced in the body.
+ absl::Time end_time,  // Not referenced in the body.
+ const opencensus::stats::ViewData::DataMap<T>& data) {
+ if (data.empty()) {  // Nothing to print.
+ // std::cout << "No data for " << descriptor.name() << std::endl;
+ return;
+ }
+
+ for (const auto& row : data) {  // NOTE(review): writes to std::cout without taking console_mtx_, unlike println — confirm that is intended.
+ for (std::size_t column = 0; column < descriptor.columns().size(); column++) {  // One output line per (row, column) pair; the row value is repeated on each.
+ std::cout << descriptor.name() << "[" << descriptor.columns()[column].name() << "=" << row.first[column] << "]"
+ << dataToString(row.second) << std::endl;  // dataToString output already ends in "\n", so std::endl adds a second line break.
+ }
+ }
+ }
+
+ std::mutex console_mtx_;  // Locked by println for the duration of one write.
+
+ void println(const std::string& line) {  // Writes one line to std::cout under console_mtx_.
+ std::lock_guard<std::mutex> lk(console_mtx_);
+ std::cout << line << std::endl;  // NOTE(review): this header includes <mutex> and <string> but not <iostream> — confirm it arrives transitively.
+ }
+
+ // Functions to format data for different aggregation types.
+ std::string dataToString(double data) {
+ return absl::StrCat(": ", data, "\n");  // ": " + value + newline.
+ }
+ std::string dataToString(int64_t data) {
+ return absl::StrCat(": ", data, "\n");  // Same layout as the double overload.
+ }
+ std::string dataToString(const opencensus::stats::Distribution& data) {
+ std::string output = "\n";  // The result begins with a line break.
+ std::vector<std::string> lines = absl::StrSplit(data.DebugString(), '\n');
+ // Add indent.
+ for (const auto& line : lines) {
+ absl::StrAppend(&output, " ", line, "\n");
+ }
+ return output;
+ }
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/stats/include/Tag.h b/cpp/src/main/cpp/stats/include/Tag.h
index ae69fcb..df78650 100644
--- a/cpp/src/main/cpp/stats/include/Tag.h
+++ b/cpp/src/main/cpp/stats/include/Tag.h
@@ -27,9 +27,7 @@ public:
static opencensus::tags::TagKey& clientIdTag();
- static opencensus::tags::TagKey& userIdTag();
-
- static opencensus::tags::TagKey& deploymentTag();
+ static opencensus::tags::TagKey& invocationStatus();
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/stats/tests/PublishStatsTest.cpp b/cpp/src/main/cpp/stats/tests/PublishStatsTest.cpp
index b6fb7d3..7c4d997 100644
--- a/cpp/src/main/cpp/stats/tests/PublishStatsTest.cpp
+++ b/cpp/src/main/cpp/stats/tests/PublishStatsTest.cpp
@@ -136,8 +136,6 @@ TEST(StatsTest, testBasics) {
std::atomic_bool stopped{false};
auto generator = [&]() {
while (!stopped) {
- opencensus::stats::Record({{metrics.success(), 1}}, {{Tag::topicTag(), t1}, {Tag::clientIdTag(), "client-0"}});
- opencensus::stats::Record({{metrics.success(), 100}}, {{Tag::topicTag(), t2}, {Tag::clientIdTag(), "client-0"}});
for (std::size_t i = 0; i < 10; i++) {
opencensus::stats::Record({{metrics.latency(), i * 10}},
{{Tag::topicTag(), t1}, {Tag::clientIdTag(), "client-0"}});