You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Sam Huang <sa...@reflektion.com> on 2017/02/28 23:02:38 UTC

Event-time tumbling window doesn't fire- Flink 1.2.0, Kafka-0.8_2.10

HiI'm using Flink 1.2.0 to read from Kafka-0.8.1.1_2.10I have written a flink
streaming job that creates (event) time based window and then computes some
stats. However, the window function is never called. I used the debug
watermark code and noticed that no watermark is generated. If I read from
file, then only one watermark is generated. Here is my code (reading from
kafka)- public static void main(String[] args) throws Exception {       
StreamExecutionEnvironment env = FlinkUtil.createExecutionEnvironment(args);       
// Read from kafka and reading works as the following print statement works       
DataStream jsonEventStream = JsonEventStreamReader.readStream(env);       
// jsonEventStream.print();       jsonEventStream            .flatMap(new
strToTupleFlatMapFunImpl())           
.assignTimestampsAndWatermarks(getRawJsonTimestampsAndWatermarksAssigner())           
.flatMap(new jsonToTupleListFlatMapFunImpl())           
.transform("WatermarkDebugger", tmp.getType(), new WatermarkDebugger<>());           
.keyBy(0, 1, 2)            .timeWindow(Time.seconds(60))           
.allowedLateness(Time.seconds(10))            .reduce(new ReduceFunImpl(),
new WindowFunImpl())   // reduce fun is called but not window           
.addSink(new InfluxDBSink(INFLUXDB_DB));        env.execute();    }   
private static BoundedOutOfOrdernessTimestampExtractor<Tuple2&lt;String,
Long>> getRawJsonTimestampsAndWatermarksAssigner() {        return new
BoundedOutOfOrdernessTimestampExtractor<Tuple2&lt;String,
Long>>(Time.seconds(WINDOW_LATENESS)) {            @Override           
public long extractTimestamp(Tuple2<String, Long> tuple) {               
return tuple.f1;            }        };    }    public static
StreamExecutionEnvironment createExecutionEnvironment(String[] args) throws
IOException {        ParameterTool params = ParameterTool.fromArgs(args);       
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();       
env.getConfig().setGlobalJobParameters(params);       
//env.getConfig().setAutoWatermarkInterval(1000);       
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);       
env.enableCheckpointing(params.getLong("--flink.checkpointing", 5000));       
return env;    }



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Event-time-tumbling-window-doesn-t-fire-Flink-1-2-0-Kafka-0-8-2-10-tp11975.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.