You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/12/22 09:25:30 UTC

[GitHub] [doris] github-actions[bot] commented on a diff in pull request #15283: [refactor](non-vec) delete non-vec data sink

github-actions[bot] commented on code in PR #15283:
URL: https://github.com/apache/doris/pull/15283#discussion_r1055255868


##########
be/src/vec/sink/vtablet_sink.cpp:
##########
@@ -257,6 +465,33 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
     return _send_finished ? 0 : 1;
 }
 
+void VNodeChannel::_cancel_with_msg(const std::string& msg) {

Review Comment:
   warning: method '_cancel_with_msg' can be made const [readability-make-member-function-const]
   
   ```suggestion
   void VNodeChannel::_cancel_with_msg(const std::string& msg) const {
   ```
   
   be/src/vec/sink/vtablet_sink.h:212:
   ```diff
   -     void _cancel_with_msg(const std::string& msg);
   +     void _cancel_with_msg(const std::string& msg) const;
   ```
   



##########
be/src/exec/parquet_scanner.h:
##########
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/base_scanner.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "gen_cpp/Types_types.h"
+#include "runtime/mem_pool.h"
+#include "util/runtime_profile.h"
+#include "util/slice.h"
+
+namespace doris {
+
+class Tuple;
+class SlotDescriptor;
+struct Slice;
+class ParquetReaderWrap;
+class RuntimeState;
+class ExprContext;
+class TupleDescriptor;
+class TupleRow;
+class RowDescriptor;
+class RuntimeProfile;
+class StreamLoadPipe;
+
+// Broker scanner convert the data read from broker to doris's tuple.
+class ParquetScanner : public BaseScanner {
+public:
+    ParquetScanner(RuntimeState* state, RuntimeProfile* profile,
+                   const TBrokerScanRangeParams& params,
+                   const std::vector<TBrokerRangeDesc>& ranges,
+                   const std::vector<TNetworkAddress>& broker_addresses,
+                   const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
+
+    ~ParquetScanner() override;
+
+    // Open this scanner, will initialize information need to
+    Status open() override;
+
+    // Get next tuple
+    Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* fill_tuple) override;
+
+    Status get_next(vectorized::Block* block, bool* eof) override {
+        return Status::NotSupported("Not Implemented get block");
+    }
+
+    // Close this scanner
+    void close() override;
+
+protected:
+    // Read next buffer from reader
+    Status open_next_reader();
+
+protected:

Review Comment:
   warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers]
   
   ```suggestion
   
   ```
   The empty suggestion above means deleting the second, redundant `protected:` line entirely.
   **be/src/exec/parquet_scanner.h:71:** previously declared here
   ```cpp
   protected:
   ^
   ```
   



##########
be/src/vec/sink/vtablet_sink.h:
##########
@@ -29,36 +29,265 @@ class VExprContext;
 
 namespace stream_load {
 
-class VNodeChannel : public NodeChannel {
+// The counter of add_batch rpc of a single node
+struct AddBatchCounter {
+    // total execution time of a add_batch rpc
+    int64_t add_batch_execution_time_us = 0;
+    // lock waiting time in a add_batch rpc
+    int64_t add_batch_wait_execution_time_us = 0;
+    // number of add_batch call
+    int64_t add_batch_num = 0;
+    // time passed between marked close and finish close
+    int64_t close_wait_time_ms = 0;
+
+    AddBatchCounter& operator+=(const AddBatchCounter& rhs) {
+        add_batch_execution_time_us += rhs.add_batch_execution_time_us;
+        add_batch_wait_execution_time_us += rhs.add_batch_wait_execution_time_us;
+        add_batch_num += rhs.add_batch_num;
+        close_wait_time_ms += rhs.close_wait_time_ms;
+        return *this;
+    }
+    friend AddBatchCounter operator+(const AddBatchCounter& lhs, const AddBatchCounter& rhs) {
+        AddBatchCounter sum = lhs;
+        sum += rhs;
+        return sum;
+    }
+};
+
+// It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence.
+// So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction.
+// Delete this point is safe, don't worry about RPC callback will run after ReusableClosure deleted.
+template <typename T>
+class ReusableClosure final : public google::protobuf::Closure {
 public:
-    VNodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int64_t node_id);
+    ReusableClosure() : cid(INVALID_BTHREAD_ID) {}

Review Comment:
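   warning: use '= default' to define a trivial default constructor [modernize-use-equals-default]
   
   ```suggestion
       ReusableClosure() = default;
   ```
   
   Note: the auto-generated fix-it `ReusableClosure() : cid(INVALID_BTHREAD_ID) = default;` is not valid C++, because a defaulted constructor cannot have a member-initializer list. Defaulting the constructor only preserves behavior if the initializer is moved to the member declaration, e.g. `brpc::CallId cid = INVALID_BTHREAD_ID;`. Otherwise `cid` would be left uninitialized.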
   



##########
be/src/vec/sink/vtablet_sink.h:
##########
@@ -29,36 +29,265 @@
 
 namespace stream_load {
 
-class VNodeChannel : public NodeChannel {
+// The counter of add_batch rpc of a single node
+struct AddBatchCounter {
+    // total execution time of a add_batch rpc
+    int64_t add_batch_execution_time_us = 0;
+    // lock waiting time in a add_batch rpc
+    int64_t add_batch_wait_execution_time_us = 0;
+    // number of add_batch call
+    int64_t add_batch_num = 0;
+    // time passed between marked close and finish close
+    int64_t close_wait_time_ms = 0;
+
+    AddBatchCounter& operator+=(const AddBatchCounter& rhs) {
+        add_batch_execution_time_us += rhs.add_batch_execution_time_us;
+        add_batch_wait_execution_time_us += rhs.add_batch_wait_execution_time_us;
+        add_batch_num += rhs.add_batch_num;
+        close_wait_time_ms += rhs.close_wait_time_ms;
+        return *this;
+    }
+    friend AddBatchCounter operator+(const AddBatchCounter& lhs, const AddBatchCounter& rhs) {
+        AddBatchCounter sum = lhs;
+        sum += rhs;
+        return sum;
+    }
+};
+
+// It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence.
+// So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction.
+// Delete this point is safe, don't worry about RPC callback will run after ReusableClosure deleted.
+template <typename T>
+class ReusableClosure final : public google::protobuf::Closure {
 public:
-    VNodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int64_t node_id);
+    ReusableClosure() : cid(INVALID_BTHREAD_ID) {}
+    ~ReusableClosure() override {
+        // shouldn't delete when Run() is calling or going to be called, wait for current Run() done.
+        join();
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
+        cntl.Reset();
+    }
+
+    static ReusableClosure<T>* create() { return new ReusableClosure<T>(); }
+
+    void addFailedHandler(const std::function<void(bool)>& fn) { failed_handler = fn; }
+    void addSuccessHandler(const std::function<void(const T&, bool)>& fn) { success_handler = fn; }
+
+    void join() {
+        // We rely on in_flight to assure one rpc is running,
+        // while cid is not reliable due to memory order.
+        // in_flight is written before getting callid,
+        // so we can not use memory fence to synchronize.
+        while (_packet_in_flight) {
+            // cid here is complicated
+            if (cid != INVALID_BTHREAD_ID) {
+                // actually cid may be the last rpc call id.
+                brpc::Join(cid);
+            }
+            if (_packet_in_flight) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(10));
+            }
+        }
+    }
 
-    ~VNodeChannel() override;
+    // plz follow this order: reset() -> set_in_flight() -> send brpc batch
+    void reset() {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
+        cntl.Reset();
+        cid = cntl.call_id();
+    }
 
-    Status init(RuntimeState* state) override;
+    bool try_set_in_flight() {
+        bool value = false;
+        return _packet_in_flight.compare_exchange_strong(value, true);
+    }
 
-    Status open_wait() override;
+    void clear_in_flight() { _packet_in_flight = false; }
+
+    bool is_packet_in_flight() { return _packet_in_flight; }
+
+    void end_mark() {
+        DCHECK(_is_last_rpc == false);
+        _is_last_rpc = true;
+    }
+
+    void Run() override {
+        DCHECK(_packet_in_flight);
+        if (cntl.Failed()) {
+            LOG(WARNING) << "failed to send brpc batch, error=" << berror(cntl.ErrorCode())
+                         << ", error_text=" << cntl.ErrorText();
+            failed_handler(_is_last_rpc);
+        } else {
+            success_handler(result, _is_last_rpc);
+        }
+        clear_in_flight();
+    }
+
+    brpc::Controller cntl;
+    T result;
+
+private:
+    brpc::CallId cid;
+    std::atomic<bool> _packet_in_flight {false};
+    std::atomic<bool> _is_last_rpc {false};
+    std::function<void(bool)> failed_handler;
+    std::function<void(const T&, bool)> success_handler;
+};
+
+class IndexChannel;
+class VOlapTableSink;
+
+class VNodeChannel {
+public:
+    VNodeChannel(VOlapTableSink* parent, IndexChannel* index_channel, int64_t node_id);
+
+    ~VNodeChannel();
+
+    // called before open, used to add tablet located in this backend
+    void add_tablet(const TTabletWithPartition& tablet) { _all_tablets.emplace_back(tablet); }
+
+    void add_slave_tablet_nodes(int64_t tablet_id, const std::vector<int64_t>& slave_nodes) {
+        _slave_tablet_nodes[tablet_id] = slave_nodes;
+    }
+
+    void open();
+
+    Status init(RuntimeState* state);
+
+    Status open_wait();
 
     Status add_block(vectorized::Block* block,
                      const std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
-                                     std::vector<int64_t>>& payload) override;
+                                     std::vector<int64_t>>& payload);
 
     int try_send_and_fetch_status(RuntimeState* state,
-                                  std::unique_ptr<ThreadPoolToken>& thread_pool_token) override;
+                                  std::unique_ptr<ThreadPoolToken>& thread_pool_token);
 
     void try_send_block(RuntimeState* state);
 
-    void clear_all_blocks() override;
+    void clear_all_blocks();
 
     // two ways to stop channel:
     // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response.
     // 2. just cancel()
-    void mark_close() override;
+    void mark_close();
+
+    // two ways to stop channel:
+    // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response.
+    // 2. just cancel()
+    Status close_wait(RuntimeState* state);
+
+    void cancel(const std::string& cancel_msg);
+
+    void time_report(std::unordered_map<int64_t, AddBatchCounter>* add_batch_counter_map,

Review Comment:
   warning: method 'time_report' can be made const [readability-make-member-function-const]
   
   be/src/vec/sink/vtablet_sink.h:185:
   ```diff
   -                      int64_t* total_add_batch_num) {
   +                      int64_t* total_add_batch_num) const {
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org