Posted to issues@flink.apache.org by "Robert Metzger (JIRA)" <ji...@apache.org> on 2017/06/23 12:12:00 UTC

[jira] [Updated] (FLINK-6990) Poor performance with Sliding Time Windows

     [ https://issues.apache.org/jira/browse/FLINK-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger updated FLINK-6990:
----------------------------------
    Component/s: Streaming
                 DataStream API

> Poor performance with Sliding Time Windows
> ------------------------------------------
>
>                 Key: FLINK-6990
>                 URL: https://issues.apache.org/jira/browse/FLINK-6990
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API, Streaming
>    Affects Versions: 1.3.0
>         Environment: OSX 10.11.4
> 2.8 GHz Intel Core i7
> 16 GB 1600 MHz DDR3
>            Reporter: Brice Bingman
>
> I'm experiencing poor performance when using sliding time windows.  Here is a simple example that performs poorly for me:
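> {code:java}
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.TimeUnit;
>
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> public class FlinkPerfTest {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
>         // Streaming 10,000 events per second: 10 keys x 1,000 events, emitted once per second
>         see.addSource(new SourceFunction<TestObject>() {
>             transient ScheduledExecutorService executor;
>             @Override
>             public synchronized void run(final SourceContext<TestObject> ctx) throws Exception {
>                 executor = Executors.newSingleThreadScheduledExecutor();
>                 executor.scheduleAtFixedRate(new Runnable() {
>                     @Override
>                     public void run() {
>                         // Elements are emitted from the executor thread, so hold the
>                         // source's checkpoint lock while collecting.
>                         synchronized (ctx.getCheckpointLock()) {
>                             for (int k = 0; k < 10; k++) {
>                                 for (int i = 0; i < 1000; i++) {
>                                     TestObject obj = new TestObject();
>                                     obj.setKey(k);
>                                     ctx.collect(obj);
>                                 }
>                             }
>                         }
>                     }
>                 }, 0, 1, TimeUnit.SECONDS);
>                 // Block the source thread until cancel() calls notify()
>                 this.wait();
>             }
>             @Override
>             public synchronized void cancel() {
>                 if (executor != null) {
>                     executor.shutdown();
>                 }
>                 this.notify();
>             }
>         }).keyBy("key")
>         // 10-minute windows that slide every second
>         .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.seconds(1))).apply(new WindowFunction<TestObject, String, Tuple, TimeWindow>() {
>             @Override
>             public void apply(Tuple key, TimeWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
>                 // Count the elements currently in this window
>                 int count = 0;
>                 for (Object obj : input) {
>                     count++;
>                 }
>                 out.collect(key.getField(0) + ": " + count);
>             }
>         })
>         .print();
>         see.execute();
>     }
>     // POJO (public, no-arg constructor, getter/setter) so keyBy("key") can resolve the field
>     public static class TestObject {
>         private Integer key;
>         public Integer getKey() {
>             return key;
>         }
>         public void setKey(Integer key) {
>             this.key = key;
>         }
>     }
> }
> {code}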
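> When running this, Flink periodically pauses for long periods of time. I would expect a steady stream of output at 1-second intervals. For comparison, switching to a count window of similar size performs just fine (each key receives 1,000 events per second, so 600,000 elements cover 10 minutes and a slide of 1,000 elements covers 1 second):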
> {code:java}
>         // Replaces the .window(...).apply(...) call above; also needs
>         // import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
>         .countWindow(600000, 1000).apply(new WindowFunction<TestObject, String, Tuple, GlobalWindow>() {
>             @Override
>             public void apply(Tuple key, GlobalWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
>                 int count = 0;
>                 for (Object obj : input) {
>                     count++;
>                 }
>                 out.collect(key.getField(0) + ": " + count);
>             }
>         })
> {code}
> I would expect the sliding time window to perform similarly to the count window. The sliding time window also uses significantly more CPU and memory than the count window, whereas I would expect resource consumption to be similar as well.
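A back-of-the-envelope check on the CPU and memory gap, offered as a sketch rather than a diagnosis confirmed in this issue: with a window size of 10 minutes and a slide of 1 second, every element belongs to size / slide = 600 overlapping windows, and the window operator adds the element to the state of each of them, whereas the count window keeps each element once in a single GlobalWindow. The standalone class below (hypothetical name, no Flink dependency) reproduces the window-start arithmetic as I understand Flink's sliding assigners to use it, with the offset assumed to be 0.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone sketch (no Flink dependency): counts how many sliding
// windows one element is assigned to. The start-time arithmetic is modeled on
// Flink's sliding window assigners, with a window offset of 0 assumed.
public class SlidingWindowFanOut {

    // Start of the most recent window that contains `timestamp`.
    public static long lastWindowStart(long timestamp, long slide) {
        return timestamp - (timestamp + slide) % slide;
    }

    // Start times of all windows [start, start + size) that contain `timestamp`, newest first.
    public static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        for (long start = lastWindowStart(timestamp, slide); start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 10 * 60 * 1000L;    // Time.minutes(10) in milliseconds
        long slide = 1000L;             // Time.seconds(1) in milliseconds
        long eventsPerSecond = 10_000L; // 10 keys x 1,000 events per second, as in the reproducer
        long now = 1_498_219_920_123L;  // arbitrary processing-time timestamp

        int windowsPerElement = windowStarts(now, size, slide).size();
        System.out.println("windows per element: " + windowsPerElement);
        System.out.println("element-window insertions per second: " + eventsPerSecond * windowsPerElement);
    }
}
{code}

Running main prints "windows per element: 600" and "element-window insertions per second: 6000000", which is the amount of per-element state work the count window avoids.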
> A possible cause could be that SystemProcessingTimeService.TriggerTask acquires the checkpointLock, which acts as a global lock. There should be a lock per key or, preferably, a lock-free solution.
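The "lock per key" suggestion can be illustrated with a generic Java pattern. This is only a sketch of the locking idea, not Flink code: KeyedLocks is a hypothetical helper that hands out one monitor per key, so work on different keys does not contend for a single lock. Note that in Flink the task-wide checkpoint lock also keeps record processing and timer callbacks from running concurrently with checkpoints, so any per-key scheme would still have to preserve that guarantee.

{code:java}
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical helper: one monitor object per key instead of a single task-wide lock.
public class KeyedLocks<K> {
    private final ConcurrentMap<K, Object> locks = new ConcurrentHashMap<>();

    // Runs `action` while holding only the lock for `key`;
    // actions on other keys can run concurrently.
    public <T> T withLock(K key, Supplier<T> action) {
        Object lock = locks.computeIfAbsent(key, k -> new Object());
        synchronized (lock) {
            return action.get();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        KeyedLocks<Integer> keyedLocks = new KeyedLocks<>();
        int[] counts = new int[10]; // one counter per key, guarded by that key's lock
        Thread[] workers = new Thread[4];
        for (int t = 0; t < workers.length; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < 10_000; i++) {
                    int key = i % 10;
                    keyedLocks.withLock(key, () -> ++counts[key]);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            worker.join(); // join() makes the workers' updates visible to this thread
        }
        System.out.println(Arrays.toString(counts));
    }
}
{code}

Each of the 4 threads increments every key 1,000 times, so main prints ten counters of 4000; threads only block each other when they touch the same key.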



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)