Posted to dev@flink.apache.org by "Brice Bingman (JIRA)" <ji...@apache.org> on 2017/06/22 20:54:00 UTC

[jira] [Created] (FLINK-6990) Poor performance with Sliding Time Windows

Brice Bingman created FLINK-6990:
------------------------------------

             Summary: Poor performance with Sliding Time Windows
                 Key: FLINK-6990
                 URL: https://issues.apache.org/jira/browse/FLINK-6990
             Project: Flink
          Issue Type: Improvement
    Affects Versions: 1.3.0
         Environment: OSX 10.11.4
2.8 GHz Intel Core i7
16 GB 1600 MHz DDR3
            Reporter: Brice Bingman


I'm experiencing poor performance when using sliding time windows.  Here is a simple example that performs poorly for me:

{code:java}
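import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class FlinkPerfTest {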

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        //Streaming 10,000 events per second
        see.addSource(new SourceFunction<TestObject>() {

            transient ScheduledExecutorService executor;

            @Override
            public synchronized void run(final SourceContext<TestObject> ctx) throws Exception {
                executor = Executors.newSingleThreadScheduledExecutor();
                executor.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        for (int k = 0; k < 10; k++) {
                            for (int i = 0; i < 1000; i++) {
                                TestObject obj = new TestObject();
                                obj.setKey(k);
                                ctx.collect(obj);
                            }
                        }
                    }
                }, 0, 1, TimeUnit.SECONDS);
                this.wait();
            }

            @Override
            public synchronized void cancel() {
                executor.shutdown();
                this.notify();
            }
        }).keyBy("key")
        .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.seconds(1))).apply(new WindowFunction<TestObject, String, Tuple, TimeWindow>() {

            @Override
            public void apply(Tuple key, TimeWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
                int count = 0;
                for (Object obj : input) {
                    count++;
                }
                out.collect(key.getField(0) + ": " + count);
            }
        })
        .print();
        see.execute();
    }

    public static class TestObject {
        private Integer key;

        public Integer getKey() {
            return key;
        }

        public void setKey(Integer key) {
            this.key = key;
        }

    }

}
{code}

When running this, Flink periodically pauses for long periods of time.  I would expect a steady stream of output at 1-second intervals.  For comparison, you can switch to a count window of similar size, which performs just fine:

{code:java}
   .countWindow(600000, 1000).apply(new WindowFunction<FlinkPerfTest.TestObject, String, Tuple, GlobalWindow>() {

                    @Override
                    public void apply(Tuple key, GlobalWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
                        int count = 0;
                        for (Object obj : input) {
                            count++;
                        }
                        out.collect(key.getField(0) + ": " + count);
                    }
                })
{code}

I would expect the sliding time window to perform similarly to a count window.  The sliding time window also uses significantly more CPU and memory than the count window; I would expect resource consumption to be similar as well.
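
For reference, the arithmetic behind "similar size" can be checked with a small standalone sketch. It takes the rates from the source function above (1000 events per key per second) and the window parameters from the two snippets. The class and method names are made up for illustration and are not part of Flink. The second figure assumes that a sliding window assigner places each element into every window that covers it, which may account for part of the difference in resource use.

```java
public class WindowSizing {

    // Elements a single key contributes to one full time window.
    static long elementsPerWindow(long eventsPerSecondPerKey, long windowSeconds) {
        return eventsPerSecondPerKey * windowSeconds;
    }

    // Number of sliding windows that cover any single point in time
    // (assumption: one window starts every slide interval).
    static long overlappingWindows(long sizeMillis, long slideMillis) {
        return sizeMillis / slideMillis;
    }

    public static void main(String[] args) {
        long perKeyRate = 1000;               // inner loop of the source: 1000 events per key per second
        long sizeMillis = 10 * 60 * 1000;     // Time.minutes(10)
        long slideMillis = 1000;              // Time.seconds(1)

        // 600000, which matches the size used in countWindow(600000, 1000)
        System.out.println("elements per key per window: "
                + elementsPerWindow(perKeyRate, sizeMillis / 1000));

        // 600 overlapping windows per element under the stated assumption
        System.out.println("windows covering each element: "
                + overlappingWindows(sizeMillis, slideMillis));
    }
}
```

So a count window of 600000 with a slide of 1000 holds the same number of elements per key and fires at the same rate as the 10-minute window sliding every second.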

A possible cause is that SystemProcessingTimeService.TriggerTask synchronizes on the checkpointLock, which effectively acts as a global lock.  There should be a lock per key or, preferably, a lock-free solution.
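
To illustrate what "a lock per key" could mean, here is a minimal standalone sketch. It is not Flink code and does not reflect how Flink's timer service is structured; `PerKeyLocks`, `lockFor`, and `fireTimer` are hypothetical names, and the sketch ignores whatever consistency guarantees the checkpointLock provides for checkpoints.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

public class PerKeyLocks {

    // One lock per key, created lazily on first use (hypothetical structure).
    private final ConcurrentHashMap<Integer, ReentrantLock> locks = new ConcurrentHashMap<>();

    ReentrantLock lockFor(int key) {
        return locks.computeIfAbsent(key, k -> new ReentrantLock());
    }

    // Runs a timer callback while holding only the lock of its own key,
    // so callbacks for different keys do not block each other.
    void fireTimer(int key, Runnable callback) {
        ReentrantLock lock = lockFor(key);
        lock.lock();
        try {
            callback.run();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        PerKeyLocks perKey = new PerKeyLocks();
        int[] fired = new int[10];

        // Fire one callback per key, matching the 10 keys in the example job.
        for (int k = 0; k < 10; k++) {
            final int key = k;
            perKey.fireTimer(key, () -> fired[key]++);
        }

        System.out.println("same key shares a lock: " + (perKey.lockFor(3) == perKey.lockFor(3)));
        System.out.println("different keys use different locks: " + (perKey.lockFor(3) != perKey.lockFor(4)));
    }
}
```

With a single global lock, every callback would instead contend for the same monitor regardless of key.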


