Posted to issues@impala.apache.org by "Michael Ho (JIRA)" <ji...@apache.org> on 2017/10/25 01:16:00 UTC

[jira] [Resolved] (IMPALA-6041) RowBatch is serialized once per destination channel for Broadcast exchange

     [ https://issues.apache.org/jira/browse/IMPALA-6041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Ho resolved IMPALA-6041.
--------------------------------
       Resolution: Fixed
    Fix Version/s: Not Applicable

The code which contains this problem was never merged upstream.

> RowBatch is serialized once per destination channel for Broadcast exchange 
> ---------------------------------------------------------------------------
>
>                 Key: IMPALA-6041
>                 URL: https://issues.apache.org/jira/browse/IMPALA-6041
>             Project: IMPALA
>          Issue Type: Sub-task
>          Components: Distributed Exec
>            Reporter: Mostafa Mokhtar
>            Assignee: Michael Ho
>            Priority: Blocker
>             Fix For: Not Applicable
>
>
> While running scalability tests with KRPC, I noticed that broadcast operations where the source file exists on only one node are very slow compared to the same operations without KRPC.
> Further investigation showed that with KRPC the RowBatch is serialized once per destination channel, which wastes a lot of CPU.
> Baseline (non-KRPC) code:
> {code}
>     // current_thrift_batch_ is *not* the one that was written by the last call
>     // to Serialize()
>     RETURN_IF_ERROR(SerializeBatch(batch, current_thrift_batch_, channels_.size()));
>     // SendBatch() will block if there are still in-flight rpcs (and those will
>     // reference the previously written thrift batch)
>     for (int i = 0; i < channels_.size(); ++i) {
>       RETURN_IF_ERROR(channels_[i]->SendBatch(current_thrift_batch_));
>     }
> {code}
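The baseline snippet serializes each batch once and hands the same serialized buffer to every channel. Its comments also imply double-buffering: the buffer written now is not the one still referenced by in-flight RPCs from the previous call. Below is a minimal sketch of that pattern. `RowBatch`, `SerializedBatch`, `Channel`, and `BroadcastSender` are hypothetical stand-ins, not Impala's classes. The sketch does not model asynchronous RPCs or the blocking behaviour of the real `SendBatch()`.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for Impala's RowBatch; not the real class.
struct RowBatch {
  std::vector<std::string> rows;
};

// Hypothetical stand-in for the serialized (Thrift) form of a batch.
struct SerializedBatch {
  std::string payload;
};

// Writes every row into dst. Stands in for the expensive
// serialize/compress step and counts how often it runs.
void SerializeBatch(const RowBatch& batch, SerializedBatch* dst, int* calls) {
  dst->payload.clear();
  for (const std::string& row : batch.rows) {
    dst->payload += row;
    dst->payload += '\n';
  }
  ++*calls;
}

// Hypothetical channel. A real channel would issue an RPC and block in
// SendBatch() while an earlier RPC is still in flight; this one only records
// which buffer it was handed.
struct Channel {
  std::vector<const SerializedBatch*> sent;
  void SendBatch(const SerializedBatch* batch) { sent.push_back(batch); }
};

// Serializes each RowBatch once, then hands the same buffer to every channel.
// Two buffers alternate, so the one referenced by the previous round of sends
// is not overwritten by the next Send().
class BroadcastSender {
 public:
  explicit BroadcastSender(std::size_t num_channels) : channels_(num_channels) {
    assert(num_channels > 0);
  }

  void Send(const RowBatch& batch) {
    SerializedBatch* current = &buffers_[current_idx_];
    SerializeBatch(batch, current, &serialize_calls_);  // once per batch
    for (Channel& ch : channels_) ch.SendBatch(current);
    current_idx_ = 1 - current_idx_;  // switch buffers for the next batch
  }

  int serialize_calls() const { return serialize_calls_; }
  const std::vector<Channel>& channels() const { return channels_; }

 private:
  std::vector<Channel> channels_;
  std::array<SerializedBatch, 2> buffers_;
  int current_idx_ = 0;
  int serialize_calls_ = 0;
};
```

With four channels and two batches, serialization runs twice rather than eight times. Every channel receives a pointer to the same buffer for a given batch.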
> KRPC code:
> {code}
>   if (partition_type_ == TPartitionType::UNPARTITIONED || channels_.size() == 1) {
>     // SendBatch() will block if there are still in-flight rpcs (and those will
>     // reference the previously written thrift batch)
>     for (int i = 0; i < channels_.size(); ++i) {
>       RETURN_IF_ERROR(channels_[i]->SendBatch(batch));
>     }
>   }
> {code}
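In the KRPC snippet each channel receives the unserialized `batch`, so serialization happens inside every `SendBatch()` call and its cost scales with the number of destinations. The sketch below contrasts that shape with one possible alternative. In the alternative, the sender serializes once and each channel holds a `std::shared_ptr` to the same immutable payload, so the buffer stays alive until the last in-flight send releases it. All types and functions here are hypothetical illustrations. The report only says the affected code was never merged upstream, so this is not the fix that was applied.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a RowBatch; not Impala's class.
struct RowBatch {
  std::vector<std::string> rows;
};

// Counts serializations so both strategies can be compared.
int g_serialize_calls = 0;

std::string Serialize(const RowBatch& batch) {
  ++g_serialize_calls;
  std::string out;
  for (const std::string& row : batch.rows) {
    out += row;
    out += '\n';
  }
  return out;
}

// Shape of the KRPC loop in the report: each channel serializes the batch itself.
struct PerChannelSerializingChannel {
  std::vector<std::string> outbound;
  void SendBatch(const RowBatch& batch) { outbound.push_back(Serialize(batch)); }
};

// Alternative shape: channels share one immutable serialized payload.
struct SharedPayloadChannel {
  std::vector<std::shared_ptr<const std::string>> in_flight;
  void SendBatch(std::shared_ptr<const std::string> payload) {
    in_flight.push_back(std::move(payload));
  }
  // Drops this channel's references once its (hypothetical) RPCs complete.
  void OnRpcComplete() { in_flight.clear(); }
};

// One serialization per channel.
void BroadcastPerChannel(const RowBatch& batch,
                         std::vector<PerChannelSerializingChannel>& channels) {
  assert(!channels.empty());
  for (auto& ch : channels) ch.SendBatch(batch);
}

// One serialization in total; the payload is freed when the last channel
// releases its reference.
void BroadcastShared(const RowBatch& batch,
                     std::vector<SharedPayloadChannel>& channels) {
  assert(!channels.empty());
  std::shared_ptr<const std::string> payload =
      std::make_shared<std::string>(Serialize(batch));
  for (auto& ch : channels) ch.SendBatch(payload);
}
```

With eight channels, `BroadcastPerChannel` serializes eight times and `BroadcastShared` serializes once. Reference counting replaces the baseline's fixed pair of alternating buffers, at the cost of one heap allocation per batch.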
> KRPC broadcast fragment profile:
> {code}
> Averaged Fragment F01:(Total: 20s344ms, non-child: 494.783ms, % non-child: 2.43%)
>       split sizes:  min: 36.65 MB, max: 36.65 MB, avg: 36.65 MB, stddev: 0
>       completion times: min:20s353ms  max:20s353ms  mean: 20s353ms  stddev:0.000ns
>       execution rates: min:1.80 MB/sec  max:1.80 MB/sec  mean:1.80 MB/sec  stddev:0.02 B/sec
>       num instances: 1
>        - AverageThreadTokens: 1.70 
>        - BloomFilterBytes: 0
>        - PeakMemoryUsage: 52.35 MB (54889274)
>        - PeakReservation: 0
>        - PeakUsedReservation: 0
>        - PerHostPeakMemUsage: 175.82 MB (184360081)
>        - RowsProduced: 402.00K (402000)
>        - TotalNetworkReceiveTime: 0.000ns
>        - TotalNetworkSendTime: 134.533ms
>        - TotalStorageWaitTime: 18.921ms
>        - TotalThreadsInvoluntaryContextSwitches: 2.18K (2177)
>        - TotalThreadsTotalWallClockTime: 34s438ms
>          - TotalThreadsSysTime: 247.962ms
>          - TotalThreadsUserTime: 20s127ms
>        - TotalThreadsVoluntaryContextSwitches: 410 (410)
>       Fragment Instance Lifecycle Timings:
>          - ExecTime: 20s313ms
>            - ExecTreeExecTime: 44.718ms
>          - OpenTime: 12.231ms
>            - ExecTreeOpenTime: 28.946us
>          - PrepareTime: 18.960ms
>            - ExecTreePrepareTime: 126.983us
>       KrpcDataStreamSender (dst_id=7):(Total: 19s776ms, non-child: 19s776ms, % non-child: 100.00%)
>          - BytesSent: 3.63 GB (3902930720)
>          - OverallThroughput: 188.21 MB/sec
>          - PeakMemoryUsage: 405.23 KB (414960)
>          - RowsReturned: 402.00K (402000)
>          - RpcRetry: 0 (0)
>          - SerializeBatchTime: 19s771ms
>          - UncompressedRowBatchSize: 7.85 GB (8427946930)
> {code}
> Baseline fragment profile:
> {code}
> Averaged Fragment F01:(Total: 3s440ms, non-child: 1s331ms, % non-child: 38.71%)
>       split sizes:  min: 36.65 MB, max: 36.65 MB, avg: 36.65 MB, stddev: 0
>       completion times: min:3s449ms  max:3s449ms  mean: 3s449ms  stddev:0.000ns
>       execution rates: min:10.62 MB/sec  max:10.62 MB/sec  mean:10.62 MB/sec  stddev:0.70 B/sec
>       num instances: 1
>        - AverageThreadTokens: 1.71 
>        - BloomFilterBytes: 0
>        - PeakMemoryUsage: 52.35 MB (54889274)
>        - PeakReservation: 0
>        - PeakUsedReservation: 0
>        - PerHostPeakMemUsage: 175.82 MB (184362391)
>        - RowsProduced: 402.00K (402000)
>        - TotalNetworkReceiveTime: 0.000ns
>        - TotalNetworkSendTime: 2s538ms
>        - TotalStorageWaitTime: 19.156ms
>        - TotalThreadsInvoluntaryContextSwitches: 673 (673)
>        - TotalThreadsTotalWallClockTime: 5s778ms
>          - TotalThreadsSysTime: 265.959ms
>          - TotalThreadsUserTime: 760.884ms
>        - TotalThreadsVoluntaryContextSwitches: 3.10K (3104)
>       Fragment Instance Lifecycle Timings:
>          - ExecTime: 3s396ms
>            - ExecTreeExecTime: 47.197ms
>          - OpenTime: 13.312ms
>            - ExecTreeOpenTime: 31.004us
>          - PrepareTime: 31.097ms
>            - ExecTreePrepareTime: 112.645us
>       DataStreamSender (dst_id=7):(Total: 2s034ms, non-child: 2s034ms, % non-child: 100.00%)
>          - BytesSent: 3.63 GB (3902930720)
>          - NetworkThroughput(*): 2.40 GB/sec
>          - OverallThroughput: 1.79 GB/sec
>          - PeakMemoryUsage: 405.23 KB (414960)
>          - RowsReturned: 402.00K (402000)
>          - SerializeBatchTime: 507.673ms
>          - TransmitDataRPCTime: 1s512ms
>          - UncompressedRowBatchSize: 7.85 GB (8427946930)
> {code}
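The two profiles can be compared directly. `BytesSent` and `UncompressedRowBatchSize` are identical, so both senders moved the same data, and the gap is concentrated in `SerializeBatchTime`. The small program below recomputes a few ratios from the quoted figures. The throughput helper assumes the profile's "MB"/"GB" are binary units (MiB/GiB). Under that assumption it reproduces the printed 188.21 MB/sec, 1.79 GB/sec, and 2.40 GB/sec. That match suggests, but does not confirm, that `OverallThroughput` is `BytesSent` divided by the sender's total time and that `NetworkThroughput(*)` uses `TransmitDataRPCTime`.

```cpp
#include <cassert>
#include <cstdint>

// Figures copied from the two profiles quoted above.
constexpr double kKrpcSerializeMs = 19771.0;      // KRPC SerializeBatchTime (19s771ms)
constexpr double kBaselineSerializeMs = 507.673;  // baseline SerializeBatchTime
constexpr double kKrpcUserMs = 20127.0;           // KRPC TotalThreadsUserTime
constexpr double kBaselineUserMs = 760.884;       // baseline TotalThreadsUserTime
constexpr std::int64_t kBytesSent = 3902930720;   // same in both profiles
constexpr std::int64_t kUncompressedBytes = 8427946930;
constexpr double kKrpcSenderMs = 19776.0;         // KrpcDataStreamSender Total
constexpr double kBaselineSenderMs = 2034.0;      // DataStreamSender Total
constexpr double kBaselineRpcMs = 1512.0;         // baseline TransmitDataRPCTime

// Throughput in MiB/s. Assumes the profile's MB/GB are binary units.
double MiBPerSec(std::int64_t bytes, double ms) {
  assert(ms > 0);
  return static_cast<double>(bytes) / (1024.0 * 1024.0) / (ms / 1000.0);
}

// Plain ratio helper, e.g. KRPC time over baseline time.
double Ratio(double numerator, double denominator) {
  assert(denominator != 0);
  return numerator / denominator;
}
```

The ratios come out at roughly 39x for `SerializeBatchTime` and roughly 26x for total thread user time, and the data compresses by about 2.16x in both runs. The report does not say how many destination channels were involved, so the serialization ratio should not be read as a channel count.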



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)