You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2020/12/29 03:36:36 UTC
[rocketmq-client-cpp] 19/29: refactor: ExpressionType
This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
commit 67c55f8374f11004b94eceaf7b7df6e1314a4d36
Author: James Yin <yw...@hotmail.com>
AuthorDate: Mon Sep 21 15:43:31 2020 +0800
refactor: ExpressionType
---
include/ExpressionType.h | 37 +++++++++++++++++++++++++++++
src/consumer/DefaultMQPushConsumerImpl.cpp | 1 +
src/consumer/ExpressionType.cpp | 28 ++++++++++++++++++++++
src/consumer/PullAPIWrapper.cpp | 21 ++++++++--------
src/consumer/PullAPIWrapper.h | 21 ++++++++--------
src/protocol/heartbeat/SubscriptionData.hpp | 16 +++++++++----
6 files changed, 100 insertions(+), 24 deletions(-)
diff --git a/include/ExpressionType.h b/include/ExpressionType.h
new file mode 100644
index 0000000..cf243b3
--- /dev/null
+++ b/include/ExpressionType.h
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_EXPRESSIONTYPE_H_
+#define ROCKETMQ_EXPRESSIONTYPE_H_
+
+#include <string> // std::string
+
+#include "RocketMQClient.h"
+
+namespace rocketmq {
+
+class ROCKETMQCLIENT_API ExpressionType {  // Holds the expression-type name constants and a helper to classify them.
+ public:
+ static const std::string SQL92;  // Defined as "SQL92" in src/consumer/ExpressionType.cpp.
+ static const std::string TAG;  // Defined as "TAG"; the default type set by SubscriptionData's constructors.
+
+ public:
+ static bool isTagType(const std::string& type);  // True when `type` is empty or equals TAG.
+};
+
+} // namespace rocketmq
+
+#endif // ROCKETMQ_EXPRESSIONTYPE_H_
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 841d4ee..2b3a7c4 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -481,6 +481,7 @@ void DefaultMQPushConsumerImpl::pullMessage(PullRequestPtr pull_request) {
auto* callback = new AsyncPullCallback(shared_from_this(), pull_request, subscription_data);
pull_api_wrapper_->pullKernelImpl(message_queue, // mq
subExpression, // subExpression
+ subscription_data->expression_type(), // expressionType
subscription_data->sub_version(), // subVersion
pull_request->next_offset(), // offset
getDefaultMQPushConsumerConfig()->pull_batch_size(), // maxNums
diff --git a/src/consumer/ExpressionType.cpp b/src/consumer/ExpressionType.cpp
new file mode 100644
index 0000000..979412f
--- /dev/null
+++ b/src/consumer/ExpressionType.cpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ExpressionType.h"
+
+namespace rocketmq {
+
+const std::string ExpressionType::SQL92 = "SQL92";  // out-of-line definition of the static declared in ExpressionType.h
+const std::string ExpressionType::TAG = "TAG";  // also the value isTagType() compares its argument against
+
+bool ExpressionType::isTagType(const std::string& type) {
+ return type.empty() || type.compare(TAG) == 0;  // an unset (empty) type is treated the same as TAG
+}
+
+} // namespace rocketmq
diff --git a/src/consumer/PullAPIWrapper.cpp b/src/consumer/PullAPIWrapper.cpp
index e671118..88e96f6 100644
--- a/src/consumer/PullAPIWrapper.cpp
+++ b/src/consumer/PullAPIWrapper.cpp
@@ -94,16 +94,17 @@ PullResult PullAPIWrapper::processPullResult(const MQMessageQueue& mq,
pullResultExt.max_offset(), std::move(msgListFilterAgain));
}
-PullResult* PullAPIWrapper::pullKernelImpl(const MQMessageQueue& mq, // 1
- const std::string& subExpression, // 2
- int64_t subVersion, // 3
- int64_t offset, // 4
- int maxNums, // 5
- int sysFlag, // 6
- int64_t commitOffset, // 7
- int brokerSuspendMaxTimeMillis, // 8
- int timeoutMillis, // 9
- CommunicationMode communicationMode, // 10
+PullResult* PullAPIWrapper::pullKernelImpl(const MQMessageQueue& mq,
+ const std::string& subExpression,
+ const std::string& expressionType,
+ int64_t subVersion,
+ int64_t offset,
+ int maxNums,
+ int sysFlag,
+ int64_t commitOffset,
+ int brokerSuspendMaxTimeMillis,
+ int timeoutMillis,
+ CommunicationMode communicationMode,
PullCallback* pullCallback) {
std::unique_ptr<FindBrokerResult> findBrokerResult(
client_instance_->findBrokerAddressInSubscribe(mq.broker_name(), recalculatePullFromWhichNode(mq), false));
diff --git a/src/consumer/PullAPIWrapper.h b/src/consumer/PullAPIWrapper.h
index 7edc227..73d98d9 100644
--- a/src/consumer/PullAPIWrapper.h
+++ b/src/consumer/PullAPIWrapper.h
@@ -34,16 +34,17 @@ class PullAPIWrapper {
PullResult processPullResult(const MQMessageQueue& mq, PullResult& pullResult, SubscriptionData* subscriptionData);
- PullResult* pullKernelImpl(const MQMessageQueue& mq, // 1
- const std::string& subExpression, // 2
- int64_t subVersion, // 3
- int64_t offset, // 4
- int maxNums, // 5
- int sysFlag, // 6
- int64_t commitOffset, // 7
- int brokerSuspendMaxTimeMillis, // 8
- int timeoutMillis, // 9
- CommunicationMode communicationMode, // 10
+ PullResult* pullKernelImpl(const MQMessageQueue& mq,
+ const std::string& subExpression,
+ const std::string& expressionType,  // new in this commit; pullMessage() passes subscription_data->expression_type()
+ int64_t subVersion,
+ int64_t offset,
+ int maxNums,
+ int sysFlag,
+ int64_t commitOffset,
+ int brokerSuspendMaxTimeMillis,
+ int timeoutMillis,
+ CommunicationMode communicationMode,
PullCallback* pullCallback);
private:
diff --git a/src/protocol/heartbeat/SubscriptionData.hpp b/src/protocol/heartbeat/SubscriptionData.hpp
index 82f8334..e7ad684 100644
--- a/src/protocol/heartbeat/SubscriptionData.hpp
+++ b/src/protocol/heartbeat/SubscriptionData.hpp
@@ -24,15 +24,19 @@
#include <json/json.h>
+#include "ExpressionType.h"
#include "UtilAll.h"
namespace rocketmq {
class SubscriptionData {
public:
- SubscriptionData() : sub_version_(UtilAll::currentTimeMillis()) {}
+ SubscriptionData() : sub_version_(UtilAll::currentTimeMillis()), expression_type_(ExpressionType::TAG) {}  // expression type defaults to ExpressionType::TAG
SubscriptionData(const std::string& topic, const std::string& subString)
- : topic_(topic), sub_string_(subString), sub_version_(UtilAll::currentTimeMillis()) {}
+ : topic_(topic),
+ sub_string_(subString),
+ sub_version_(UtilAll::currentTimeMillis()),
+ expression_type_(ExpressionType::TAG) {}  // same ExpressionType::TAG default as the no-argument constructor
SubscriptionData(const SubscriptionData& other) {
sub_string_ = other.sub_string_;
@@ -40,14 +44,15 @@ class SubscriptionData {
tag_set_ = other.tag_set_;
topic_ = other.topic_;
code_set_ = other.code_set_;
+ expression_type_ = other.expression_type_;
}
virtual ~SubscriptionData() = default;
bool operator==(const SubscriptionData& other) const {
// FIXME: tags
- return topic_ == other.topic_ && sub_string_ == other.sub_string_ && sub_version_ == other.sub_version_ &&
- tag_set_.size() == other.tag_set_.size();
+ return expression_type_ == other.expression_type_ && topic_ == other.topic_ && sub_string_ == other.sub_string_ &&
+ sub_version_ == other.sub_version_ && tag_set_.size() == other.tag_set_.size();  // tag_set_ is compared by size only
}
bool operator!=(const SubscriptionData& other) const { return !operator==(other); }
@@ -93,12 +98,15 @@ class SubscriptionData {
inline std::vector<int32_t>& code_set() { return code_set_; }
+ inline const std::string& expression_type() const { return expression_type_; }  // read-only view; const so it works on const objects
+
private:
std::string topic_;
std::string sub_string_;
int64_t sub_version_;
std::vector<std::string> tag_set_;
std::vector<int32_t> code_set_;
+ std::string expression_type_;
};
} // namespace rocketmq