You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by gx...@apache.org on 2020/09/15 11:47:01 UTC
[incubator-tubemq] 45/50: [TUBEMQ-293]C++ SDK Create Future class
(#257)
This is an automated email from the ASF dual-hosted git repository.
gxcheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 8d20dc3279aa43e7ff00a5f6750b31fb3b79d09e
Author: charlely <41...@users.noreply.github.com>
AuthorDate: Mon Sep 14 17:21:42 2020 +0800
[TUBEMQ-293]C++ SDK Create Future class (#257)
Co-authored-by: charleli <ch...@tencent.com>
---
tubemq-client-twins/tubemq-client-cpp/src/future.h | 143 +++++++++++++++++++++
1 file changed, 143 insertions(+)
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/future.h b/tubemq-client-twins/tubemq-client-cpp/src/future.h
new file mode 100644
index 0000000..181a0b4
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/future.h
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _TUBEMQ_FUTURE_H_
+#define _TUBEMQ_FUTURE_H_
+
+#include <condition_variable>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "tubemq/tubemq_errcode.h"
+
+namespace tubemq {
+
+template <typename Value>
+struct FutureInnerState {
+ std::mutex mutex_;
+ std::condition_variable condition_;
+ ErrorCode error_code_;
+ Value value_;
+ bool ready_ = false;
+ bool failed_ = false;
+ using FutureCallBackFunc = std::function<void(ErrorCode, const Value&)>;
+ std::vector<FutureCallBackFunc> callbacks_;
+};
+
+template <typename Value>
+class Future {
+ public:
+ using FutureInnerStatePtr = std::shared_ptr<FutureInnerState<Value> >;
+ using FutureCallBackFunc = std::function<void(ErrorCode, const Value&)>;
+ Future& AddCallBack(FutureCallBackFunc callback) {
+ Lock lock(state_->mutex_);
+
+ if (state_->ready_) {
+ lock.unlock();
+ callback(state_->error_code_, state_->value_);
+ } else {
+ state_->callbacks_.push_back(callback);
+ }
+ return *this;
+ }
+
+ ErrorCode Get(Value& value) {
+ Lock lock(state_->mutex_);
+
+ if (!state_->ready_) {
+ // Wait for error_code_
+ while (!state_->ready_) {
+ state_->condition_.wait(lock);
+ }
+ }
+
+ value = state_->value_;
+ return state_->error_code_;
+ }
+
+ private:
+ using Lock = std::unique_lock<std::mutex>;
+ explicit Future(FutureInnerStatePtr state) : state_(state) {}
+ FutureInnerStatePtr state_;
+
+ template <typename V>
+ friend class Promise;
+};
+
+template <typename Value>
+class Promise {
+ public:
+ using FutureInnerStatePtr = std::shared_ptr<FutureInnerState<Value> >;
+ using FutureCallBackFunc = std::function<void(ErrorCode, const Value&)>;
+ Promise() : state_(std::make_shared<FutureInnerState<Value> >()) {}
+
+ bool SetValue(const Value& value) {
+ Lock lock(state_->mutex_);
+
+ if (state_->ready_) {
+ return false;
+ }
+
+ state_->value_ = value;
+ state_->ready_ = true;
+
+ callbackAndNotify();
+ return true;
+ }
+
+ bool SetFailed(const ErrorCode& error_code_) {
+ Lock lock(state_->mutex_);
+
+ if (state_->ready_) {
+ return false;
+ }
+
+ state_->error_code_ = error_code_;
+ state_->ready_ = true;
+ state_->failed_ = true;
+
+ callbackAndNotify();
+ return true;
+ }
+
+ bool IsReady() const { return state_->ready_; }
+
+ bool IsFailed() const { return state_->failed_; }
+
+ Future<Value> GetFuture() const { return Future<Value>(state_); }
+
+ private:
+ void callbackAndNotify() {
+ for (auto callback : state_->callbacks_) {
+ callback(state_->error_code_, state_->value_);
+ }
+ state_->callbacks_.clear();
+ state_->condition_.notify_all();
+ }
+
+ private:
+ using Lock = std::unique_lock<std::mutex>;
+ FutureInnerStatePtr state_;
+};
+
+} /* namespace tubemq */
+
+#endif /* _TUBEMQ_FUTURE_H_ */