Posted to dev@flink.apache.org by "Brice Bingman (JIRA)" <ji...@apache.org> on 2017/06/22 20:54:00 UTC
[jira] [Created] (FLINK-6990) Poor performance with Sliding Time Windows
Brice Bingman created FLINK-6990:
------------------------------------
Summary: Poor performance with Sliding Time Windows
Key: FLINK-6990
URL: https://issues.apache.org/jira/browse/FLINK-6990
Project: Flink
Issue Type: Improvement
Affects Versions: 1.3.0
Environment: OSX 10.11.4
2.8 GHz Intel Core i7
16 GB 1600 MHz DDR3
Reporter: Brice Bingman
I'm experiencing poor performance when using sliding time windows. Here is a simple example that performs poorly for me:
{code:java}
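import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class FlinkPerfTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        // Streaming 10,000 events per second: 1,000 per second for each of 10 keys
        see.addSource(new SourceFunction<TestObject>() {
            transient ScheduledExecutorService executor;
            private volatile boolean running = true;

            @Override
            public synchronized void run(final SourceContext<TestObject> ctx) throws Exception {
                executor = Executors.newSingleThreadScheduledExecutor();
                // Emit a burst of 10 x 1,000 objects once per second from a timer thread
                executor.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        for (int k = 0; k < 10; k++) {
                            for (int i = 0; i < 1000; i++) {
                                TestObject obj = new TestObject();
                                obj.setKey(k);
                                ctx.collect(obj);
                            }
                        }
                    }
                }, 0, 1, TimeUnit.SECONDS);
                // Block the source thread until cancel() is called (loop guards against spurious wakeups)
                while (running) {
                    this.wait();
                }
            }

            @Override
            public synchronized void cancel() {
                running = false;
                if (executor != null) {
                    executor.shutdown();
                }
                this.notify();
            }
        }).keyBy("key")
            // 10-minute windows that slide every second
            .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.seconds(1)))
            .apply(new WindowFunction<TestObject, String, Tuple, TimeWindow>() {
                @Override
                public void apply(Tuple key, TimeWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
                    // Count the elements in the window for this key
                    int count = 0;
                    for (TestObject obj : input) {
                        count++;
                    }
                    out.collect(key.getField(0) + ": " + count);
                }
            })
            .print();
        see.execute();
    }

    // POJO, so that keyBy("key") can resolve the field by name
    public static class TestObject {
        private Integer key;

        public Integer getKey() {
            return key;
        }

        public void setKey(Integer key) {
            this.key = key;
        }
    }
}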
{code}
When running this, Flink periodically pauses for long periods of time. I would expect a steady stream of output at 1-second intervals. For comparison, you can switch to a count window of similar size (600,000 elements per key, i.e. 10 minutes at 1,000 events per second per key, sliding every 1,000 elements), which performs just fine:
{code:java}
// Replaces the .window(...).apply(...) call in the example above
// (also needs: import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;)
.countWindow(600000, 1000)
.apply(new WindowFunction<TestObject, String, Tuple, GlobalWindow>() {
    @Override
    public void apply(Tuple key, GlobalWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
        int count = 0;
        for (TestObject obj : input) {
            count++;
        }
        out.collect(key.getField(0) + ": " + count);
    }
})
{code}
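For context on why the count window is cheap: as far as we know, `countWindow(size, slide)` on a keyed stream is built from `GlobalWindows` plus a `CountEvictor` and a `CountTrigger`, so each element is held once in a single per-key buffer. The plain-Java sketch below (no Flink dependency; `CountWindowSketch` is a hypothetical name) models that behavior with a small size and slide.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical per-key sliding count window: every element is buffered exactly
 * once, the window fires every `slide` elements, and only the most recent
 * `size` elements are kept when it fires.
 */
final class CountWindowSketch<T> {
    private final int size;
    private final int slide;
    private final Deque<T> buffer = new ArrayDeque<>();
    private int sinceLastFire = 0;

    CountWindowSketch(int size, int slide) {
        this.size = size;
        this.slide = slide;
    }

    /** Adds one element; returns the window's element count when it fires, else -1. */
    int add(T element) {
        buffer.addLast(element);           // a single copy of the element
        if (++sinceLastFire < slide) {
            return -1;                     // trigger has not fired yet
        }
        sinceLastFire = 0;
        while (buffer.size() > size) {     // evict the oldest elements beyond `size`
            buffer.removeFirst();
        }
        return buffer.size();              // what the window function would count
    }

    public static void main(String[] args) {
        CountWindowSketch<Integer> window = new CountWindowSketch<>(6, 2);
        List<Integer> fired = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            int count = window.add(i);
            if (count >= 0) {
                fired.add(count);
            }
        }
        System.out.println(fired); // [2, 4, 6, 6, 6]
    }
}
{code}

With the report's parameters (size 600,000, slide 1,000) the buffer holds at most about 601,000 entries per key, regardless of how often the window fires.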
I would expect the sliding time window to perform similarly to a count window. The sliding time window also uses significantly more CPU and memory than the count window; I would expect resource consumption to be similar as well.
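By contrast, a sliding time window assigns every element to size / slide overlapping windows, here 10 min / 1 s = 600. The sketch below reproduces that assignment arithmetic in plain Java; `SlidingAssignmentSketch` is hypothetical, and it assumes non-negative millisecond timestamps and a zero window offset. Assuming the window operator adds an element to the state of each window it is assigned to, every element is stored 600 times, which would plausibly account for much of the extra CPU and memory.

{code:java}
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of sliding-window assignment (assumes timestamps >= 0 and a zero offset). */
final class SlidingAssignmentSketch {

    /** Returns the [start, end) bounds of every sliding window that contains the timestamp. */
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window containing the timestamp
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 10 * 60 * 1000L; // Time.minutes(10) in milliseconds
        long slide = 1000L;          // Time.seconds(1) in milliseconds
        int windowsPerElement = assignWindows(1_498_164_840_123L, size, slide).size();
        System.out.println(windowsPerElement); // 600

        // Per key in steady state: 1,000 events/s over 600 s = 600,000 elements
        // in each window, with 600 windows open at once.
        long elementsPerWindow = 1000L * 600;
        System.out.println(elementsPerWindow * windowsPerElement); // 360000000
    }
}
{code}

So in steady state each key holds roughly 360 million window entries, versus about 600,000 for the count window.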
A possible cause is that SystemProcessingTimeService.TriggerTask acquires the checkpointLock, which acts as a global lock for the task. There should be a lock per key, or preferably a lock-free solution.
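To illustrate the per-key locking suggested above, here is a hypothetical plain-Java sketch. `PerKeyLocks` is not a Flink class, and nothing here claims it would fix this issue: updates for the same key are serialized, while threads working on different keys do not contend for one global monitor. A real implementation would also need to bound or clean up the lock map.

{code:java}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/** Hypothetical per-key locking: one monitor object per key instead of one global lock. */
final class PerKeyLocks<K> {
    private final ConcurrentMap<K, Object> locks = new ConcurrentHashMap<>();

    /** Runs the action while holding the monitor for this key only. */
    <R> R withLock(K key, Supplier<R> action) {
        Object lock = locks.computeIfAbsent(key, k -> new Object());
        synchronized (lock) {
            return action.get();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PerKeyLocks<Integer> locks = new PerKeyLocks<>();
        long[] counts = new long[10];        // one plain (unsynchronized) counter per key
        Thread[] workers = new Thread[4];
        for (int t = 0; t < workers.length; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < 100_000; i++) {
                    int key = i % 10;
                    // Only threads touching the same key block each other here
                    locks.withLock(key, () -> counts[key]++);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println(counts[0]); // 40000
    }
}
{code}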
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)