Posted to issues@impala.apache.org by "Michael Ho (JIRA)" <ji...@apache.org> on 2017/10/25 01:16:00 UTC
[jira] [Resolved] (IMPALA-6041) RowBatch is serialized once per destination channel for Broadcast exchange
[ https://issues.apache.org/jira/browse/IMPALA-6041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Ho resolved IMPALA-6041.
--------------------------------
Resolution: Fixed
Fix Version/s: Not Applicable
The code that contained this problem was never merged upstream.
> RowBatch is serialized once per destination channel for Broadcast exchange
> ---------------------------------------------------------------------------
>
> Key: IMPALA-6041
> URL: https://issues.apache.org/jira/browse/IMPALA-6041
> Project: IMPALA
> Issue Type: Sub-task
> Components: Distributed Exec
> Reporter: Mostafa Mokhtar
> Assignee: Michael Ho
> Priority: Blocker
> Fix For: Not Applicable
>
>
> While running scalability tests with KRPC, I noticed that broadcast operations whose source file resides on a single node are very slow compared to the same operations without KRPC.
> Further investigation showed that with KRPC the RowBatch is serialized once per destination channel, which wastes a lot of CPU (SerializeBatchTime is 19s771ms in the KRPC profile below versus 507.673ms in the baseline).
> Baseline code
> {code}
> // current_thrift_batch_ is *not* the one that was written by the last call
> // to Serialize()
> RETURN_IF_ERROR(SerializeBatch(batch, current_thrift_batch_, channels_.size()));
> // SendBatch() will block if there are still in-flight rpcs (and those will
> // reference the previously written thrift batch)
> for (int i = 0; i < channels_.size(); ++i) {
>   RETURN_IF_ERROR(channels_[i]->SendBatch(current_thrift_batch_));
> }
> {code}
> KRPC code
> {code}
> if (partition_type_ == TPartitionType::UNPARTITIONED || channels_.size() == 1) {
>   // SendBatch() will block if there are still in-flight rpcs (and those will
>   // reference the previously written thrift batch)
>   for (int i = 0; i < channels_.size(); ++i) {
>     RETURN_IF_ERROR(channels_[i]->SendBatch(batch));
>   }
> }
> {code}
> KRPC broadcast fragment
> {code}
> Averaged Fragment F01:(Total: 20s344ms, non-child: 494.783ms, % non-child: 2.43%)
> split sizes: min: 36.65 MB, max: 36.65 MB, avg: 36.65 MB, stddev: 0
> completion times: min:20s353ms max:20s353ms mean: 20s353ms stddev:0.000ns
> execution rates: min:1.80 MB/sec max:1.80 MB/sec mean:1.80 MB/sec stddev:0.02 B/sec
> num instances: 1
> - AverageThreadTokens: 1.70
> - BloomFilterBytes: 0
> - PeakMemoryUsage: 52.35 MB (54889274)
> - PeakReservation: 0
> - PeakUsedReservation: 0
> - PerHostPeakMemUsage: 175.82 MB (184360081)
> - RowsProduced: 402.00K (402000)
> - TotalNetworkReceiveTime: 0.000ns
> - TotalNetworkSendTime: 134.533ms
> - TotalStorageWaitTime: 18.921ms
> - TotalThreadsInvoluntaryContextSwitches: 2.18K (2177)
> - TotalThreadsTotalWallClockTime: 34s438ms
> - TotalThreadsSysTime: 247.962ms
> - TotalThreadsUserTime: 20s127ms
> - TotalThreadsVoluntaryContextSwitches: 410 (410)
> Fragment Instance Lifecycle Timings:
> - ExecTime: 20s313ms
> - ExecTreeExecTime: 44.718ms
> - OpenTime: 12.231ms
> - ExecTreeOpenTime: 28.946us
> - PrepareTime: 18.960ms
> - ExecTreePrepareTime: 126.983us
> KrpcDataStreamSender (dst_id=7):(Total: 19s776ms, non-child: 19s776ms, % non-child: 100.00%)
> - BytesSent: 3.63 GB (3902930720)
> - OverallThroughput: 188.21 MB/sec
> - PeakMemoryUsage: 405.23 KB (414960)
> - RowsReturned: 402.00K (402000)
> - RpcRetry: 0 (0)
> - SerializeBatchTime: 19s771ms
> - UncompressedRowBatchSize: 7.85 GB (8427946930)
> {code}
> Baseline fragment
> {code}
> Averaged Fragment F01:(Total: 3s440ms, non-child: 1s331ms, % non-child: 38.71%)
> split sizes: min: 36.65 MB, max: 36.65 MB, avg: 36.65 MB, stddev: 0
> completion times: min:3s449ms max:3s449ms mean: 3s449ms stddev:0.000ns
> execution rates: min:10.62 MB/sec max:10.62 MB/sec mean:10.62 MB/sec stddev:0.70 B/sec
> num instances: 1
> - AverageThreadTokens: 1.71
> - BloomFilterBytes: 0
> - PeakMemoryUsage: 52.35 MB (54889274)
> - PeakReservation: 0
> - PeakUsedReservation: 0
> - PerHostPeakMemUsage: 175.82 MB (184362391)
> - RowsProduced: 402.00K (402000)
> - TotalNetworkReceiveTime: 0.000ns
> - TotalNetworkSendTime: 2s538ms
> - TotalStorageWaitTime: 19.156ms
> - TotalThreadsInvoluntaryContextSwitches: 673 (673)
> - TotalThreadsTotalWallClockTime: 5s778ms
> - TotalThreadsSysTime: 265.959ms
> - TotalThreadsUserTime: 760.884ms
> - TotalThreadsVoluntaryContextSwitches: 3.10K (3104)
> Fragment Instance Lifecycle Timings:
> - ExecTime: 3s396ms
> - ExecTreeExecTime: 47.197ms
> - OpenTime: 13.312ms
> - ExecTreeOpenTime: 31.004us
> - PrepareTime: 31.097ms
> - ExecTreePrepareTime: 112.645us
> DataStreamSender (dst_id=7):(Total: 2s034ms, non-child: 2s034ms, % non-child: 100.00%)
> - BytesSent: 3.63 GB (3902930720)
> - NetworkThroughput(*): 2.40 GB/sec
> - OverallThroughput: 1.79 GB/sec
> - PeakMemoryUsage: 405.23 KB (414960)
> - RowsReturned: 402.00K (402000)
> - SerializeBatchTime: 507.673ms
> - TransmitDataRPCTime: 1s512ms
> - UncompressedRowBatchSize: 7.85 GB (8427946930)
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)