Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/01/16 10:12:25 UTC

[GitHub] [incubator-doris] vagetablechicken opened a new issue #2780: OlapTableSink::send is low efficient?

vagetablechicken opened a new issue #2780: OlapTableSink::send is low efficient?
URL: https://github.com/apache/incubator-doris/issues/2780
 
 
   **Problem**
   When we use broker load, OlapTableSink::send() dominates the run time: it accounts for almost all of the plan fragment's active time.
   Example profile from one BE:
   ```
   Fragment f59d832368a84c94-be109f903cf4698d:(Active: 3h36m, % non-child: 0.00%)
      - AverageThreadTokens: 1.00
      - PeakReservation: 0
      - PeakUsedReservation: 0
      - RowsProduced: 168.61M
      - SizeProduced: 30.25 GB
     BlockMgr:
        - BlockWritesOutstanding: 0
        - BlocksCreated: 0
        - BlocksRecycled: 0
        - BufferedPins: 0
        - BytesWritten: 0
        - MaxBlockSize: 8.00 MB
        - MemoryLimit: 2.00 GB
        - TotalBufferWaitTime: 0.000ns
        - TotalEncryptionTime: 0.000ns
        - TotalIntegrityCheckTime: 0.000ns
        - TotalReadBlockTime: 0.000ns
     OlapTableSink:(Active: 3h35m, % non-child: 0.00%)
        - CloseTime: 102.932ms
        - ConvertBatchTime: 0.000ns
        - OpenTime: 247.194ms
        - RowsFiltered: 0
        - RowsRead: 168.61M
        - RowsReturned: 168.61M
        - SendDataTime: 3h34m
        - SerializeBatchTime: 8m26s
        - ValidateDataTime: 19s554ms
        - WaitInFlightPacketTime: 3h23m
     BROKER_SCAN_NODE (id=0):(Active: 1m8s, % non-child: 0.00%)
        - BytesRead: 0
        - MemoryUsed: 0
        - NumThread: 0
        - PerReadThreadRawHdfsThroughput: 0.00 /sec
        - RowsRead: 168.61M
        - RowsReturned: 168.61M
        - RowsReturnedRate: 2.48 M/sec
        - ScanRangesComplete: 0
        - ScannerThreadsInvoluntaryContextSwitches: 0
        - ScannerThreadsTotalWallClockTime: 0.000ns
          - MaterializeTupleTime(*): 5m37s
          - ScannerThreadsSysTime: 0.000ns
          - ScannerThreadsUserTime: 0.000ns
        - ScannerThreadsVoluntaryContextSwitches: 0
        - TotalRawReadTime(*): 38m58s
        - TotalReadThroughput: 0.00 /sec
        - WaitScannerTime: 1m7s
   ```
   
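   As the profile shows, WaitInFlightPacketTime (3h23m) is the most time-consuming portion: it makes up most of SendDataTime (3h34m), which in turn is nearly all of the fragment's 3h36m active time.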
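
To put numbers on this, the sketch below converts a few durations from the profile to seconds and computes each counter's share of the fragment's active time. The helper name `to_seconds` is introduced here for illustration only; the durations are copied from the profile.

```python
import re

def to_seconds(text: str) -> float:
    """Convert a profile duration such as '3h23m' or '19s554ms' to seconds."""
    units = {"h": 3600.0, "m": 60.0, "s": 1.0, "ms": 0.001}
    total = 0.0
    # 'ms' is listed first in the alternation so that '554ms' is not read as minutes.
    for value, unit in re.findall(r"(\d+(?:\.\d+)?)(ms|h|m|s)", text):
        total += float(value) * units[unit]
    return total

fragment_active = to_seconds("3h36m")
counters = {
    "SendDataTime": "3h34m",
    "WaitInFlightPacketTime": "3h23m",
    "SerializeBatchTime": "8m26s",
    "ValidateDataTime": "19s554ms",
}
shares = {name: to_seconds(d) / fragment_active for name, d in counters.items()}
for name, share in shares.items():
    print(f"{name}: {share:.1%} of fragment active time")
```

With these inputs, waiting for in-flight packets comes out at roughly 94% of the fragment's active time.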
   
   **Analysis**
   I describe the whole process here.
   
   PlanFragmentExecutor pseudo code:
   ```
   while(1){
       batch=get_one_batch();
       OlapTableSink::send(batch);
   }
   ```
   Then, OlapTableSink::send() pseudo code:
   ```
   for(row in batch){
       channel=get_corresponding_channel(row);
   
       // channel::add_row() explanation:
       ok=channel::add_row_in_cur_batch(row);
       if(!ok){
           if(channel::has_in_flight_packet){
               channel::wait_in_flight_packet(); // (*)
           }
           channel::send_add_batch_req();
           channel::add_row_in_cur_batch(row);
       }
       // channel::add_row() end
   }
   ```
   
   So once channel::wait_in_flight_packet() is triggered, it blocks the whole process, even though there is no need to block add_row() on the other channels.
   For example, while channel0 is waiting for its in-flight packet, we could still add rows to other channels.
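
The effect can be reproduced with a small simulation of the loop above. This is only a sketch under simplifying assumptions: time is counted in abstract ticks, adding a row costs one tick, and `Channel`, `rpc_ticks` and `blocking_send` are hypothetical names, not Doris code.

```python
from dataclasses import dataclass

@dataclass
class Channel:
    rpc_ticks: int             # how long one add_batch RPC stays in flight
    batch_size: int = 2
    rows_in_batch: int = 0
    in_flight_until: int = 0   # simulated time at which the in-flight RPC returns

def blocking_send(rows, channels):
    """Replay the single-threaded loop; return (finish_time, ticks_spent_waiting)."""
    now = 0
    waited = 0
    for channel_id in rows:
        ch = channels[channel_id]
        if ch.rows_in_batch == ch.batch_size:        # current batch is full
            if ch.in_flight_until > now:             # (*) wait_in_flight_packet
                waited += ch.in_flight_until - now
                now = ch.in_flight_until
            ch.in_flight_until = now + ch.rpc_ticks  # send_add_batch_req
            ch.rows_in_batch = 0
        ch.rows_in_batch += 1                        # add_row_in_cur_batch
        now += 1                                     # adding one row costs one tick
    return now, waited

# Channel 0 is a slow receiver, channel 1 a fast one; rows alternate between them.
channels = [Channel(rpc_ticks=100), Channel(rpc_ticks=1)]
finish, waited = blocking_send([0, 1] * 6, channels)
print(f"finished at tick {finish}, of which {waited} ticks were spent waiting")
```

In this toy run the loop finishes at tick 108 and spends 96 of those ticks waiting on channel 0, although channel 1 was idle and could have accepted its rows the whole time.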
   
   **Better solutions (preliminary thoughts)**
   * Make channel::add_row() non-blocking. This might be a massive change.
   * Make channel::add_row() block less, e.g. avoid adding a row to channel0 immediately after channel0 sends an add_batch request.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on issue #2780: OlapTableSink::send is low efficient?

Posted by GitBox <gi...@apache.org>.
morningman commented on issue #2780: OlapTableSink::send is low efficient?
URL: https://github.com/apache/incubator-doris/issues/2780#issuecomment-575144104
 
 
   Yes, you are right. Currently, there is only one thread doing `add rows -> send data -> wait in flight`. Actually, `add rows` and `send data` can be done in parallel.
   
   A simple implementation is to use one thread to add rows and a thread pool to send the data. But this may double the memory consumption, because two row batches may be in memory at the same time.
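
A minimal sketch of that idea, assuming a hypothetical `send_batch` function standing in for the add_batch RPC (none of these names come from Doris). The calling thread only adds rows; full batches are handed to a pool for sending.

```python
from concurrent.futures import ThreadPoolExecutor, wait

def send_batch(node_id, batch):
    """Stand-in for the add_batch RPC; returns what it 'sent'."""
    return node_id, len(batch)

def load(rows, batch_size=2, workers=2):
    """Add rows on the calling thread; hand full batches to a pool for sending."""
    current = {}   # node_id -> batch currently being filled
    futures = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for node_id, row in rows:
            batch = current.setdefault(node_id, [])
            batch.append(row)
            if len(batch) == batch_size:
                # The full batch now belongs to the pool while a fresh one is
                # filled: this is the second row batch held in memory.
                futures.append(pool.submit(send_batch, node_id, batch))
                current[node_id] = []
        for node_id, batch in current.items():   # flush partially filled batches
            if batch:
                futures.append(pool.submit(send_batch, node_id, batch))
        wait(futures)
    return sorted(f.result() for f in futures)

result = load([(0, "a"), (1, "b"), (0, "c"), (0, "d"), (1, "e")])
print(result)
```

The comment inside the loop marks the point where the extra memory comes from: the batch being sent and the batch being filled exist at the same time.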
   
   


[GitHub] [incubator-doris] vagetablechicken commented on issue #2780: OlapTableSink::send is low efficient?

Posted by GitBox <gi...@apache.org>.
vagetablechicken commented on issue #2780: OlapTableSink::send is low efficient?
URL: https://github.com/apache/incubator-doris/issues/2780#issuecomment-588210735
 
 
   > The enhancement of this test case seems not obvious?
   
   The original version is being tested. It typically takes 5h. I hadn't tested it recently, so I am running this test again.


[GitHub] [incubator-doris] vagetablechicken commented on issue #2780: OlapTableSink::send is low efficient?

Posted by GitBox <gi...@apache.org>.
vagetablechicken commented on issue #2780: OlapTableSink::send is low efficient?
URL: https://github.com/apache/incubator-doris/issues/2780#issuecomment-575530354
 
 
   @morningman 
   Does that mean the memory usage of a row batch may be large? If so, we should stop pushing more batches, as is done here:
   https://github.com/apache/incubator-doris/blob/8df63bc191fd123df5986084c1e453db732fda29/be/src/exec/broker_scan_node.cpp#L366-L378
   
   I think that even in the single-thread model, more than one row batch is in memory.
   Let's consider the worst-case scenario.
   
   ### The single-thread model
   
   The model can be divided into three roles:
   
   **1 scanner (holds one _batch_queue)**:
   
   The mem usage of _batch_queue is denoted by Size<sub>queue</sub>.
   In the worst case, _batch_queue is not empty and its memory exceeds the limit (mem_limit), thus
   
   Size<sub>queue</sub> > mem_limit
   
   **1 plan_fragment_executor (holds one _row_batch)**:
   
   _row_batch comes from _batch_queue; its mem usage is denoted by Size<sub>batch</sub>. If it is too large, we can assume:
   
   Size<sub>batch</sub> > mem_limit
   
   **N<sub>be</sub> NodeChannels (each holds deep-copied rows, aka "_batch")**:
   
   If the "_batch"s in all channels are near full capacity (every channel has batch_size-1 rows), the mem usage of each channel is approximately equal to Size<sub>batch</sub>.
   The mem usage of all NodeChannels is N<sub>be</sub>*Size<sub>batch</sub>.
   
   So, Mem<sub>worst</sub> = Size<sub>queue</sub>+(1+N<sub>be</sub>)*Size<sub>batch</sub>.
   
   Mem<sub>worst</sub> > (2+N<sub>be</sub>)*mem_limit.
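
Spelled out, the bound follows by substituting the two worst-case assumptions into the sum. The notation is the same as in the text; nothing new is assumed.

```latex
\begin{aligned}
\mathrm{Mem}_{worst} &= \mathrm{Size}_{queue} + (1 + N_{be})\,\mathrm{Size}_{batch} \\
                     &> \mathit{mem\_limit} + (1 + N_{be})\,\mathit{mem\_limit} \\
                     &= (2 + N_{be})\,\mathit{mem\_limit}
\end{aligned}
```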
   
   ### The multi-thread model
   
   If we use a thread pool or similar to do the data sending, we should add a new role, the sender. The best parallelism is one sender in charge of one channel.
   
   
   **1 scanner (holds one _batch_queue)**: same as above
   
   **1 plan_fragment_executor (holds one _row_batch)**: same as above
   
   **N<sub>be</sub> senders**:
   
   Senders buffer rows, so their mem usage should be strictly controlled.
   The mem usage of one sender is denoted by Size<sub>sender</sub>.
   The sum is N<sub>be</sub>*Size<sub>sender</sub>.
   
   **N<sub>be</sub> NodeChannels (each holds deep-copied rows, aka "_batch")**: same as above
   
   So, Mem<sub>worst</sub> = Size<sub>queue</sub>+(1+N<sub>be</sub>)*Size<sub>batch</sub>+N<sub>be</sub>*Size<sub>sender</sub>.
   
   Mem<sub>worst</sub> > (2+N<sub>be</sub>)*mem_limit + N<sub>be</sub>*Size<sub>sender</sub>
   
   If Size<sub>sender</sub> is significantly smaller than mem_limit, it can be ignored.
   If Size<sub>sender</sub> is smaller than the bytes per row, it might still bring some benefit, because it increases the parallelism of wait_in_flight_packet.
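
As a worked example of the two bounds, the sketch below plugs in a 2 GB mem_limit (the MemoryLimit shown in the profile) and 5 BEs. The 64 MiB sender buffer is purely an assumed value for illustration, and the function name is hypothetical.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2

def worst_case_lower_bound(mem_limit, n_be, size_sender=0):
    """Lower bound on Mem_worst: (2 + N_be) * mem_limit + N_be * Size_sender.

    size_sender=0 gives the single-thread model.
    """
    return (2 + n_be) * mem_limit + n_be * size_sender

single = worst_case_lower_bound(2 * GIB, n_be=5)
multi = worst_case_lower_bound(2 * GIB, n_be=5, size_sender=64 * MIB)  # 64 MiB is assumed
print(f"single-thread: > {single / GIB:.2f} GiB")
print(f"multi-thread:  > {multi / GIB:.2f} GiB")
```

With these inputs the sender buffers add 320 MiB on top of a 14 GiB bound, which illustrates why a small Size<sub>sender</sub> can be ignored.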
   
   


[GitHub] [incubator-doris] vagetablechicken edited a comment on issue #2780: OlapTableSink::send is low efficient?

Posted by GitBox <gi...@apache.org>.
vagetablechicken edited a comment on issue #2780: OlapTableSink::send is low efficient?
URL: https://github.com/apache/incubator-doris/issues/2780#issuecomment-588156273
 
 
   # OlapTableSink Multithreading Solution Design
   
   ## single-thread model (original model)
   ![image](https://user-images.githubusercontent.com/24697960/74130744-bbfe7700-4c1d-11ea-89e0-f6e9748bafe0.png)
   
   ## multi-thread model
   ![image](https://user-images.githubusercontent.com/24697960/74131130-a2116400-4c1e-11ea-9696-3934555e870b.png)
   
   When we create an OlapTableSink, we prepare N buffers and N send threads.
    
   If a row needs to be added to a NodeChannel (node_id=A; only node_id is considered), the main thread copies it to buffer A%N.
   
   We can limit the buffer size (using MemTracker); the mem_limit is configurable. If a buffer is full, we block adding rows to this buffer until all rows in the buffer have been consumed.
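
A rough sketch of this routing, using Python's standard `queue.Queue` as the bounded buffer. It is an illustration, not the proposed implementation: `run_sink` and `buffer_capacity` are hypothetical names, and `Queue.put` unblocks as soon as one slot is free rather than waiting for the whole buffer to drain as described above.

```python
import queue
import threading

STOP = object()  # sentinel telling a send thread to exit

def run_sink(rows, buffer_num, buffer_capacity=4):
    """Route (node_id, row) pairs to buffer node_id % buffer_num; one thread drains each."""
    buffers = [queue.Queue(maxsize=buffer_capacity) for _ in range(buffer_num)]
    sent = [[] for _ in range(buffer_num)]   # what each send thread consumed, in order

    def send_loop(index):
        while True:
            item = buffers[index].get()
            if item is STOP:
                return
            sent[index].append(item)         # stand-in for adding the row to its NodeChannel

    threads = [threading.Thread(target=send_loop, args=(i,)) for i in range(buffer_num)]
    for t in threads:
        t.start()
    for node_id, row in rows:
        # put() blocks the main thread only while this one buffer is full.
        buffers[node_id % buffer_num].put((node_id, row))
    for b in buffers:
        b.put(STOP)
    for t in threads:
        t.join()
    return sent

rows = [(node_id, i) for i in range(10) for node_id in range(5)]
sent = run_sink(rows, buffer_num=2)
print([len(s) for s in sent])
```

Because each buffer has a single producer and a single consumer, the rows of any one node reach its send thread in their original order.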
   
   ### Extra cost of the multi-thread version
   threads: buffer_num
   mem: buffer_num*(mem_limit+buffer_running_mem)
   
   ### Single/multi version switch
   The speedup is evident for large data imports, but not every OlapTableSink needs it.
   So the design is:
   We can select the original (single-thread) version or the multi-thread version by setting a broker load property, and configure buffer_num and mem_limit_per_buf.
   We add fields to TOlapTableSink, like
   ```
   struct TOlapTableSink {
       ...
       14: optional i64 load_channel_timeout_s // the timeout of load channels in second
       15: optional i32 buffer_num
       16: optional i64 mem_limit_per_buf
       17: optional i64 size_limit_per_buf
   }
   ```
   `buffer_num > 0` means using "multi-thread & buffer" mode. 
   `buffer_num = 0` or not set means using "single-thread" mode (the original mode).
   (`buffer_num = 1` is hard to define. We should avoid setting buffer_num to 1.)
   
   So we can use 
   ```
   LOAD LABEL ...
           PROPERTIES 
           ( 
           "buffer_num"="5", 
           "mem_limit_per_buf"="5368709120", 
           "size_limit_per_buf"="62914560"
           );
   ```
   
   # A Test Case
   cluster: 5 be
   {"ScannedRows":895737370,"TaskNumber":1,"FileNumber":300,"FileSize":60216822679} // 56G
   
   ## origin ver

   | LoadStartTime       | LoadFinishTime      |
   |---------------------|---------------------|
   | 2020-02-19 20:44:00 | 2020-02-20 00:35:01 |
   
   ## multi-thread ver
   buffer_num/mem_limit_per_buf/size_limit_per_buf = 5/1G/30M

   | LoadStartTime       | LoadFinishTime      |
   |---------------------|---------------------|
   | 2020-02-18 17:57:38 | 2020-02-18 20:05:13 |
   
   buffer_num/mem_limit_per_buf/size_limit_per_buf = 5/5G/30M

   | LoadStartTime       | LoadFinishTime      |
   |---------------------|---------------------|
   | 2020-02-18 20:14:58 | 2020-02-18 22:19:54 |
   
   buffer_num/mem_limit_per_buf/size_limit_per_buf = 5/5G/60M

   | LoadStartTime       | LoadFinishTime      |
   |---------------------|---------------------|
   | 2020-02-18 22:28:08 | 2020-02-19 00:36:28 |
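
The elapsed time of each run follows directly from the start and finish timestamps. A small script to compute them, with the timestamps copied from the tables above:

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"
runs = {
    "origin": ("2020-02-19 20:44:00", "2020-02-20 00:35:01"),
    "5/1G/30M": ("2020-02-18 17:57:38", "2020-02-18 20:05:13"),
    "5/5G/30M": ("2020-02-18 20:14:58", "2020-02-18 22:19:54"),
    "5/5G/60M": ("2020-02-18 22:28:08", "2020-02-19 00:36:28"),
}
durations = {
    name: datetime.strptime(end, FMT) - datetime.strptime(start, FMT)
    for name, (start, end) in runs.items()
}
baseline = durations["origin"]
for name, d in durations.items():
    print(f"{name}: {d} ({d / baseline:.0%} of origin)")
```

The original version takes 3:51:01, and the three multi-thread runs take 2:07:35, 2:04:56 and 2:08:20.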
   
   ## Test Analysis
   For the 56G broker load, the original version runs for almost 4 hours (3h51m), while the multi-thread version finishes in a little over 2 hours (2h05m to 2h08m), roughly halving the time.
   But increasing mem_limit and size_limit doesn't help much. (That is not bad: the smallest setting is enough, and any extra memory would be wasted. 😀)
   
   1. Because we use modular hashing (node_id%buffer_num), buffer_num should be <= be_num.
   buffer_num=be_num is ideal.
   1. In the single-thread version, only one thread does add_batch/wait_packet. In the multi-thread version, buffer_num threads do them. But within one thread, add_batch/wait_packet still run in sequence, so no time is saved inside a thread. Thus, the `actual consume` time is the ideal load run time. For example, 
   `mem_handle: 3506.55s, deep_copy: 22.1017s, spsc push block if full: 7.81192s, consume: 7298.54s, actual consume: 7285s.` means the ideal run time is 7285s, and our configuration makes the process take 7298s. There is little left to optimize.
   1. We use an spsc_queue (ring buffer) as the buffer. It has a fixed length (size_limit_per_buf), so size_limit_per_buf should not be too big. We can use size_limit_per_buf*30B to estimate the mem of the queue.
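
Two of these points are easy to check numerically. The sketch below applies the size_limit_per_buf*30B estimate to the 30M and 60M settings from the test, and counts how modular hashing spreads nodes over buffers. The function names are hypothetical, and the node ids 0 to 4 are an assumption (real node ids need not be consecutive).

```python
GIB = 1024 ** 3

def queue_mem_bytes(size_limit_per_buf, bytes_per_slot=30):
    """Estimated memory of one fixed-length queue: slots * bytes per slot."""
    return size_limit_per_buf * bytes_per_slot

def nodes_per_buffer(node_ids, buffer_num):
    """Count how many nodes modular hashing assigns to each buffer."""
    counts = [0] * buffer_num
    for node_id in node_ids:
        counts[node_id % buffer_num] += 1
    return counts

for slots in (30 * 1024 ** 2, 60 * 1024 ** 2):   # the 30M and 60M settings
    print(f"{slots} slots -> {queue_mem_bytes(slots) / GIB:.2f} GiB per buffer")

print(nodes_per_buffer(range(5), buffer_num=5))  # one node per buffer
print(nodes_per_buffer(range(5), buffer_num=8))  # three buffers never receive a row
```

Under this estimate, a 60M queue costs about 1.76 GiB per buffer before any row data is counted, and any buffer beyond be_num stays empty while still costing a thread and a queue.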
   
   
   


[GitHub] [incubator-doris] vagetablechicken commented on issue #2780: OlapTableSink::send is low efficient?

Posted by GitBox <gi...@apache.org>.
vagetablechicken commented on issue #2780: OlapTableSink::send is low efficient?
URL: https://github.com/apache/incubator-doris/issues/2780#issuecomment-600976814
 
 
   > The new non-blocking approach looks good. It should improve the overall load performance particularly in case where you have some slower receiver nodes.
   > 
   > BTW you should cap the total number of pending batches in order to control the memory usage.
   
   There's no need to limit memory in the sink node. When we create a new RowBatch in a NodeChannel, we use the sink node's mem_tracker. The sink node and the scan node share the same ancestor mem_tracker (the query mem tracker, 2GB by default), so enforcing the mem limit is a matter for the scan node.
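
The shared-ancestor argument can be illustrated with a toy tracker. This is not Doris's MemTracker API: the class below and its `try_consume` method are hypothetical, and only model the idea that a consume is charged to a tracker and to all of its ancestors.

```python
class MemTracker:
    """Toy hierarchical tracker: a consume is charged to the tracker and all ancestors."""

    def __init__(self, label, limit=None, parent=None):
        self.label = label
        self.limit = limit      # None means no limit at this level
        self.parent = parent
        self.consumed = 0

    def ancestors_and_self(self):
        node = self
        while node is not None:
            yield node
            node = node.parent

    def try_consume(self, nbytes):
        """Charge nbytes unless that would push this tracker or an ancestor over its limit."""
        chain = list(self.ancestors_and_self())
        if any(t.limit is not None and t.consumed + nbytes > t.limit for t in chain):
            return False
        for t in chain:
            t.consumed += nbytes
        return True

GIB = 1024 ** 3
query = MemTracker("query", limit=2 * GIB)
scan = MemTracker("scan", parent=query)
sink = MemTracker("sink", parent=query)

ok_scan = scan.try_consume(3 * GIB // 2)   # scan node holds 1.5 GiB of batches
ok_big = sink.try_consume(1 * GIB)         # rejected: the query would reach 2.5 GiB
ok_small = sink.try_consume(GIB // 2)      # accepted: the query is now exactly at 2 GiB
print(ok_scan, ok_big, ok_small)
```

Neither child has a limit of its own in this sketch; both are bounded only through the shared query tracker, which is the situation the reply describes.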


[GitHub] [incubator-doris] chaoyli commented on issue #2780: OlapTableSink::send is low efficient?

Posted by GitBox <gi...@apache.org>.
chaoyli commented on issue #2780: OlapTableSink::send is low efficient?
URL: https://github.com/apache/incubator-doris/issues/2780#issuecomment-588163478
 
 
   The enhancement of this test case seems not obvious?


[GitHub] [incubator-doris] vagetablechicken commented on issue #2780: OlapTableSink::send is low efficient?

Posted by GitBox <gi...@apache.org>.
vagetablechicken commented on issue #2780: OlapTableSink::send is low efficient?
URL: https://github.com/apache/incubator-doris/issues/2780#issuecomment-596974767
 
 
   https://github.com/apache/incubator-doris/pull/2956#issuecomment-596889947
   As mentioned, here's the new design of OlapTableSink: add one sender thread to do non-blocking sending.
   Let me explain what non-blocking means.
   
   The original version of OlapTableSink can be abstracted as one queue (containing all batches of all node channels). One thread consumes the queue's items one by one. When it wants to send a batch of a NodeChannel that has an in-flight packet (the RPC hasn't returned a response), it must wait (RPC join).
   For example:
   The batch with index id=0 and node id=2 is denoted by "B1(0-2)".
   The abstract queue is shown below.
   
   B0(0-1) | B1(0-2) | B2(0-1) | B3(1-4) | ... | ...
   -- | -- | -- | -- | -- | --
   
   When we are about to send B2(0-1), we must wait for the response to B0(0-1). But if we set B2(0-1) aside and send the next item, "B3(1-4)", we won't be blocked.
   
   So my earlier design (now abandoned) split the one queue into multiple queues, as follows. (For details, see https://github.com/apache/incubator-doris/issues/2780#issuecomment-588156273)
   Batches queue0:
   
   B0(0-0) | B1(0-0) | B2(2-0) | B3(4-0) | ... | ...
   -- | -- | -- | -- | -- | --
   
   
   Batches queue1:
   
   B0(2-1) | B1(0-1) | B2(0-1) | B3(1-1) | ... | ...
   -- | -- | -- | -- | -- | --
   
   
   Batches queue2:
   
   B0(0-2) | B1(0-2) | B2(3-2) | B3(1-2) | ... | ...
   -- | -- | -- | -- | -- | --
   
   
   Each queue needs one thread to consume its items. The blocking time is shared among multiple queues, but this approach is still blocking.
   
   ### The new design is non-blocking. 
   We can save batches in NodeChannels (as pending batches), and **try to send** a pending batch. If the current channel has an in-flight packet, we just skip sending it in this round.
   
   The implementation is coming soon.
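
A toy simulation of the try-to-send idea, not the actual implementation. Time is counted in abstract polling rounds, and `NodeChannel`, `try_send`, `rpc_ticks` and `sender_loop` are hypothetical names used only for this sketch.

```python
from collections import deque
from dataclasses import dataclass, field

@dataclass
class NodeChannel:
    rpc_ticks: int                                  # how long one RPC stays in flight
    pending: deque = field(default_factory=deque)   # batches waiting to be sent
    in_flight_until: int = 0                        # time at which the current RPC returns

    def try_send(self, now):
        """Send one pending batch unless an RPC is still in flight; never wait."""
        if not self.pending or self.in_flight_until > now:
            return None                             # skip this channel in this round
        self.in_flight_until = now + self.rpc_ticks
        return self.pending.popleft()

def sender_loop(channels):
    """One sender polling all channels; returns [(send_time, batch), ...]."""
    log = []
    now = 0
    while any(ch.pending for ch in channels):
        for ch in channels:
            batch = ch.try_send(now)
            if batch is not None:
                log.append((now, batch))
        now += 1                                    # one polling round per tick
    return log

slow = NodeChannel(rpc_ticks=5, pending=deque(["s0", "s1"]))
fast = NodeChannel(rpc_ticks=1, pending=deque(["f0", "f1", "f2"]))
log = sender_loop([slow, fast])
print(log)
```

In this run the fast channel sends all three of its batches at ticks 0, 1 and 2 while the slow channel's first RPC is still in flight; the slow channel's second batch goes out at tick 5 without having delayed anyone else.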


[GitHub] [incubator-doris] gaodayue commented on issue #2780: OlapTableSink::send is low efficient?

Posted by GitBox <gi...@apache.org>.
gaodayue commented on issue #2780: OlapTableSink::send is low efficient?
URL: https://github.com/apache/incubator-doris/issues/2780#issuecomment-597489601
 
 
   The new non-blocking approach looks good. It should improve the overall load performance particularly in case where you have some slower receiver nodes.
   
   BTW you should cap the total number of pending batches in order to control the memory usage.

