Posted to user@flink.apache.org by Sam Huang <sa...@reflektion.com> on 2017/02/28 23:02:38 UTC
Event-time tumbling window doesn't fire - Flink 1.2.0, Kafka-0.8_2.10
Hi,

I'm using Flink 1.2.0 to read from Kafka-0.8.1.1_2.10. I have written a Flink
streaming job that creates an event-time based window and then computes some
stats. However, the window function is never called. I used the debug
watermark code and noticed that no watermark is generated. If I read from a
file instead, only one watermark is generated.

Here is my code (reading from Kafka):

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = FlinkUtil.createExecutionEnvironment(args);

        // Read from kafka and reading works as the following print statement works
        DataStream jsonEventStream = JsonEventStreamReader.readStream(env);
        // jsonEventStream.print();

        jsonEventStream
            .flatMap(new strToTupleFlatMapFunImpl())
            .assignTimestampsAndWatermarks(getRawJsonTimestampsAndWatermarksAssigner())
            .flatMap(new jsonToTupleListFlatMapFunImpl())
            // note: `tmp` is not defined in this excerpt
            .transform("WatermarkDebugger", tmp.getType(), new WatermarkDebugger<>())
            .keyBy(0, 1, 2)
            .timeWindow(Time.seconds(60))
            .allowedLateness(Time.seconds(10))
            .reduce(new ReduceFunImpl(), new WindowFunImpl()) // reduce fun is called but not window
            .addSink(new InfluxDBSink(INFLUXDB_DB));

        env.execute();
    }

    private static BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>
            getRawJsonTimestampsAndWatermarksAssigner() {
        return new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(
                Time.seconds(WINDOW_LATENESS)) {
            @Override
            public long extractTimestamp(Tuple2<String, Long> tuple) {
                return tuple.f1;
            }
        };
    }

    public static StreamExecutionEnvironment createExecutionEnvironment(String[] args)
            throws IOException {
        ParameterTool params = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        //env.getConfig().setAutoWatermarkInterval(1000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(params.getLong("--flink.checkpointing", 5000));
        return env;
    }
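
For reference, the firing condition the job above depends on can be sketched in plain Java, with no Flink dependency. This is only an illustration of the arithmetic, not a diagnosis: the class name WatermarkSketch, the sample timestamps, and the 10-second value standing in for WINDOW_LATENESS are all made up for the example. It assumes timestamps are in milliseconds, that a bounded-out-of-orderness watermark equals the highest timestamp seen minus the bound, and that an event-time window fires once the watermark reaches the window end minus one.

```java
// Plain-Java sketch (no Flink dependency) of bounded-out-of-orderness
// watermarks and the event-time firing condition for a tumbling window.
// All names and sample values here are hypothetical.
class WatermarkSketch {

    // Watermark of a bounded-out-of-orderness extractor: the highest
    // timestamp seen so far minus the allowed out-of-orderness.
    static long watermark(long maxTimestampSeenMs, long maxOutOfOrdernessMs) {
        return maxTimestampSeenMs - maxOutOfOrdernessMs;
    }

    // Start of the tumbling window (no offset) containing a non-negative timestamp.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // A window [start, start + size) fires once the watermark reaches
    // its last included timestamp, start + size - 1.
    static boolean fires(long windowStartMs, long windowSizeMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + windowSizeMs - 1;
    }

    public static void main(String[] args) {
        long size = 60_000L;     // 60 s window, as in timeWindow(Time.seconds(60))
        long bound = 10_000L;    // assumed stand-in for WINDOW_LATENESS, in ms
        long[] timestamps = {1_000L, 30_000L, 59_000L, 65_000L, 71_000L}; // made-up events

        long firstWindow = windowStart(timestamps[0], size);
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            max = Math.max(max, ts);
            long wm = watermark(max, bound);
            System.out.println("ts=" + ts + " watermark=" + wm
                + " firstWindowFires=" + fires(firstWindow, size, wm));
        }
    }
}
```

With these sample values the first window [0, 60000) fires only at the last event, once the watermark passes 59999. One consequence of the same arithmetic: if tuple.f1 held seconds rather than milliseconds (an assumption, the code above does not show its unit), the watermark would advance by only one unit per second of event time, and a 60-second window would take 60000 seconds of events to close.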
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Event-time-tumbling-window-doesn-t-fire-Flink-1-2-0-Kafka-0-8-2-10-tp11975.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.