Posted to issues@flink.apache.org by "Stephan Ewen (Jira)" <ji...@apache.org> on 2019/09/19 12:09:00 UTC

[jira] [Resolved] (FLINK-11859) Improve SpanningRecordSerializer performance by serializing record length to serialization buffer directly

     [ https://issues.apache.org/jira/browse/FLINK-11859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen resolved FLINK-11859.
----------------------------------
    Resolution: Fixed

Fixed via 35e57a8460c7f03010972f587bb24052ea694cce

> Improve SpanningRecordSerializer performance by serializing record length to serialization buffer directly
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11859
>                 URL: https://issues.apache.org/jira/browse/FLINK-11859
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>            Reporter: Yingjie Cao
>            Assignee: Yingjie Cao
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.10.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> In the current implementation of SpanningRecordSerializer, the length of a record is serialized to an intermediate length buffer and then copied to the target buffer. Instead, the length field can be serialized directly into the data buffer (serializationBuffer), which avoids copying the length buffer. Although the total number of bytes copied stays the same, this saves one copy per record, and that copy has a high relative overhead for small records. The flink-benchmarks results show that this improves performance; the test results are as follows.
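A minimal sketch of the two framings described above, assuming a record is framed as a 4-byte big-endian length followed by its payload and using only plain `java.nio.ByteBuffer`. The class and method names (`LengthPrefixSketch`, `copyWithSeparateLengthBuffer`, `copyWithInlineLength`) are hypothetical and are not Flink's `SpanningRecordSerializer` API. The point is only that reserving the length slot up front and backfilling it once the payload is written turns two copies per record into one, while the bytes copied stay the same.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration only; not Flink's SpanningRecordSerializer.
public class LengthPrefixSketch {

    static final int LENGTH_FIELD_BYTES = Integer.BYTES; // assumed 4-byte int length

    // Before: the record bytes sit in one buffer and the length in a separate
    // 4-byte buffer, so copying to the target takes two put() calls per record.
    static int copyWithSeparateLengthBuffer(byte[] record, ByteBuffer target) {
        ByteBuffer lengthBuffer = ByteBuffer.allocate(LENGTH_FIELD_BYTES);
        lengthBuffer.putInt(record.length);
        lengthBuffer.flip();
        // Stands in for the buffer the record was already serialized into.
        ByteBuffer dataBuffer = ByteBuffer.wrap(record);
        target.put(lengthBuffer); // copy 1: length field
        target.put(dataBuffer);   // copy 2: payload
        return 2;
    }

    // After: skip over a length slot at the front of the serialization buffer,
    // write the record, then backfill the length in place. Copying to the
    // target takes a single put() call per record.
    static int copyWithInlineLength(byte[] record, ByteBuffer target) {
        ByteBuffer serializationBuffer =
                ByteBuffer.allocate(LENGTH_FIELD_BYTES + record.length);
        serializationBuffer.position(LENGTH_FIELD_BYTES); // reserve the length slot
        serializationBuffer.put(record); // stands in for the record writing itself
        int len = serializationBuffer.position() - LENGTH_FIELD_BYTES;
        serializationBuffer.putInt(0, len); // absolute write; position unchanged
        serializationBuffer.flip();
        target.put(serializationBuffer); // single copy: length + payload
        return 1;
    }

    public static void main(String[] args) {
        byte[] record = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer before = ByteBuffer.allocate(64);
        ByteBuffer after = ByteBuffer.allocate(64);
        int copiesBefore = copyWithSeparateLengthBuffer(record, before);
        int copiesAfter = copyWithInlineLength(record, after);
        byte[] a = Arrays.copyOf(before.array(), before.position());
        byte[] b = Arrays.copyOf(after.array(), after.position());
        // prints "same bytes: true, bytes copied: 9 vs 9, copy calls: 2 vs 1"
        System.out.println("same bytes: " + Arrays.equals(a, b)
                + ", bytes copied: " + a.length + " vs " + b.length
                + ", copy calls: " + copiesBefore + " vs " + copiesAfter);
    }
}
```

Both paths produce an identical byte layout in the target; only the number of copy operations per record differs, which is why the saving matters most when the payload itself is small.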
> Result with the optimization:
> |Benchmark|Mode|Threads|Samples|Score|Score Error (99.9%)|Unit|Param: channelsFlushTimeout|Param: stateBackend|
> |KeyByBenchmarks.arrayKeyBy|thrpt|1|30|2228.049605|77.631804|ops/ms| | |
> |KeyByBenchmarks.tupleKeyBy|thrpt|1|30|3968.361739|193.501755|ops/ms| | |
> |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|3030.016702|29.272713|ops/ms| |MEMORY|
> |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2754.77678|26.215395|ops/ms| |FS|
> |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|3001.957606|29.288019|ops/ms| |FS_ASYNC|
> |RocksStateBackendBenchmark.stateBackends|thrpt|1|30|123.698984|3.339233|ops/ms| |ROCKS|
> |RocksStateBackendBenchmark.stateBackends|thrpt|1|30|126.252137|1.137735|ops/ms| |ROCKS_INC|
> |SerializationFrameworkMiniBenchmarks.serializerAvro|thrpt|1|30|323.658098|5.855697|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerKryo|thrpt|1|30|183.34423|3.710787|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerPojo|thrpt|1|30|404.380233|5.131744|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerRow|thrpt|1|30|527.193369|10.176726|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerTuple|thrpt|1|30|550.073024|11.724412|ops/ms| | |
> |StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput|thrpt|1|30|564.690627|13.766809|ops/ms| | |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|49918.11806|2324.234776|ops/ms|100,100ms| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|10443.63491|315.835962|ops/ms|100,100ms,SSL| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|21387.47608|2779.832704|ops/ms|1000,1ms| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|26585.85453|860.243347|ops/ms|1000,100ms| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|8252.563405|947.129028|ops/ms|1000,100ms,SSL| |
> |SumLongsBenchmark.benchmarkCount|thrpt|1|30|8806.021402|263.995836|ops/ms| | |
> |WindowBenchmarks.globalWindow|thrpt|1|30|4573.620126|112.099391|ops/ms| | |
> |WindowBenchmarks.sessionWindow|thrpt|1|30|585.246412|7.026569|ops/ms| | |
> |WindowBenchmarks.slidingWindow|thrpt|1|30|449.302134|4.123669|ops/ms| | |
> |WindowBenchmarks.tumblingWindow|thrpt|1|30|2979.806858|33.818909|ops/ms| | |
> |StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1|avgt|1|30|12.842865|0.13796|ms/op| | |
> Result without the optimization:
>
> |Benchmark|Mode|Threads|Samples|Score|Score Error (99.9%)|Unit|Param: channelsFlushTimeout|Param: stateBackend|
> |KeyByBenchmarks.arrayKeyBy|thrpt|1|30|2060.241715|59.898485|ops/ms| | |
> |KeyByBenchmarks.tupleKeyBy|thrpt|1|30|3645.306819|223.821719|ops/ms| | |
> |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2992.698822|36.978115|ops/ms| |MEMORY|
> |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2756.10949|27.798937|ops/ms| |FS|
> |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2965.969876|44.159793|ops/ms| |FS_ASYNC|
> |RocksStateBackendBenchmark.stateBackends|thrpt|1|30|125.506942|1.245978|ops/ms| |ROCKS|
> |RocksStateBackendBenchmark.stateBackends|thrpt|1|30|127.258737|1.190588|ops/ms| |ROCKS_INC|
> |SerializationFrameworkMiniBenchmarks.serializerAvro|thrpt|1|30|316.497954|8.309241|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerKryo|thrpt|1|30|189.065149|6.302073|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerPojo|thrpt|1|30|391.51305|7.750728|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerRow|thrpt|1|30|513.611151|10.640899|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerTuple|thrpt|1|30|534.184947|14.370082|ops/ms| | |
> |StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput|thrpt|1|30|483.388618|19.506723|ops/ms| | |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|42777.70615|4981.87539|ops/ms|100,100ms| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|10201.48525|286.248845|ops/ms|100,100ms,SSL| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|20788.34364|1146.470652|ops/ms|1000,1ms| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|24412.00941|981.98882|ops/ms|1000,100ms| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|8284.336114|177.482373|ops/ms|1000,100ms,SSL| |
> |SumLongsBenchmark.benchmarkCount|thrpt|1|30|7846.800667|127.321584|ops/ms| | |
> |WindowBenchmarks.globalWindow|thrpt|1|30|4837.270101|94.519852|ops/ms| | |
> |WindowBenchmarks.sessionWindow|thrpt|1|30|591.304589|5.324132|ops/ms| | |
> |WindowBenchmarks.slidingWindow|thrpt|1|30|446.605784|2.53677|ops/ms| | |
> |WindowBenchmarks.tumblingWindow|thrpt|1|30|2878.885056|64.035709|ops/ms| | |
> |StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1|avgt|1|30|12.705601|0.164959|ms/op| | |
>
> The optimization is especially useful for small records, where the saved per-record copy makes up a larger share of the serialization cost.
>
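To compare the two result tables row by row, the relative change can be computed as (with - without) / without, together with a crude check of whether the score ± error intervals overlap. This is a small sketch using scores (ops/ms) copied from a few rows of the tables; the class name `ThroughputDelta` and the row selection are illustrative, and the overlap test is only a rough reading of the JMH error column, not a significance test.

```java
// Illustrative helper; scores and errors are copied from the tables above.
public class ThroughputDelta {

    // (with - without) / without; positive means the optimized run is faster.
    static double relativeChange(double with, double without) {
        return (with - without) / without;
    }

    // Crude check: do [a - errA, a + errA] and [b - errB, b + errB] overlap?
    static boolean intervalsOverlap(double a, double errA, double b, double errB) {
        return Math.abs(a - b) <= errA + errB;
    }

    public static void main(String[] args) {
        String[] names = {
            "networkThroughput (100,100ms)",
            "networkBroadcastThroughput",
            "SumLongsBenchmark.benchmarkCount",
            "WindowBenchmarks.globalWindow"
        };
        // Each row: {with, withError, without, withoutError}
        double[][] rows = {
            {49918.11806, 2324.234776, 42777.70615, 4981.87539},
            {564.690627, 13.766809, 483.388618, 19.506723},
            {8806.021402, 263.995836, 7846.800667, 127.321584},
            {4573.620126, 112.099391, 4837.270101, 94.519852}
        };
        for (int i = 0; i < names.length; i++) {
            double[] r = rows[i];
            System.out.printf("%-34s %+6.1f%%  overlap=%b%n",
                    names[i],
                    100 * relativeChange(r[0], r[2]),
                    intervalsOverlap(r[0], r[1], r[2], r[3]));
        }
    }
}
```

By this rough check, the networkThroughput (100,100ms) gain falls within the combined error bars, while globalWindow scores lower in the optimized run; other rows can be read the same way.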



--
This message was sent by Atlassian Jira
(v8.3.4#803005)