You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/04 05:08:17 UTC

[rocketmq-clients] branch cpp updated (4a718ee -> 2ba08b9)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a change to branch cpp
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


 discard 4a718ee  Fix include
 discard fad375e  Fix to make it compile on Windows
     new 2ba08b9  Fix to make it compile on Windows

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (4a718ee)
            \
             N -- N -- N   refs/heads/cpp (2ba08b9)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[rocketmq-clients] 01/01: Fix to make it compile on Windows

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch cpp
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 2ba08b93e0babfe1da272af390e7a94fdd2b655e
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon Jul 4 11:37:33 2022 +0800

    Fix to make it compile on Windows
---
 cpp/.bazelrc                                    |  4 +---
 cpp/api/rocketmq/Tracing.h                      | 10 ++++++++--
 cpp/src/main/cpp/base/Tracing.cpp               | 26 -------------------------
 cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp | 14 +++++++------
 cpp/src/main/cpp/client/RpcClientImpl.cpp       |  4 +---
 cpp/src/main/cpp/client/include/ClientConfig.h  |  7 ++++---
 cpp/src/main/cpp/client/include/RpcClient.h     |  7 ++-----
 cpp/src/main/cpp/client/include/RpcClientImpl.h |  7 ++-----
 cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp  |  4 ++--
 cpp/src/main/cpp/rocketmq/ProducerImpl.cpp      | 13 +++++++------
 cpp/src/main/cpp/rocketmq/include/ClientImpl.h  |  6 ++++++
 11 files changed, 41 insertions(+), 61 deletions(-)

diff --git a/cpp/.bazelrc b/cpp/.bazelrc
index d5ec2a9..6b7b1da 100644
--- a/cpp/.bazelrc
+++ b/cpp/.bazelrc
@@ -13,8 +13,6 @@ run --color=yes
 build --color=yes
 
 build --host_force_python=PY3
-build --host_javabase=@bazel_tools//tools/jdk:remote_jdk11
-build --javabase=@bazel_tools//tools/jdk:remote_jdk11
 
 # https://docs.bazel.build/versions/main/command-line-reference.html#flag--enable_platform_specific_config
 # If true, Bazel picks up host-OS-specific config lines from bazelrc files. For example, if the host OS is Linux and
@@ -48,7 +46,7 @@ build --action_env=LD_LIBRARY_PATH
 build --action_env=LLVM_CONFIG
 build --action_env=PATH
 
-build --copt=-maes
+build:linux --copt=-maes
 
 # Common flags for sanitizers
 build:sanitizer --define tcmalloc=disabled
diff --git a/cpp/api/rocketmq/Tracing.h b/cpp/api/rocketmq/Tracing.h
index bc12362..cdec455 100644
--- a/cpp/api/rocketmq/Tracing.h
+++ b/cpp/api/rocketmq/Tracing.h
@@ -16,12 +16,18 @@
  */
 #pragma once
 
-#include "opencensus/trace/sampler.h"
+#include <memory>
 
 #include "RocketMQ.h"
+#include "opencensus/trace/sampler.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-opencensus::trace::Sampler* traceSampler() __attribute__((weak));
+class TracingSamplerProvider {
+public:
+  virtual ~TracingSamplerProvider() = default;
+
+  virtual std::unique_ptr<opencensus::trace::Sampler> tracingSampler() = 0;
+};
 
 ROCKETMQ_NAMESPACE_END
diff --git a/cpp/src/main/cpp/base/Tracing.cpp b/cpp/src/main/cpp/base/Tracing.cpp
deleted file mode 100644
index 9eb7335..0000000
--- a/cpp/src/main/cpp/base/Tracing.cpp
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rocketmq/Tracing.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-opencensus::trace::Sampler* traceSampler() {
-  static opencensus::trace::NeverSampler sampler;
-  return &sampler;
-}
-
-ROCKETMQ_NAMESPACE_END
diff --git a/cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp b/cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp
index 168b87a..1e3c7bf 100644
--- a/cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp
+++ b/cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp
@@ -20,12 +20,14 @@
 ROCKETMQ_NAMESPACE_BEGIN
 
 TEST(RetryPolicyTest, testBackoff) {
-  RetryPolicy policy{.max_attempt = 3,
-                     .strategy = BackoffStrategy::Customized,
-                     .initial = absl::Milliseconds(0),
-                     .max = absl::Milliseconds(0),
-                     .multiplier = 0.0f,
-                     .next = {absl::Milliseconds(10), absl::Milliseconds(100), absl::Milliseconds(500)}};
+  RetryPolicy policy;
+  policy.max_attempt = 3;
+  policy.strategy = BackoffStrategy::Customized;
+  policy.initial = absl::Milliseconds(0);
+  policy.max = absl::Milliseconds(0);
+  policy.multiplier = 0.0f;
+  policy.next = {absl::Milliseconds(10), absl::Milliseconds(100), absl::Milliseconds(500)};
+
   ASSERT_EQ(policy.backoff(1), 10);
   ASSERT_EQ(policy.backoff(2), 100);
   ASSERT_EQ(policy.backoff(3), 500);
diff --git a/cpp/src/main/cpp/client/RpcClientImpl.cpp b/cpp/src/main/cpp/client/RpcClientImpl.cpp
index 623547e..35016c3 100644
--- a/cpp/src/main/cpp/client/RpcClientImpl.cpp
+++ b/cpp/src/main/cpp/client/RpcClientImpl.cpp
@@ -21,14 +21,12 @@
 #include <sstream>
 #include <thread>
 
-#include "absl/time/time.h"
-
 #include "ClientManager.h"
 #include "ReceiveMessageStreamReader.h"
 #include "RpcClient.h"
 #include "TelemetryBidiReactor.h"
 #include "TlsHelper.h"
-#include "include/ReceiveMessageContext.h"
+#include "absl/time/time.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
diff --git a/cpp/src/main/cpp/client/include/ClientConfig.h b/cpp/src/main/cpp/client/include/ClientConfig.h
index 66ca230..bade9a4 100644
--- a/cpp/src/main/cpp/client/include/ClientConfig.h
+++ b/cpp/src/main/cpp/client/include/ClientConfig.h
@@ -21,12 +21,12 @@
 #include <string>
 #include <vector>
 
-#include "absl/container/flat_hash_map.h"
-#include "absl/time/time.h"
-
 #include "Protocol.h"
 #include "RetryPolicy.h"
+#include "absl/container/flat_hash_map.h"
+#include "absl/time/time.h"
 #include "rocketmq/CredentialsProvider.h"
+#include "rocketmq/Tracing.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -61,6 +61,7 @@ struct ClientConfig {
   PublisherConfig publisher;
   SubscriberConfig subscriber;
   Metric metric;
+  std::unique_ptr<opencensus::trace::Sampler> sampler_;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/client/include/RpcClient.h b/cpp/src/main/cpp/client/include/RpcClient.h
index 2fc8448..fbb3017 100644
--- a/cpp/src/main/cpp/client/include/RpcClient.h
+++ b/cpp/src/main/cpp/client/include/RpcClient.h
@@ -21,15 +21,12 @@
 #include <memory>
 #include <string>
 
-#include "ReceiveMessageResult.h"
+#include "Protocol.h"
+#include "ReceiveMessageContext.h"
 #include "absl/container/flat_hash_map.h"
 #include "absl/strings/string_view.h"
 #include "grpcpp/grpcpp.h"
 
-#include "InvocationContext.h"
-#include "Protocol.h"
-#include "ReceiveMessageContext.h"
-
 ROCKETMQ_NAMESPACE_BEGIN
 
 using Channel = grpc::Channel;
diff --git a/cpp/src/main/cpp/client/include/RpcClientImpl.h b/cpp/src/main/cpp/client/include/RpcClientImpl.h
index 6406c74..35316ec 100644
--- a/cpp/src/main/cpp/client/include/RpcClientImpl.h
+++ b/cpp/src/main/cpp/client/include/RpcClientImpl.h
@@ -18,14 +18,11 @@
 
 #include <memory>
 
-#include "InvocationContext.h"
-#include "ReceiveMessageCallback.h"
-#include "ReceiveMessageContext.h"
-#include "absl/container/flat_hash_map.h"
-
 #include "Client.h"
 #include "ClientManager.h"
+#include "ReceiveMessageContext.h"
 #include "RpcClient.h"
+#include "absl/container/flat_hash_map.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
diff --git a/cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp b/cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp
index 0eeb08e..70ab77b 100644
--- a/cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp
@@ -22,13 +22,13 @@
 #include <system_error>
 #include <utility>
 
+#include "AsyncReceiveMessageCallback.h"
 #include "ClientManagerImpl.h"
 #include "MetadataConstants.h"
 #include "Protocol.h"
 #include "PushConsumerImpl.h"
 #include "ReceiveMessageResult.h"
 #include "Signature.h"
-#include "include/AsyncReceiveMessageCallback.h"
 #include "rocketmq/MessageListener.h"
 
 using namespace std::chrono;
@@ -113,7 +113,7 @@ void ProcessQueueImpl::popMessage() {
 
   std::weak_ptr<AsyncReceiveMessageCallback> cb{receive_callback_};
   auto callback = [cb](const std::error_code& ec, const ReceiveMessageResult& result) {
-    auto recv_cb = cb.lock();
+    std::shared_ptr<AsyncReceiveMessageCallback> recv_cb = cb.lock();
     if (recv_cb) {
       recv_cb->onCompletion(ec, result);
     }
diff --git a/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp b/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp
index 4b6c3f0..65cc5ba 100644
--- a/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -290,16 +290,17 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext> context) {
 
   {
     // Trace Send RPC
-    if (context->message_->traceContext().has_value()) {
+    if (context->message_->traceContext().has_value() && client_config_.sampler_) {
       auto span_context =
           opencensus::trace::propagation::FromTraceParentHeader(context->message_->traceContext().value());
       auto span = opencensus::trace::Span::BlankSpan();
       std::string span_name = resourceNamespace() + "/" + context->message_->topic() + " " +
                               MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION;
       if (span_context.IsValid()) {
-        span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {traceSampler()});
+        span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context,
+                                                                  {client_config_.sampler_.get()});
       } else {
-        span = opencensus::trace::Span::StartSpan(span_name, nullptr, {traceSampler()});
+        span = opencensus::trace::Span::StartSpan(span_name, nullptr, {client_config_.sampler_.get()});
       }
       span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
                         MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION);
@@ -380,7 +381,7 @@ bool ProducerImpl::endTransaction0(const Transaction& transaction, TransactionSt
   bool completed = false;
   bool success = false;
   auto span = opencensus::trace::Span::BlankSpan();
-  if (!transaction.traceContext().empty()) {
+  if (!transaction.traceContext().empty() && client_config_.sampler_) {
     // Trace transactional message
     opencensus::trace::SpanContext span_context =
         opencensus::trace::propagation::FromTraceParentHeader(transaction.traceContext());
@@ -389,9 +390,9 @@ bool ProducerImpl::endTransaction0(const Transaction& transaction, TransactionSt
                                            : MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION;
     std::string span_name = resourceNamespace() + "/" + transaction.topic() + " " + trace_operation_name;
     if (span_context.IsValid()) {
-      span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {traceSampler()});
+      span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {client_config_.sampler_.get()});
     } else {
-      span = opencensus::trace::Span::StartSpan(span_name, nullptr, {traceSampler()});
+      span = opencensus::trace::Span::StartSpan(span_name, nullptr, {client_config_.sampler_.get()});
     }
     span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION, trace_operation_name);
     span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION, trace_operation_name);
diff --git a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
index ff61af3..c136cc9 100644
--- a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
+++ b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
@@ -110,6 +110,12 @@ public:
   virtual void buildClientSettings(rmq::Settings& settings) {
   }
 
+  void registerTracingSampler(TracingSamplerProvider *provider) {
+    if (provider) {
+      client_config_.sampler_ = provider->tracingSampler();
+    }
+  }
+
 protected:
   ClientConfig client_config_;