Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2016/07/13 08:56:20 UTC
[jira] [Created] (FLINK-4207) WindowOperator becomes very slow with allowed lateness
Aljoscha Krettek created FLINK-4207:
---------------------------------------
Summary: WindowOperator becomes very slow with allowed lateness
Key: FLINK-4207
URL: https://issues.apache.org/jira/browse/FLINK-4207
Project: Flink
Issue Type: Bug
Components: Streaming
Affects Versions: 1.1.0
Reporter: Aljoscha Krettek
In this simple example, the throughput (as measured by the count that the window emits) becomes very low when an allowed lateness is set:
{code}
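import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.setParallelism(1);

        env.addSource(new InfiniteTupleSource(100_000))
            .keyBy(0)
            .timeWindow(Time.seconds(3))
            // the slowdown shows up once an allowed lateness is set
            .allowedLateness(Time.seconds(1))
            // sum the per-key counts within each window
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                        Tuple2<String, Integer> value2) throws Exception {
                    return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                }
            })
            // print only one key so the output stays readable
            .filter(new FilterFunction<Tuple2<String, Integer>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public boolean filter(Tuple2<String, Integer> value) throws Exception {
                    return value.f0.startsWith("Tuple 0");
                }
            })
            .print();

        // execute program
        env.execute("WindowWordCount");
    }

    /** Emits ("Tuple <n>", 1) round-robin over numGroups keys until cancelled. */
    public static class InfiniteTupleSource implements ParallelSourceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        private final int numGroups;

        // set to false by cancel() so that run() can return
        private volatile boolean running = true;

        public InfiniteTupleSource(int numGroups) {
            this.numGroups = numGroups;
        }

        @Override
        public void run(SourceContext<Tuple2<String, Integer>> out) throws Exception {
            long index = 0;
            while (running) {
                Tuple2<String, Integer> tuple = new Tuple2<>("Tuple " + (index % numGroups), 1);
                out.collect(tuple);
                index++;
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}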
{code}
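For reading the printed counts: the filter in the example above only lets the key "Tuple 0" through, and the source cycles evenly over all keys, so one printed count per window can be scaled up to a rough overall throughput. The following is only a sketch of that arithmetic under the even-distribution assumption. The class and method names are hypothetical and not part of Flink, and the count used in main is a placeholder, not a measurement.

```java
public class ThroughputEstimate {

    /**
     * Rough records-per-second estimate from one printed window result.
     * Assumes records are spread evenly over all keys, as the round-robin
     * InfiniteTupleSource in the example does.
     */
    public static double recordsPerSecond(long countForOneKey, int numGroups, long windowSeconds) {
        return (double) countForOneKey * numGroups / windowSeconds;
    }

    public static void main(String[] args) {
        // Hypothetical printed count for key "Tuple 0"; not taken from a real run.
        long printedCount = 30;
        // 100_000 keys and 3-second windows, as configured in the example.
        System.out.println(recordsPerSecond(printedCount, 100_000, 3));
    }
}
```

Comparing this estimate for runs with and without the allowedLateness(...) call gives a quick way to quantify the slowdown.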
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)