Posted to user@flink.apache.org by Jayesh Patel <jp...@keywcorp.com> on 2017/05/04 14:00:24 UTC

assignTimestampsAndWatermarks not working as expected

Can anybody see what's wrong with the following code?  I am using Flink 1.2
and have tried running it in Eclipse (local mode) as well as on a 3-node
cluster, and it's not behaving as expected in either.

The idea is to have a custom source collect messages from a JMS topic (for
now I have a fake source that generates some out-of-order messages whose
event times are delayed by no more than 5 seconds).  The source doesn't call
collectWithTimestamp() or emitWatermark().

The messages (events) include the event time.  To allow for late or
out-of-order messages, I use assignTimestampsAndWatermarks with a
BoundedOutOfOrdernessTimestampExtractor whose extractTimestamp() method
retrieves the event time from the event.
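
To make the expectation concrete, here is a minimal plain-Java sketch (no
Flink dependency) of what I understand the extractor to do: track the
largest event time seen so far and report a watermark that trails it by the
configured bound.  The class and method names are made up for illustration
and are not Flink APIs; the sample event times are assumptions, and the
sketch ignores when the watermark is actually emitted.

```java
/**
 * Illustrative sketch only: mimics the bookkeeping of a bounded
 * out-of-orderness timestamp extractor without depending on Flink.
 * All names here are hypothetical, not part of the Flink API.
 */
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMillis;
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low enough that subtracting the bound cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Records an event time and returns it unchanged as the element's timestamp. */
    long observe(long eventTimeMillis) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimeMillis);
        return eventTimeMillis;
    }

    /** Watermark that a periodic tick would report: max timestamp minus the bound. */
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMillis;
    }

    public static void main(String[] args) {
        // 5-second bound, matching Time.seconds(5) in the job.
        BoundedOutOfOrdernessSketch sketch = new BoundedOutOfOrdernessSketch(5_000L);
        long[] eventTimes = {10_000L, 12_000L, 9_000L, 15_000L, 11_000L}; // assumed sample data
        for (long t : eventTimes) {
            sketch.observe(t);
            System.out.println("event time " + t + " -> watermark " + sketch.currentWatermark());
        }
    }
}
```

With a 5-second bound, an event stamped 9000 that arrives after one stamped
12000 is still ahead of the watermark (7000), so I would not expect it to be
treated as late.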

 

When I run this job, I don't get the printout from the extractTimestamp()
method, nor do I get the logTuples.print() or stampedLogs.print() output.
When running in the local environment (Eclipse), I do see the printouts from
the fake source (MockSource, not shown here), but I don't even get those
when running on my 3-node cluster with a parallelism of 3.

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // just for debugging, didn't affect the behavior
    env.getConfig().setAutoWatermarkInterval(2000);

    DataStream<Message> logs = env.addSource(new MockSource());
    DataStream<Tuple2<String, CEFEvent>> logTuples = logs.map(new ParseEvent());
    logTuples.print();

    DataStream<Tuple2<String, CEFEvent>> stampedLogs =
            logTuples.assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, CEFEvent>>(Time.seconds(5)) {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public long extractTimestamp(Tuple2<String, CEFEvent> element) {
                            // This is how to extract timestamp from the event
                            long eventTime =
                                    element.f1.getEventStartTime().toInstant().toEpochMilli();
                            System.out.println("returning event time " + eventTime);
                            return eventTime;
                        }
                    });
    stampedLogs.print();

    env.execute("simulation");
}

Thank you,

Jayesh