You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/02/28 05:14:25 UTC

[GitHub] [incubator-doris] imay commented on a change in pull request #2956: Add multi-thread ver olaptablesink for LOAD

imay commented on a change in pull request #2956: Add multi-thread ver olaptablesink for LOAD
URL: https://github.com/apache/incubator-doris/pull/2956#discussion_r385502512
 
 

 ##########
 File path: be/src/exec/tablet_sink.h
 ##########
 @@ -141,26 +191,78 @@ class IndexChannel {
     std::unordered_map<int64_t, NodeChannel*> _node_channels;
     // from tablet_id to backend channel
     std::unordered_map<int64_t, std::vector<NodeChannel*>> _channels_by_tablet;
+
+    int64_t _serialize_batch_ns = 0;
+    int64_t _wait_in_flight_packet_ns = 0;
+
+    // BeId -> AddBatchCounter
+    std::unordered_map<int64_t, AddBatchCounter> _add_batch_counter_map;
 };
 
-// The counter of add_batch rpc of a single node
-struct AddBatchCounter {
-    // total execution time of a add_batch rpc
-    int64_t add_batch_execution_time_ns = 0;
-    // lock waiting time in a add_batch rpc
-    int64_t add_batch_wait_lock_time_ns = 0;
-    // number of add_batch call
-    int64_t add_batch_num = 0;
+// RowBuffer is used for multi-thread version of OlapTableSink, it's single-productor/single-consumer.
+// In multi-thread version, OlapTableSink will create multi RowBuffers, and create the same number threads to exec RowBuffer::consume_process.
+// Only one thread(OlapTableSink::send) exec push op, use modular hashing(node_id%buffer_num) to specify the buffer for which the row should be pushed into.
+class RowBuffer {
+public:
+    RowBuffer(TupleDescriptor* tuple_desc, int64_t byte_limit, int64_t size_limit)
+            : _off(false),
+              _consume_err(false),
+              _tuple_desc(tuple_desc),
+              _queue_runtime_size(size_limit),
+              _queue(size_limit),
+              _mem_tracker(new MemTracker(byte_limit)),
+              _buffer_pool(new MemPool(_mem_tracker.get())) {}
+
+    // push method won't generate error, it returns error only if buffer is not workable
+    // only be called from the producer thread
+    Status push(IndexChannel* index_ch, NodeChannel* node_ch, int64_t tablet_id, Tuple* tuple);
+
+    // the thread function of consumer thread
+    bool consume_process(int buffer_id);
+
+    // disable pushing item to buffer, but items in buffer will continue to be consumed
+    void turn_off() { _off = true; }
+
+    // there's no need for productor to differentiate off and error
+    bool workable() { return !_off && !_consume_err; }
+
+    void report_time(int buffer_id) {
+        LOG(INFO) << "buffer " << buffer_id << " time report: {consumed rows: " << _consume_count
+                  << ", mem_handle: " << _mem_handle_ns / 1e9
+                  << "s, deep_copy: " << _deep_copy_ns / 1e9
+                  << "s, spsc push block if full: " << _spsc_push_ns / 1e9
+                  << "s, consume: " << _consume_ns / 1e9
+                  << "s, actual consume: " << _actual_consume_ns / 1e9 << "s}";
+    }
+
+private:
+    std::atomic<bool> _off;
+    std::atomic<bool> _consume_err;
+
+    TupleDescriptor* _tuple_desc = nullptr;
+
+    std::size_t _queue_runtime_size;
+    // https://www.boost.org/doc/libs/1_64_0/doc/html/lockfree/examples.html#lockfree.examples.waitfree_single_producer_single_consumer_queue
+    boost::lockfree::spsc_queue<std::tuple<IndexChannel*, NodeChannel*, int64_t, Tuple*>> _queue;
+
+    boost::scoped_ptr<MemTracker> _mem_tracker;
+    boost::scoped_ptr<MemPool> _buffer_pool;
 
 Review comment:
   It would be better to use std::unique_ptr here instead of boost::scoped_ptr (for both _mem_tracker and _buffer_pool).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org