You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/04/28 13:02:14 UTC

[GitHub] [incubator-doris] imay commented on a change in pull request #3143: Non blocking OlapTableSink

imay commented on a change in pull request #3143:
URL: https://github.com/apache/incubator-doris/pull/3143#discussion_r416586004



##########
File path: be/src/exec/tablet_sink.h
##########
@@ -68,99 +154,126 @@ class NodeChannel {
 
     Status add_row(Tuple* tuple, int64_t tablet_id);
 
-    Status close(RuntimeState* state);
+    // There are two ways to stop the channel:
+    // 1. mark_close() followed by close_wait(). Note: close_wait() blocks until the last AddBatch rpc response arrives.
+    // 2. just cancel()
+    Status mark_close();
     Status close_wait(RuntimeState* state);
 
     void cancel();
 
-    int64_t node_id() const { return _node_id; }
+    // return:
+    // 0: stopped, send finished (eos request has been sent), or any internal error;
+    // 1: running, hasn't reached eos.
+    // only allow 1 rpc in flight
+    int try_send_and_fetch_status();
+
+    void time_report(std::unordered_map<int64_t, AddBatchCounter>& add_batch_counter_map,
+                     int64_t* serialize_batch_ns, int64_t* mem_exceeded_block_ns,
+                     int64_t* queue_push_lock_ns, int64_t* actual_consume_ns) {
+        add_batch_counter_map[_node_id] += _add_batch_counter;
+        *serialize_batch_ns += _serialize_batch_ns;
+        *mem_exceeded_block_ns += _mem_exceeded_block_ns;
+        *queue_push_lock_ns += _queue_push_lock_ns;
+        *actual_consume_ns += _actual_consume_ns;
+    }
 
-    void set_failed() { _already_failed = true; }
-    bool already_failed() const { return _already_failed; }
+    int64_t node_id() const { return _node_id; }
     const NodeInfo* node_info() const { return _node_info; }
+    std::string print_load_info() { return _load_info; }
+    std::string name() const {
+        return "NodeChannel[" + std::to_string(_index_id) + "-" + std::to_string(_node_id) + "]";
+    }
 
-private:
-    Status _send_cur_batch(bool eos = false);
-    // wait inflight packet finish, return error if inflight packet return failed
-    Status _wait_in_flight_packet();
-
-    Status _close(RuntimeState* state);
+    Status none_of(std::initializer_list<bool> vars);
 
 private:
     OlapTableSink* _parent = nullptr;
     int64_t _index_id = -1;
     int64_t _node_id = -1;
     int32_t _schema_hash = 0;
+    std::string _load_info;
 
     TupleDescriptor* _tuple_desc = nullptr;
     const NodeInfo* _node_info = nullptr;
 
-    bool _already_failed = false;
-    bool _has_in_flight_packet = false;
     // this should be set in init() using config
     int _rpc_timeout_ms = 60000;
     int64_t _next_packet_seq = 0;
 
-    std::unique_ptr<RowBatch> _batch;
+    // user cancel or get some errors
+    std::atomic<bool> _cancelled{false};
+
+    std::atomic<bool> _send_finished{false};
+    std::atomic<bool> _add_batches_finished{false};
+
+    bool _eos_is_produced{false}; // only for restricting producer behaviors
+
+    std::unique_ptr<RowDescriptor> _row_desc;
+    int _batch_size = 0;
+    std::unique_ptr<RowBatch> _cur_batch;
+    PTabletWriterAddBatchRequest _cur_add_batch_request;
+
+    std::mutex _pending_batches_lock;
+    using AddBatchReq = std::pair<std::unique_ptr<RowBatch>, PTabletWriterAddBatchRequest>;
+    std::queue<AddBatchReq> _pending_batches;
+    std::atomic<int> _pending_batches_num{0};
+
     palo::PInternalService_Stub* _stub = nullptr;
     RefCountClosure<PTabletWriterOpenResult>* _open_closure = nullptr;
-    RefCountClosure<PTabletWriterAddBatchResult>* _add_batch_closure = nullptr;
+    ReusableClosure<PTabletWriterAddBatchResult>* _add_batch_closure = nullptr;
 
     std::vector<TTabletWithPartition> _all_tablets;
-    PTabletWriterAddBatchRequest _add_batch_request;
+    std::vector<TTabletCommitInfo> _tablet_commit_infos;
+
+    AddBatchCounter _add_batch_counter;
+    int64_t _serialize_batch_ns = 0;
+
+    int64_t _mem_exceeded_block_ns = 0;
+    int64_t _queue_push_lock_ns = 0;
+    int64_t _actual_consume_ns = 0;
 };
 
 class IndexChannel {
 public:
     IndexChannel(OlapTableSink* parent, int64_t index_id, int32_t schema_hash)
-            : _parent(parent), _index_id(index_id),
-            _schema_hash(schema_hash) {
-    }
+            : _parent(parent), _index_id(index_id), _schema_hash(schema_hash) {}
     ~IndexChannel();
 
-    Status init(RuntimeState* state,
-                const std::vector<TTabletWithPartition>& tablets);
-    Status open();
-    Status add_row(Tuple* tuple, int64_t tablet_id);
+    Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets);
 
-    Status close(RuntimeState* state);
+    Status add_row(Tuple* tuple, int64_t tablet_id);
 
-    void cancel();
+    void for_each_node_channel(std::function<void(NodeChannel*)> func) {

Review comment:
   ```suggestion
       void for_each_node_channel(const std::function<void(NodeChannel*)>& func) {
   ```

##########
File path: be/src/common/config.h
##########
@@ -296,6 +296,8 @@ namespace config {
     // you may need to increase this timeout if using larger 'streaming_load_max_mb',
     // or encounter 'tablet writer write failed' error when loading.
     // CONF_Int32(tablet_writer_rpc_timeout_sec, "600");
+    // OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc.
+    CONF_Int32(olap_table_sink_send_interval_ms, "10");

Review comment:
   ```suggestion
       CONF_mInt32(olap_table_sink_send_interval_ms, "10");
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org