You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2020/12/29 03:36:36 UTC

[rocketmq-client-cpp] 19/29: refactor: ExpressionType

This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git

commit 67c55f8374f11004b94eceaf7b7df6e1314a4d36
Author: James Yin <yw...@hotmail.com>
AuthorDate: Mon Sep 21 15:43:31 2020 +0800

    refactor: ExpressionType
---
 include/ExpressionType.h                    | 37 +++++++++++++++++++++++++++++
 src/consumer/DefaultMQPushConsumerImpl.cpp  |  1 +
 src/consumer/ExpressionType.cpp             | 28 ++++++++++++++++++++++
 src/consumer/PullAPIWrapper.cpp             | 21 ++++++++--------
 src/consumer/PullAPIWrapper.h               | 21 ++++++++--------
 src/protocol/heartbeat/SubscriptionData.hpp | 16 +++++++++----
 6 files changed, 100 insertions(+), 24 deletions(-)

diff --git a/include/ExpressionType.h b/include/ExpressionType.h
new file mode 100644
index 0000000..cf243b3
--- /dev/null
+++ b/include/ExpressionType.h
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_EXPRESSIONTYPE_H_
+#define ROCKETMQ_EXPRESSIONTYPE_H_
+
+#include <string>  // std::string
+
+#include "RocketMQClient.h"
+
+namespace rocketmq {
+
+class ROCKETMQCLIENT_API ExpressionType {
+ public:
+  static const std::string SQL92;
+  static const std::string TAG;
+
+ public:
+  static bool isTagType(const std::string& type);
+};
+
+}  // namespace rocketmq
+
+#endif  // ROCKETMQ_EXPRESSIONTYPE_H_
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 841d4ee..2b3a7c4 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -481,6 +481,7 @@ void DefaultMQPushConsumerImpl::pullMessage(PullRequestPtr pull_request) {
     auto* callback = new AsyncPullCallback(shared_from_this(), pull_request, subscription_data);
     pull_api_wrapper_->pullKernelImpl(message_queue,                                        // mq
                                       subExpression,                                        // subExpression
+                                      subscription_data->expression_type(),                 // expressionType
                                       subscription_data->sub_version(),                     // subVersion
                                       pull_request->next_offset(),                          // offset
                                       getDefaultMQPushConsumerConfig()->pull_batch_size(),  // maxNums
diff --git a/src/consumer/ExpressionType.cpp b/src/consumer/ExpressionType.cpp
new file mode 100644
index 0000000..979412f
--- /dev/null
+++ b/src/consumer/ExpressionType.cpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ExpressionType.h"
+
+namespace rocketmq {
+
+const std::string ExpressionType::SQL92 = "SQL92";
+const std::string ExpressionType::TAG = "TAG";
+
+bool ExpressionType::isTagType(const std::string& type) {
+  return type.empty() || TAG == type;
+}
+
+}  // namespace rocketmq
diff --git a/src/consumer/PullAPIWrapper.cpp b/src/consumer/PullAPIWrapper.cpp
index e671118..88e96f6 100644
--- a/src/consumer/PullAPIWrapper.cpp
+++ b/src/consumer/PullAPIWrapper.cpp
@@ -94,16 +94,17 @@ PullResult PullAPIWrapper::processPullResult(const MQMessageQueue& mq,
                     pullResultExt.max_offset(), std::move(msgListFilterAgain));
 }
 
-PullResult* PullAPIWrapper::pullKernelImpl(const MQMessageQueue& mq,             // 1
-                                           const std::string& subExpression,     // 2
-                                           int64_t subVersion,                   // 3
-                                           int64_t offset,                       // 4
-                                           int maxNums,                          // 5
-                                           int sysFlag,                          // 6
-                                           int64_t commitOffset,                 // 7
-                                           int brokerSuspendMaxTimeMillis,       // 8
-                                           int timeoutMillis,                    // 9
-                                           CommunicationMode communicationMode,  // 10
+PullResult* PullAPIWrapper::pullKernelImpl(const MQMessageQueue& mq,
+                                           const std::string& subExpression,
+                                           const std::string& expressionType,
+                                           int64_t subVersion,
+                                           int64_t offset,
+                                           int maxNums,
+                                           int sysFlag,
+                                           int64_t commitOffset,
+                                           int brokerSuspendMaxTimeMillis,
+                                           int timeoutMillis,
+                                           CommunicationMode communicationMode,
                                            PullCallback* pullCallback) {
   std::unique_ptr<FindBrokerResult> findBrokerResult(
       client_instance_->findBrokerAddressInSubscribe(mq.broker_name(), recalculatePullFromWhichNode(mq), false));
diff --git a/src/consumer/PullAPIWrapper.h b/src/consumer/PullAPIWrapper.h
index 7edc227..73d98d9 100644
--- a/src/consumer/PullAPIWrapper.h
+++ b/src/consumer/PullAPIWrapper.h
@@ -34,16 +34,17 @@ class PullAPIWrapper {
 
   PullResult processPullResult(const MQMessageQueue& mq, PullResult& pullResult, SubscriptionData* subscriptionData);
 
-  PullResult* pullKernelImpl(const MQMessageQueue& mq,             // 1
-                             const std::string& subExpression,     // 2
-                             int64_t subVersion,                   // 3
-                             int64_t offset,                       // 4
-                             int maxNums,                          // 5
-                             int sysFlag,                          // 6
-                             int64_t commitOffset,                 // 7
-                             int brokerSuspendMaxTimeMillis,       // 8
-                             int timeoutMillis,                    // 9
-                             CommunicationMode communicationMode,  // 10
+  PullResult* pullKernelImpl(const MQMessageQueue& mq,
+                             const std::string& subExpression,
+                             const std::string& expressionType,
+                             int64_t subVersion,
+                             int64_t offset,
+                             int maxNums,
+                             int sysFlag,
+                             int64_t commitOffset,
+                             int brokerSuspendMaxTimeMillis,
+                             int timeoutMillis,
+                             CommunicationMode communicationMode,
                              PullCallback* pullCallback);
 
  private:
diff --git a/src/protocol/heartbeat/SubscriptionData.hpp b/src/protocol/heartbeat/SubscriptionData.hpp
index 82f8334..e7ad684 100644
--- a/src/protocol/heartbeat/SubscriptionData.hpp
+++ b/src/protocol/heartbeat/SubscriptionData.hpp
@@ -24,15 +24,19 @@
 
 #include <json/json.h>
 
+#include "ExpressionType.h"
 #include "UtilAll.h"
 
 namespace rocketmq {
 
 class SubscriptionData {
  public:
-  SubscriptionData() : sub_version_(UtilAll::currentTimeMillis()) {}
+  SubscriptionData() : sub_version_(UtilAll::currentTimeMillis()), expression_type_(ExpressionType::TAG) {}
   SubscriptionData(const std::string& topic, const std::string& subString)
-      : topic_(topic), sub_string_(subString), sub_version_(UtilAll::currentTimeMillis()) {}
+      : topic_(topic),
+        sub_string_(subString),
+        sub_version_(UtilAll::currentTimeMillis()),
+        expression_type_(ExpressionType::TAG) {}
 
   SubscriptionData(const SubscriptionData& other) {
     sub_string_ = other.sub_string_;
@@ -40,14 +44,15 @@ class SubscriptionData {
     tag_set_ = other.tag_set_;
     topic_ = other.topic_;
     code_set_ = other.code_set_;
+    expression_type_ = other.expression_type_;
   }
 
   virtual ~SubscriptionData() = default;
 
   bool operator==(const SubscriptionData& other) const {
     // FIXME: tags
-    return topic_ == other.topic_ && sub_string_ == other.sub_string_ && sub_version_ == other.sub_version_ &&
-           tag_set_.size() == other.tag_set_.size();
+    return expression_type_ == expression_type_ && topic_ == other.topic_ && sub_string_ == other.sub_string_ &&
+           sub_version_ == other.sub_version_ && tag_set_.size() == other.tag_set_.size();
   }
   bool operator!=(const SubscriptionData& other) const { return !operator==(other); }
 
@@ -93,12 +98,15 @@ class SubscriptionData {
 
   inline std::vector<int32_t>& code_set() { return code_set_; }
 
+  inline const std::string& expression_type() { return expression_type_; }
+
  private:
   std::string topic_;
   std::string sub_string_;
   int64_t sub_version_;
   std::vector<std::string> tag_set_;
   std::vector<int32_t> code_set_;
+  std::string expression_type_;
 };
 
 }  // namespace rocketmq