Posted to issues@flink.apache.org by "Haibo Sun (JIRA)" <ji...@apache.org> on 2019/07/29 03:37:00 UTC

[jira] [Commented] (FLINK-12818) Improve stability of twoInputMapSink benchmark

    [ https://issues.apache.org/jira/browse/FLINK-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16894906#comment-16894906 ] 

Haibo Sun commented on FLINK-12818:
-----------------------------------

Hi [~pnowojski],

The benchmark on `TwoInputSelectableStreamTask`/`StreamTwoInputSelectableProcessor` was also unstable, so the original expectation that it would be stable did not hold.


I made some other attempts, including upgrading `JDK 1.8` to the latest version "1.8.0_212", disabling CPU hyper-threading, and disabling checkpointing, but the benchmark is still unstable. Analysis with VTune showed that the extra time a slow JVM fork spends compared with a fast one is mainly in the `RecordWriter#emit()` method (the stack information is shown in the following figure). I suspect this is related to CPU cache misses.

With checkpointing disabled and the settings adjusted by the following code, the benchmark becomes stable, but it becomes unstable again once checkpointing is enabled.

 

*Code of class FlinkEnvironmentContext:*
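{code:java}
import java.io.IOException;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

// JMH state object passed into the benchmark methods; JMH requires @State on such classes.
@State(Scope.Thread)
public class FlinkEnvironmentContext {
    public StreamExecutionEnvironment env;

    private final int parallelism = 1;
    private final boolean objectReuse = true;

    @Setup
    public void setUp() throws IOException {
        Configuration configuration = new Configuration();
        // Shrink the network stack: 2 buffers per channel, no extra (floating)
        // buffers per gate, and 1mb of minimum network memory.
        configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, 2);
        configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 0);
        configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "1mb");

        env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, configuration);

        // set up the execution environment
        env.setParallelism(parallelism);
        env.getConfig().disableSysoutLogging();
        if (objectReuse) {
            env.getConfig().enableObjectReuse();
        }

        // Heap-based state backend; checkpointing is not enabled in this setup.
        env.setStateBackend(new MemoryStateBackend());
    }

    public void execute() throws Exception {
        env.execute();
    }
}
{code}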
 

*Call stack of RecordWriter#emit():*

!RecordWriter-emit.png!
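The slow/fast JVM fork distinction described above can be made measurable when repeating these experiments. The following is a minimal sketch, assuming the average score of each JMH fork (for example ops/ms per fork) has already been extracted from the benchmark output by hand. The class `ForkSpread`, its methods and the 0.1 threshold used below are hypothetical illustrations, not part of flink-benchmarks or JMH.

{code:java}
import java.util.Arrays;

/**
 * Hypothetical helper (not part of flink-benchmarks or JMH): given the
 * average score of each JMH fork, report how far apart the slowest and
 * fastest forks are relative to the mean.
 */
public class ForkSpread {

    /** Arithmetic mean of the per-fork scores. */
    public static double mean(double[] forkScores) {
        return Arrays.stream(forkScores).average().orElseThrow(IllegalArgumentException::new);
    }

    /**
     * Relative spread: (fastest - slowest) / mean. A value near 0 means all
     * forks agree; a large value means some forks ran much slower than others.
     */
    public static double relativeSpread(double[] forkScores) {
        if (forkScores.length == 0) {
            throw new IllegalArgumentException("need at least one fork");
        }
        double min = Arrays.stream(forkScores).min().getAsDouble();
        double max = Arrays.stream(forkScores).max().getAsDouble();
        return (max - min) / mean(forkScores);
    }

    /** Flags a run as unstable when the spread exceeds the (hypothetical) threshold. */
    public static boolean isUnstable(double[] forkScores, double threshold) {
        return relativeSpread(forkScores) > threshold;
    }

    public static void main(String[] args) {
        // Illustrative numbers only, not measured results.
        double[] agreeingForks = {100.0, 101.0, 99.0};
        double[] bimodalForks = {100.0, 100.0, 70.0, 70.0};
        System.out.printf("agreeing spread = %.3f%n", relativeSpread(agreeingForks));
        System.out.printf("bimodal spread = %.3f%n", relativeSpread(bimodalForks));
    }
}
{code}

Using the range over the mean, rather than a standard deviation, keeps a single slow fork clearly visible even when most forks are fast.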

 

> Improve stability of twoInputMapSink benchmark
> ----------------------------------------------
>
>                 Key: FLINK-12818
>                 URL: https://issues.apache.org/jira/browse/FLINK-12818
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Benchmarks
>            Reporter: Piotr Nowojski
>            Priority: Critical
>         Attachments: RecordWriter-emit.png
>
>
> The {{twoInputMapSink}} benchmark is very unstable over time:
> http://codespeed.dak8s.net:8000/timeline/#/?exe=1&ben=twoInputMapSink&env=2&revs=200&equid=off&quarts=on&extr=on
> It should be fixed; otherwise the benchmark cannot be used.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)