You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Jayesh Patel <jp...@keywcorp.com> on 2017/05/04 14:00:24 UTC
assignTimestampsAndWatermarks not working as expected
Can anybody see what's wrong with the following code? I am using Flink 1.2
and have tried running it in Eclipse (local mode) as well as on a 3 node
cluster and it's not behaving as expected.
The idea is to have a custom source collect messages from a JMS topic (I
have a fake source for now that generates some out of order messages with
event time that is not delayed more than 5 seconds). The source doesn't
collectWithTimestamp() or emitWatermark().
The messages (events) include the event time. In order to allow for late or
out of order messages I use assignTimestampsAndWatermarks with
BoundedOutOfOrdernessTimestampExtractor and the extractTimestamp() method
retrieves the event time from the event.
When I run this job, I don't get the printout from the extractTimestamp()
method, nor do I get the logTuples.print() or stampedLogs.print() output.
When running on the local environment(Eclipse) I do see the printouts from
the fake source (MockSource - not shown here). But I don't even get those
when run from my 3 node cluster with parallelism of 3.
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(2000); // just for
debugging, didn't affect the behavior
DataStream<Message> logs = env.addSource(new MockSource());
DataStream<Tuple2<String, CEFEvent>> logTuples = logs.map(new
ParseEvent());
logTuples.print();
DataStream<Tuple2<String, CEFEvent>> stampedLogs =
logTuples.assignTimestampsAndWatermarks(
new
BoundedOutOfOrdernessTimestampExtractor<Tuple2<String,CEFEvent>>(Time.second
s(5)) {
private static final long serialVersionUID = 1L;
@Override
public long extractTimestamp(Tuple2<String,CEFEvent>
element) {
// This is how to extract timestamp from the
event
long eventTime =
element.f1.getEventStartTime().toInstant().toEpochMilli();
System.out.println("returning event time " +
eventTime);
return eventTime;
}});
stampedLogs.print();
env.execute("simulation");
}
Thank you,
Jayesh