You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/22 10:17:14 UTC
[rocketmq-client-cpp] branch main updated: Implement trace for standard consumption (#378)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 6548bab Implement trace for standard consumption (#378)
6548bab is described below
commit 6548babfd7dbcacce86416bfca2850391a7bfd76
Author: aaron ai <ya...@gmail.com>
AuthorDate: Fri Oct 22 18:17:10 2021 +0800
Implement trace for standard consumption (#378)
---
src/main/cpp/base/MixAll.cpp | 7 +---
src/main/cpp/base/include/MixAll.h | 7 +---
.../cpp/rocketmq/ConsumeStandardMessageService.cpp | 44 +++++++++++-----------
src/main/cpp/rocketmq/ProducerImpl.cpp | 12 +++---
src/main/cpp/tracing/exporters/OtlpExporter.cpp | 8 ++--
5 files changed, 37 insertions(+), 41 deletions(-)
diff --git a/src/main/cpp/base/MixAll.cpp b/src/main/cpp/base/MixAll.cpp
index 93d2d38..ba69ce4 100644
--- a/src/main/cpp/base/MixAll.cpp
+++ b/src/main/cpp/base/MixAll.cpp
@@ -66,6 +66,8 @@ const uint32_t MixAll::DEFAULT_COMPRESS_BODY_THRESHOLD_ = 1024 * 1024 * 4;
const char* MixAll::HOME_PROFILE_ENV_ = "HOME";
const char* MixAll::MESSAGE_KEY_SEPARATOR = " ";
+const char* MixAll::OTLP_NAME_VALUE = "org.apache.rocketmq.message";
+
const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_KEY_TELEMETRY_SDK_LANGUAGE = "telemetry.sdk.language";
const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_VALUE_TELEMETRY_SDK_LANGUAGE = "cpp";
@@ -73,11 +75,7 @@ const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_KEY_HOST_NAME = "host.name";
const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_KEY_SERVICE_NAME = "service.name";
const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_VALUE_SERVICE_NAME = "rocketmq-client";
-// Span name list
-const char* MixAll::SPAN_NAME_SEND_MESSAGE = "SendMessage";
const char* MixAll::SPAN_NAME_END_TRANSACTION = "EndTransaction";
-const char* MixAll::SPAN_NAME_AWAIT_CONSUMPTION = "AwaitingConsumption";
-const char* MixAll::SPAN_NAME_CONSUME_MESSAGE = "ConsumeMessage";
const char* MixAll::SPAN_NAME_PULL_MESSAGE = "PullMessage";
// Span attributes follows to the opentelemetry specification, refers to:
@@ -133,7 +131,6 @@ const char* MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_SEND_OPERATION = "send";
const char* MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_RECEIVE_OPERATION = "receive";
const char* MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION = "process";
-const char* MixAll::SPAN_ATTRIBUTE_KEY_HOST_NAME = "host.name";
const char* MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION = "commitAction";
// Span annotation
diff --git a/src/main/cpp/base/include/MixAll.h b/src/main/cpp/base/include/MixAll.h
index 8a47ecf..d25f4b8 100644
--- a/src/main/cpp/base/include/MixAll.h
+++ b/src/main/cpp/base/include/MixAll.h
@@ -61,6 +61,8 @@ public:
static const char* MESSAGE_KEY_SEPARATOR;
+ static const char* OTLP_NAME_VALUE;
+
static const char* TRACE_RESOURCE_ATTRIBUTE_KEY_TELEMETRY_SDK_LANGUAGE;
static const char* TRACE_RESOURCE_ATTRIBUTE_VALUE_TELEMETRY_SDK_LANGUAGE;
@@ -68,11 +70,7 @@ public:
static const char* TRACE_RESOURCE_ATTRIBUTE_KEY_SERVICE_NAME;
static const char* TRACE_RESOURCE_ATTRIBUTE_VALUE_SERVICE_NAME;
- // Tracing span name list
- static const char* SPAN_NAME_SEND_MESSAGE;
static const char* SPAN_NAME_END_TRANSACTION;
- static const char* SPAN_NAME_AWAIT_CONSUMPTION;
- static const char* SPAN_NAME_CONSUME_MESSAGE;
static const char* SPAN_NAME_PULL_MESSAGE;
// RocketMQ span attribute name list
@@ -125,7 +123,6 @@ public:
static const char* SPAN_ATTRIBUTE_VALUE_MESSAGING_RECEIVE_OPERATION;
static const char* SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION;
- static const char* SPAN_ATTRIBUTE_KEY_HOST_NAME;
static const char* SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION;
// Tracing annotation
diff --git a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp b/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
index b13b34a..46ef575 100644
--- a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
+++ b/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
@@ -19,6 +19,7 @@
#include <system_error>
#include <utility>
+#include "TracingUtility.h"
#include "absl/memory/memory.h"
#include "absl/strings/str_join.h"
#include "absl/time/time.h"
@@ -36,6 +37,7 @@
#include "UtilAll.h"
#include "rocketmq/ConsumeType.h"
#include "rocketmq/MQMessage.h"
+#include "rocketmq/MQMessageExt.h"
#include "rocketmq/MessageListener.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -139,33 +141,30 @@ void ConsumeStandardMessageService::consumeTask(const ProcessQueueWeakPtr& proce
auto span_context = opencensus::trace::propagation::FromTraceParentHeader(msg.traceContext());
auto span = opencensus::trace::Span::BlankSpan();
+ std::string span_name = consumer->resourceNamespace() + "/" + msg.getTopic() + " " +
+ MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION;
if (span_context.IsValid()) {
- span = opencensus::trace::Span::StartSpanWithRemoteParent(MixAll::SPAN_NAME_AWAIT_CONSUMPTION, span_context,
- &Samplers::always());
+ span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, &Samplers::always());
} else {
- span = opencensus::trace::Span::StartSpan(MixAll::SPAN_NAME_AWAIT_CONSUMPTION, nullptr, {&Samplers::always()});
+ span = opencensus::trace::Span::StartSpan(span_name, nullptr, {&Samplers::always()});
}
-
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY,
- consumer->credentialsProvider()->getCredentials().accessKey());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, consumer->resourceNamespace());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION, msg.getTopic());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, msg.getMsgId());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, consumer->getGroupName());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG, msg.getTags());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
+ MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
+ TracingUtility::addUniversalSpanAttributes(msg, *consumer, span);
const auto& keys = msg.getKeys();
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, msg.getDeliveryAttempt());
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP,
absl::FormatTime(absl::FromUnixMillis(msg.getStoreTimestamp())));
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_HOST_NAME, UtilAll::hostname());
absl::Time decoded_timestamp = MessageAccessor::decodedTimestamp(msg);
span.AddAnnotation(
MixAll::SPAN_ANNOTATION_AWAIT_CONSUMPTION,
{{MixAll::SPAN_ANNOTATION_ATTR_START_TIME,
opencensus::trace::AttributeValueRef(absl::ToInt64Milliseconds(decoded_timestamp - absl::UnixEpoch()))}});
span.End();
+ MessageAccessor::setTraceContext(const_cast<MQMessageExt&>(msg),
+ opencensus::trace::propagation::ToTraceParentHeader(span.context()));
}
}
@@ -175,27 +174,28 @@ void ConsumeStandardMessageService::consumeTask(const ProcessQueueWeakPtr& proce
for (const auto& msg : msgs) {
auto span_context = opencensus::trace::propagation::FromTraceParentHeader(msg.traceContext());
auto span = opencensus::trace::Span::BlankSpan();
+ std::string span_name = consumer->resourceNamespace() + "/" + msg.getTopic() + " " +
+ MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION;
if (span_context.IsValid()) {
- span = opencensus::trace::Span::StartSpanWithRemoteParent(MixAll::SPAN_NAME_CONSUME_MESSAGE, span_context);
+ span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context);
} else {
- span = opencensus::trace::Span::StartSpan(MixAll::SPAN_NAME_CONSUME_MESSAGE);
+ span = opencensus::trace::Span::StartSpan(span_name);
}
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY,
- consumer->credentialsProvider()->getCredentials().accessKey());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, consumer->resourceNamespace());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION, msg.getTopic());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, msg.getMsgId());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, consumer->getGroupName());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG, msg.getTags());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
+ MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION);
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
+ MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION);
+ TracingUtility::addUniversalSpanAttributes(msg, *consumer, span);
const auto& keys = msg.getKeys();
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, msg.getDeliveryAttempt());
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP,
absl::FormatTime(absl::FromUnixMillis(msg.getStoreTimestamp())));
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_HOST_NAME, UtilAll::hostname());
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_BATCH_SIZE, msgs.size());
spans.emplace_back(std::move(span));
+ MessageAccessor::setTraceContext(const_cast<MQMessageExt&>(msg),
+ opencensus::trace::propagation::ToTraceParentHeader(span.context()));
}
}
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index 870bf48..4780220 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -310,11 +310,6 @@ void ProducerImpl::sendImpl(RetrySendCallback* callback) {
return;
}
- SendMessageRequest request;
- wrapSendMessageRequest(callback->message(), request, callback->messageQueue());
- Metadata metadata;
- Signature::sign(this, metadata);
-
{
// Trace Send RPC
auto& message = callback->message();
@@ -348,6 +343,12 @@ void ProducerImpl::sendImpl(RetrySendCallback* callback) {
callback->message().traceContext(opencensus::trace::propagation::ToTraceParentHeader(span.context()));
callback->span() = span;
}
+
+ SendMessageRequest request;
+ wrapSendMessageRequest(callback->message(), request, callback->messageQueue());
+ Metadata metadata;
+ Signature::sign(this, metadata);
+
client_manager_->send(target, metadata, request, callback);
}
@@ -417,7 +418,6 @@ bool ProducerImpl::endTransaction0(const std::string& target, const std::string&
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, resourceNamespace());
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, getGroupName());
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, message_id);
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_HOST_NAME, UtilAll::hostname());
switch (resolution) {
case TransactionState::COMMIT:
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION, "commit");
diff --git a/src/main/cpp/tracing/exporters/OtlpExporter.cpp b/src/main/cpp/tracing/exporters/OtlpExporter.cpp
index 1ebbc22..e1c1735 100644
--- a/src/main/cpp/tracing/exporters/OtlpExporter.cpp
+++ b/src/main/cpp/tracing/exporters/OtlpExporter.cpp
@@ -15,6 +15,7 @@
* limitations under the License.
*/
#include "OtlpExporter.h"
+#include "ClientConfigImpl.h"
#include "InvocationContext.h"
#include "MixAll.h"
#include "Signature.h"
@@ -223,7 +224,7 @@ void OtlpExporterHandler::Export(const std::vector<::opencensus::trace::exporter
for (const auto& span : spans) {
auto item = new trace::Span();
- span.context().span_id().CopyTo(trace_id_buf);
+ span.context().trace_id().CopyTo(trace_id_buf);
item->set_trace_id(&trace_id_buf, TRACE_ID_SIZE);
span.context().span_id().CopyTo(span_id_buf);
@@ -345,8 +346,9 @@ void OtlpExporterHandler::Export(const std::vector<::opencensus::trace::exporter
item->mutable_status()->set_message(span.status().error_message());
}
- // item->mutable_status()->set_code()
-
+ instrument_library_span->mutable_instrumentation_library()->mutable_name()->assign(MixAll::OTLP_NAME_VALUE);
+ instrument_library_span->mutable_instrumentation_library()->mutable_version()->assign(
+ ClientConfigImpl::CLIENT_VERSION);
instrument_library_span->mutable_spans()->AddAllocated(item);
}
resource->mutable_instrumentation_library_spans()->AddAllocated(instrument_library_span);