Posted to issues@flink.apache.org by "Robert Metzger (JIRA)" <ji...@apache.org> on 2017/06/23 12:12:00 UTC
[jira] [Updated] (FLINK-6990) Poor performance with Sliding Time Windows
[ https://issues.apache.org/jira/browse/FLINK-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Metzger updated FLINK-6990:
----------------------------------
Component/s: Streaming, DataStream API
> Poor performance with Sliding Time Windows
> ------------------------------------------
>
> Key: FLINK-6990
> URL: https://issues.apache.org/jira/browse/FLINK-6990
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API, Streaming
> Affects Versions: 1.3.0
> Environment: OSX 10.11.4
> 2.8 GHz Intel Core i7
> 16 GB 1600 MHz DDR3
> Reporter: Brice Bingman
>
> I'm experiencing poor performance when using sliding time windows. Here is a simple example that performs poorly for me:
> {code:java}
> public class FlinkPerfTest {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
>         // Streaming 10,000 events per second
>         see.addSource(new SourceFunction<TestObject>() {
>             transient ScheduledExecutorService executor;
>             @Override
>             public synchronized void run(final SourceContext<TestObject> ctx) throws Exception {
>                 executor = Executors.newSingleThreadScheduledExecutor();
>                 executor.scheduleAtFixedRate(new Runnable() {
>                     @Override
>                     public void run() {
>                         for (int k = 0; k < 10; k++) {
>                             for (int i = 0; i < 1000; i++) {
>                                 TestObject obj = new TestObject();
>                                 obj.setKey(k);
>                                 ctx.collect(obj);
>                             }
>                         }
>                     }
>                 }, 0, 1, TimeUnit.SECONDS);
>                 this.wait();
>             }
>             @Override
>             public synchronized void cancel() {
>                 executor.shutdown();
>                 this.notify();
>             }
>         }).keyBy("key")
>         .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.seconds(1))).apply(new WindowFunction<TestObject, String, Tuple, TimeWindow>() {
>             @Override
>             public void apply(Tuple key, TimeWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
>                 int count = 0;
>                 for (Object obj : input) {
>                     count++;
>                 }
>                 out.collect(key.getField(0) + ": " + count);
>             }
>         })
>         .print();
>         see.execute();
>     }
>     public static class TestObject {
>         private Integer key;
>         public Integer getKey() {
>             return key;
>         }
>         public void setKey(Integer key) {
>             this.key = key;
>         }
>     }
> }
> {code}
> When running this, Flink periodically pauses for long periods of time. I would expect a steady stream of output at 1-second intervals. For comparison, you can switch to a count window of similar size, which performs just fine:
> {code:java}
> .countWindow(600000, 1000).apply(new WindowFunction<FlinkPerfTest.TestObject, String, Tuple, GlobalWindow>() {
>     @Override
>     public void apply(Tuple key, GlobalWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
>         int count = 0;
>         for (Object obj : input) {
>             count++;
>         }
>         out.collect(key.getField(0) + ": " + count);
>     }
> })
> {code}
> I would expect the sliding time window to perform similarly to a count window. The sliding time window also uses significantly more CPU and memory than the count window; I would expect resource consumption to be similar as well.
> A possible cause could be that SystemProcessingTimeService.TriggerTask locks on the checkpointLock, which acts like a global lock. There should be a lock per key or, preferably, a lock-free solution.
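
One factor that may contribute to the resource difference described above, independent of the lock hypothesis, is how many windows each element lands in. The sketch below is plain Java with no Flink dependency. It is only modelled on how a sliding window assigner maps a timestamp to overlapping windows (assuming a zero offset and non-negative timestamps); the class and method names are hypothetical and nothing here is taken from the Flink code base. It shows that with the 10-minute size and 1-second slide from the report, every element belongs to 600 windows.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowFanOut {

    /**
     * Returns the start timestamps (ms) of every sliding window that contains
     * the given timestamp. Assumes offset 0 and a non-negative timestamp.
     */
    static List<Long> assignWindowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Start of the most recent window that begins at or before the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 10 * 60 * 1000L;   // 10 minutes, as in the report
        long slide = 1000L;            // 1 second, as in the report
        long eventsPerSecond = 10_000; // source rate from the report

        // Fixed example timestamp so the run is deterministic.
        int windowsPerElement = assignWindowStarts(1_498_219_920_123L, size, slide).size();

        System.out.println("Windows per element: " + windowsPerElement);
        System.out.println("Window assignments per second: " + windowsPerElement * eventsPerSecond);
    }
}
```

Under these assumptions the program reports 600 windows per element, so 10,000 events per second become 6,000,000 window assignments per second. Whether this fan-out, the checkpointLock contention, or both explain the observed pauses is not established by the report.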