Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/02/20 10:09:05 UTC

[GitHub] [incubator-doris] vagetablechicken opened a new pull request #2956: Add multi-ver olaptablesink for LOAD

vagetablechicken opened a new pull request #2956: Add multi-ver olaptablesink for LOAD
URL: https://github.com/apache/incubator-doris/pull/2956
 
 
   Ref #2780 
   
   So we can use 
   ```
   LOAD LABEL ...
           PROPERTIES 
           ( 
           "buffer_num"="5", 
           "mem_limit_per_buf"="5368709120", 
           "size_limit_per_buf"="62914560"
           );
   ```
   to enable the multi-thread version of olaptablesink and speed up time-consuming loads.
   
   - [ ] add configuration references
   
   - [ ] add doc

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on a change in pull request #2956: Add multi-thread ver olaptablesink for LOAD

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #2956: Add multi-thread ver olaptablesink for LOAD
URL: https://github.com/apache/incubator-doris/pull/2956#discussion_r382893956
 
 

 ##########
 File path: be/src/exec/tablet_sink.cpp
 ##########
 @@ -550,6 +648,12 @@ Status OlapTableSink::open(RuntimeState* state) {
     for (auto channel : _channels) {
         RETURN_IF_ERROR(channel->open());
     }
+
+    if (_use_multi_thread()) {
+        for (int i = 0; i < _buffers.size(); ++i) {
+            _send_threads.emplace_back(boost::thread(&RowBuffer::consume_process, _buffers[i], i));
 
 Review comment:
   Use `std::thread` instead.
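
A minimal sketch of what the suggested change could look like, assuming one consumer thread per buffer as in the hunk above. `DemoBuffer` and `run_consumers` are hypothetical stand-ins, not code from the PR. `std::thread` accepts a member-function pointer, the object pointer, and the arguments directly, so no wrapper object is needed in `emplace_back`.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <thread>
#include <vector>

// Hypothetical stand-in for the PR's RowBuffer; only the thread entry point is modeled.
struct DemoBuffer {
    std::atomic<int> last_id{-1};
    bool consume_process(int buffer_id) {
        last_id = buffer_id;  // a real consumer would drain its queue here
        return true;          // std::thread ignores the return value
    }
};

// Start one std::thread per buffer, join them all, and return how many
// buffers recorded the id that was passed to them.
int run_consumers(int buffer_num) {
    std::vector<std::unique_ptr<DemoBuffer>> buffers;
    for (int i = 0; i < buffer_num; ++i) {
        buffers.push_back(std::make_unique<DemoBuffer>());
    }

    std::vector<std::thread> send_threads;
    for (int i = 0; i < buffer_num; ++i) {
        // emplace_back forwards (member function, object pointer, argument)
        // straight to the std::thread constructor.
        send_threads.emplace_back(&DemoBuffer::consume_process, buffers[i].get(), i);
    }
    for (auto& t : send_threads) {
        t.join();  // a std::thread must be joined or detached before destruction
    }

    int ok = 0;
    for (int i = 0; i < buffer_num; ++i) {
        if (buffers[i]->last_id == i) ++ok;
    }
    return ok;
}
```

Calling `run_consumers(4)` starts four threads and waits for all of them. Unlike `boost::thread`, destroying a joinable `std::thread` calls `std::terminate`, so the explicit `join()` matters.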



[GitHub] [incubator-doris] vagetablechicken commented on issue #2956: Add multi-thread ver olaptablesink for LOAD

Posted by GitBox <gi...@apache.org>.
vagetablechicken commented on issue #2956: Add multi-thread ver olaptablesink for LOAD
URL: https://github.com/apache/incubator-doris/pull/2956#issuecomment-596889947
 
 
   After talking the matter over, we decided against this multi-thread version. I will submit a new design that uses just one sender thread to send batches asynchronously.
   Thanks for the reviews!
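
The new design is not shown in this thread. As a generic illustration only, "one sender thread that sends batches asynchronously" could be sketched as below: the producer enqueues a batch and returns immediately, and a single background thread drains the queue. `AsyncSender`, `submit`, `close`, and the `int` batch type are all hypothetical names chosen for the sketch, and the "send" is a stand-in for a blocking RPC.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical illustration, not the PR author's design.
class AsyncSender {
public:
    AsyncSender() : _worker(&AsyncSender::_run, this) {}
    ~AsyncSender() { close(); }

    // Called by the producer; never waits for the send itself.
    void submit(int batch) {
        {
            std::lock_guard<std::mutex> l(_mu);
            _pending.push_back(batch);
        }
        _cv.notify_one();
    }

    // Stop accepting work, let the sender drain what is queued, then join it.
    void close() {
        {
            std::lock_guard<std::mutex> l(_mu);
            _closed = true;
        }
        _cv.notify_one();
        if (_worker.joinable()) _worker.join();
    }

    // Only meaningful after close(): batches in the order they were sent.
    const std::vector<int>& sent() const { return _sent; }

private:
    void _run() {
        std::unique_lock<std::mutex> l(_mu);
        for (;;) {
            _cv.wait(l, [this] { return _closed || !_pending.empty(); });
            if (_pending.empty()) return;  // closed and fully drained
            int batch = _pending.front();
            _pending.pop_front();
            l.unlock();
            _sent.push_back(batch);  // stand-in for the blocking send
            l.lock();
        }
    }

    std::mutex _mu;
    std::condition_variable _cv;
    std::deque<int> _pending;
    std::vector<int> _sent;
    bool _closed = false;
    std::thread _worker;  // declared last so the other members exist when it starts
};
```

With a single consumer, batches go out in submission order, and the lock is released during the send so the producer is not blocked by it.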



[GitHub] [incubator-doris] morningman commented on a change in pull request #2956: Add multi-thread ver olaptablesink for LOAD

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #2956: Add multi-thread ver olaptablesink for LOAD
URL: https://github.com/apache/incubator-doris/pull/2956#discussion_r382036639
 
 

 ##########
 File path: be/src/exec/tablet_sink.h
 ##########
 @@ -141,26 +191,75 @@ class IndexChannel {
     std::unordered_map<int64_t, NodeChannel*> _node_channels;
     // from tablet_id to backend channel
     std::unordered_map<int64_t, std::vector<NodeChannel*>> _channels_by_tablet;
+
+    int64_t _serialize_batch_ns = 0;
+    int64_t _wait_in_flight_packet_ns = 0;
+
+    // BeId -> AddBatchCounter
+    std::unordered_map<int64_t, AddBatchCounter> _add_batch_counter_map;
 };
 
-// The counter of add_batch rpc of a single node
-struct AddBatchCounter {
-    // total execution time of a add_batch rpc
-    int64_t add_batch_execution_time_ns = 0;
-    // lock waiting time in a add_batch rpc
-    int64_t add_batch_wait_lock_time_ns = 0;
-    // number of add_batch call
-    int64_t add_batch_num = 0;
+class RowBuffer {
 
 Review comment:
   Add some comments for this class



[GitHub] [incubator-doris] morningman commented on a change in pull request #2956: Add multi-thread ver olaptablesink for LOAD

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #2956: Add multi-thread ver olaptablesink for LOAD
URL: https://github.com/apache/incubator-doris/pull/2956#discussion_r382893830
 
 

 ##########
 File path: be/src/exec/tablet_sink.h
 ##########
 @@ -206,6 +289,12 @@ class OlapTableSink : public DataSink {
     // invalid row number is set in Bitmap
     int _validate_data(RuntimeState* state, RowBatch* batch, Bitmap* filter_bitmap);
 
+    bool _use_multi_thread() { return _buffer_num != 0; }
 
 Review comment:
   ```suggestion
       bool _use_multi_thread() { return _buffer_num > 0; }
   ```
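
The two predicates differ only for negative values. A small sketch, assuming `_buffer_num` is a signed integer (the Thrift field added in this PR is `i32`); the free functions below are hypothetical and exist only for comparison.

```cpp
#include <cassert>

// Hypothetical free-function versions of the two predicates.
inline bool use_multi_thread_ne(int buffer_num) { return buffer_num != 0; }
inline bool use_multi_thread_gt(int buffer_num) { return buffer_num > 0; }
```

For `buffer_num = -1`, the `!= 0` form would enable the multi-thread path with a meaningless buffer count, while the `> 0` form falls back to the single-thread path.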



[GitHub] [incubator-doris] imay commented on a change in pull request #2956: Add multi-thread ver olaptablesink for LOAD

Posted by GitBox <gi...@apache.org>.
imay commented on a change in pull request #2956: Add multi-thread ver olaptablesink for LOAD
URL: https://github.com/apache/incubator-doris/pull/2956#discussion_r385502512
 
 

 ##########
 File path: be/src/exec/tablet_sink.h
 ##########
 @@ -141,26 +191,78 @@ class IndexChannel {
     std::unordered_map<int64_t, NodeChannel*> _node_channels;
     // from tablet_id to backend channel
     std::unordered_map<int64_t, std::vector<NodeChannel*>> _channels_by_tablet;
+
+    int64_t _serialize_batch_ns = 0;
+    int64_t _wait_in_flight_packet_ns = 0;
+
+    // BeId -> AddBatchCounter
+    std::unordered_map<int64_t, AddBatchCounter> _add_batch_counter_map;
 };
 
-// The counter of add_batch rpc of a single node
-struct AddBatchCounter {
-    // total execution time of a add_batch rpc
-    int64_t add_batch_execution_time_ns = 0;
-    // lock waiting time in a add_batch rpc
-    int64_t add_batch_wait_lock_time_ns = 0;
-    // number of add_batch call
-    int64_t add_batch_num = 0;
+// RowBuffer is used by the multi-thread version of OlapTableSink. It is single-producer/single-consumer.
+// In the multi-thread version, OlapTableSink creates multiple RowBuffers and the same number of threads to execute RowBuffer::consume_process.
+// Only one thread (OlapTableSink::send) executes the push op; it uses modular hashing (node_id % buffer_num) to pick the buffer the row is pushed into.
+class RowBuffer {
+public:
+    RowBuffer(TupleDescriptor* tuple_desc, int64_t byte_limit, int64_t size_limit)
+            : _off(false),
+              _consume_err(false),
+              _tuple_desc(tuple_desc),
+              _queue_runtime_size(size_limit),
+              _queue(size_limit),
+              _mem_tracker(new MemTracker(byte_limit)),
+              _buffer_pool(new MemPool(_mem_tracker.get())) {}
+
+    // push does not generate errors itself; it returns an error only if the buffer is not workable
+    // must only be called from the producer thread
+    Status push(IndexChannel* index_ch, NodeChannel* node_ch, int64_t tablet_id, Tuple* tuple);
+
+    // the thread function of consumer thread
+    bool consume_process(int buffer_id);
+
+    // disable pushing item to buffer, but items in buffer will continue to be consumed
+    void turn_off() { _off = true; }
+
+    // there's no need for the producer to differentiate off and error
+    bool workable() { return !_off && !_consume_err; }
+
+    void report_time(int buffer_id) {
+        LOG(INFO) << "buffer " << buffer_id << " time report: {consumed rows: " << _consume_count
+                  << ", mem_handle: " << _mem_handle_ns / 1e9
+                  << "s, deep_copy: " << _deep_copy_ns / 1e9
+                  << "s, spsc push block if full: " << _spsc_push_ns / 1e9
+                  << "s, consume: " << _consume_ns / 1e9
+                  << "s, actual consume: " << _actual_consume_ns / 1e9 << "s}";
+    }
+
+private:
+    std::atomic<bool> _off;
+    std::atomic<bool> _consume_err;
+
+    TupleDescriptor* _tuple_desc = nullptr;
+
+    std::size_t _queue_runtime_size;
+    // https://www.boost.org/doc/libs/1_64_0/doc/html/lockfree/examples.html#lockfree.examples.waitfree_single_producer_single_consumer_queue
+    boost::lockfree::spsc_queue<std::tuple<IndexChannel*, NodeChannel*, int64_t, Tuple*>> _queue;
+
+    boost::scoped_ptr<MemTracker> _mem_tracker;
+    boost::scoped_ptr<MemPool> _buffer_pool;
 
 Review comment:
   Better to use `std::unique_ptr`.
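
A sketch of the same ownership pattern with `std::unique_ptr`, assuming the members only need scoped, non-shared ownership as they do in the hunk above. `DemoTracker`, `DemoPool`, and `DemoRowBuffer` are hypothetical stand-ins for `MemTracker`, `MemPool`, and `RowBuffer`.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <type_traits>

// Hypothetical stand-ins for Doris' MemTracker and MemPool.
struct DemoTracker {
    explicit DemoTracker(int64_t limit) : byte_limit(limit) {}
    int64_t byte_limit;
};
struct DemoPool {
    explicit DemoPool(DemoTracker* t) : tracker(t) {}
    DemoTracker* tracker;  // non-owning
};

class DemoRowBuffer {
public:
    explicit DemoRowBuffer(int64_t byte_limit)
            : _mem_tracker(new DemoTracker(byte_limit)),
              _buffer_pool(new DemoPool(_mem_tracker.get())) {}

    int64_t byte_limit() const { return _buffer_pool->tracker->byte_limit; }

private:
    // Members are initialized in declaration order and destroyed in reverse,
    // so the pool is destroyed before the tracker it points to.
    std::unique_ptr<DemoTracker> _mem_tracker;
    std::unique_ptr<DemoPool> _buffer_pool;
};

// Like boost::scoped_ptr, unique_ptr members make the class non-copyable.
static_assert(!std::is_copy_constructible<DemoRowBuffer>::value,
              "unique_ptr members disable copying");
```

The constructor body is unchanged from the `boost::scoped_ptr` form; only the member types differ, and the class gains move support without an extra Boost dependency.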



[GitHub] [incubator-doris] vagetablechicken closed pull request #2956: Add multi-thread ver olaptablesink for LOAD

Posted by GitBox <gi...@apache.org>.
vagetablechicken closed pull request #2956: Add multi-thread ver olaptablesink for LOAD
URL: https://github.com/apache/incubator-doris/pull/2956
 
 
   



[GitHub] [incubator-doris] morningman commented on a change in pull request #2956: Add multi-thread ver olaptablesink for LOAD

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #2956: Add multi-thread ver olaptablesink for LOAD
URL: https://github.com/apache/incubator-doris/pull/2956#discussion_r382032981
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
 ##########
 @@ -17,6 +17,20 @@
 
 package org.apache.doris.load.loadv2;
 
+import com.google.common.base.Joiner;
 
 Review comment:
   Please reorganize your import order:
   
   org.apache.doris
   com
   org
   java
   javax



[GitHub] [incubator-doris] morningman commented on a change in pull request #2956: Add multi-thread ver olaptablesink for LOAD

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #2956: Add multi-thread ver olaptablesink for LOAD
URL: https://github.com/apache/incubator-doris/pull/2956#discussion_r382035039
 
 

 ##########
 File path: gensrc/thrift/DataSinks.thrift
 ##########
 @@ -105,6 +105,9 @@ struct TOlapTableSink {
     12: required Descriptors.TOlapTableLocationParam location
     13: required Descriptors.TPaloNodesInfo nodes_info
     14: optional i64 load_channel_timeout_s // the timeout of load channels in second
+    15: optional i32 buffer_num
 
 Review comment:
   Add some comments for these fields



[GitHub] [incubator-doris] vagetablechicken edited a comment on issue #2956: Add multi-thread ver olaptablesink for LOAD

Posted by GitBox <gi...@apache.org>.
vagetablechicken edited a comment on issue #2956: Add multi-thread ver olaptablesink for LOAD
URL: https://github.com/apache/incubator-doris/pull/2956#issuecomment-596889947
 
 
   After talking the matter over, we decided against this multi-thread version. I will submit a new design that uses just one sender thread to send batches asynchronously.
   Thanks for the reviews!



[GitHub] [incubator-doris] imay commented on a change in pull request #2956: Add multi-thread ver olaptablesink for LOAD

Posted by GitBox <gi...@apache.org>.
imay commented on a change in pull request #2956: Add multi-thread ver olaptablesink for LOAD
URL: https://github.com/apache/incubator-doris/pull/2956#discussion_r385501833
 
 

 ##########
 File path: be/src/exec/tablet_sink.h
 ##########
 @@ -141,26 +191,78 @@ class IndexChannel {
     std::unordered_map<int64_t, NodeChannel*> _node_channels;
     // from tablet_id to backend channel
     std::unordered_map<int64_t, std::vector<NodeChannel*>> _channels_by_tablet;
+
+    int64_t _serialize_batch_ns = 0;
+    int64_t _wait_in_flight_packet_ns = 0;
+
+    // BeId -> AddBatchCounter
+    std::unordered_map<int64_t, AddBatchCounter> _add_batch_counter_map;
 };
 
-// The counter of add_batch rpc of a single node
-struct AddBatchCounter {
-    // total execution time of a add_batch rpc
-    int64_t add_batch_execution_time_ns = 0;
-    // lock waiting time in a add_batch rpc
-    int64_t add_batch_wait_lock_time_ns = 0;
-    // number of add_batch call
-    int64_t add_batch_num = 0;
+// RowBuffer is used by the multi-thread version of OlapTableSink. It is single-producer/single-consumer.
+// In the multi-thread version, OlapTableSink creates multiple RowBuffers and the same number of threads to execute RowBuffer::consume_process.
+// Only one thread (OlapTableSink::send) executes the push op; it uses modular hashing (node_id % buffer_num) to pick the buffer the row is pushed into.
+class RowBuffer {
 
 Review comment:
   This class can be moved into the tablet_sink.cpp file
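
The routing rule stated in the class comment in the hunk above (`node_id % buffer_num`) can be sketched as follows. `pick_buffer` and `spread` are hypothetical helper names, and the sketch assumes `buffer_num > 0` and non-negative node ids so the result is always a valid index.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Pick the buffer index for a backend node id (hypothetical helper name).
// Assumes buffer_num > 0 and node_id >= 0, so the % result is a valid index.
inline std::size_t pick_buffer(int64_t node_id, std::size_t buffer_num) {
    return static_cast<std::size_t>(node_id) % buffer_num;
}

// Count how many node ids land in each buffer.
inline std::vector<int> spread(const std::vector<int64_t>& node_ids, std::size_t buffer_num) {
    std::vector<int> counts(buffer_num, 0);
    for (int64_t id : node_ids) {
        ++counts[pick_buffer(id, buffer_num)];
    }
    return counts;
}
```

Because every row for a given node goes to the same buffer, each node channel is only ever touched by one consumer thread. The balance across buffers depends entirely on how node ids fall modulo `buffer_num`.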



[GitHub] [incubator-doris] morningman commented on a change in pull request #2956: Add multi-thread ver olaptablesink for LOAD

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #2956: Add multi-thread ver olaptablesink for LOAD
URL: https://github.com/apache/incubator-doris/pull/2956#discussion_r382034126
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/planner/OlapTableSink.java
 ##########
 @@ -88,12 +92,29 @@ public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<L
         }
     }
 
+    public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds, int bufferNum,
+        long memLimitPerBuf, long sizeLimitPerBuf) {
+        this(dstTable, tupleDescriptor, partitionIds);
+        this.bufferNum = bufferNum;
+        this.memLimitPerBuf = memLimitPerBuf;
+        this.sizeLimitPerBuf = sizeLimitPerBuf;
+    }
+
     public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeoutS) throws AnalysisException {
         TOlapTableSink tSink = new TOlapTableSink();
         tSink.setLoad_id(loadId);
         tSink.setTxn_id(txnId);
         tSink.setDb_id(dbId);
         tSink.setLoad_channel_timeout_s(loadChannelTimeoutS);
+
+        if (bufferNum > 0) {
+            tSink.setBuffer_num(bufferNum);
+            if (memLimitPerBuf > 0)
 
 Review comment:
   Wrap with `{}`, even if there is only one line after `if`
