Posted to user@flink.apache.org by Nico <ni...@gmail.com> on 2017/01/02 14:04:42 UTC

Re: Events are assigned to wrong window

Hi Aljoscha,

thank you for having a look. Actually there is not too much code based on
timestamps:

stream
      .keyBy("id")
      .map(...)
      .filter(...)
      .map(...)
      .keyBy("areaID")
      .map(new KeyExtractor())
      .keyBy("f1.areaID","f0.sinterval")
      .window(TumblingEventTimeWindows.of(Time.seconds(20)))
      .apply(new TrafficInformation());

The map functions only enrich the data and don't change anything related to
the timestamp.

The apply function is:

@Override
public void apply(
        Tuple key,
        TimeWindow timeWindow,
        Iterable<Tuple2<DirectionInterval, Car>> cars,
        Collector<Tuple3<String, Double, Double>> out) throws Exception {

    System.out.println("Start: " + timeWindow.getStart());
    System.out.println("End: " + timeWindow.getEnd());

    for (Tuple2<DirectionInterval, Car> t : cars) {
        System.out.println(t.f1);
    }
}

System.out.println(t.f1) prints all information about a car, in which the
timestamp is embedded. The system gets the timestamp with the class:

public class TimestampGenerator extends
        BoundedOutOfOrdernessTimestampExtractor<Car> {

    public TimestampGenerator(Time maxOutOfOrderness) {
        super(maxOutOfOrderness);
    }

    @Override
    public long extractTimestamp(Car car) {
        return car.getTimestamp();
    }
}

Example output is presented in the previous post... it looks like the
timestamp is rounded... I am confused :-/

Best,
Nico

2016-12-23 19:41 GMT+01:00 Aljoscha Krettek <al...@apache.org>:

> Hi,
> could you please share code (and example data) for producing this output.
> I'd like to have a look.
>
> Cheers,
> Aljoscha
>
> On Wed, 21 Dec 2016 at 16:29 Nico <ni...@gmail.com> wrote:
>
>> Hi @all,
>>
>> I am using a TumblingEventTimeWindows.of(Time.seconds(20)) for testing.
>> During this I found a strange behavior (at least for me) in the assignment
>> of events.
>>
>> The first element of a new window is actually always part of the old
>> window. I thought the events were late, but then they would be dropped
>> instead of assigned to the new window. Even with an allowedLateness of 10s
>> the behavior remains the same.
>>
>> I used timeWindow.getStart() and getEnd() to get the boundaries
>> of the window.
>>
>> Can someone explain this?
>>
>> Best,
>> Nico
>>
>>
>> TimeWindows with Elements:
>>
>> Start: 1482332940000 - End: 1482332960000
>> timestamp=1482332952907
>>
>> Start: 1482332960000 - End: 1482332980000
>> timestamp=1482332958929
>> timestamp=1482332963995
>> timestamp=1482332969027
>> timestamp=1482332974039
>>
>> Start: 1482332980000 - End: 1482333000000
>> timestamp=1482332979059
>> timestamp=1482332984072
>> timestamp=1482332989081
>> timestamp=1482332994089
>>
>> Start: 1482333000000 - End: 1482333020000
>> timestamp=1482332999113
>> timestamp=1482333004123
>> timestamp=1482333009132
>> timestamp=1482333014144
>>
>
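
The window each event should fall into can be checked by hand from the listing above. The following is a minimal sketch, assuming that a tumbling event-time window without an offset starts at timestamp - (timestamp % size); the class and method names are hypothetical and not part of Flink.

```java
// Hypothetical helper, not part of Flink: recomputes which tumbling window
// an event-time timestamp falls into, assuming windows are aligned to the
// epoch with no offset (start = timestamp - timestamp % size).
class WindowBoundsCheck {

    // Start (inclusive) of the window containing the timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // End (exclusive) of the window containing the timestamp.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 20_000L; // corresponds to Time.seconds(20)

        // First element reported in each of the last three windows above.
        long[] reported = {1482332958929L, 1482332979059L, 1482332999113L};

        for (long ts : reported) {
            System.out.println(ts + " -> [" + windowStart(ts, size)
                    + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Under that assumption, 1482332958929 belongs to the window starting at 1482332940000, not to the window starting at 1482332960000 in which it was printed. The printed payload timestamp and the timestamp used for window assignment therefore cannot be the same value.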

Re: Events are assigned to wrong window

Posted by Aljoscha Krettek <al...@apache.org>.
Yes, that's true.

On Fri, 27 Jan 2017 at 13:16 Nico <ni...@gmail.com> wrote:

> Hi Aljoscha,
>
> got it!!! :) Thank you. So, in order to retain the "original" timestamps,
> it would be necessary to assign the timestamps after the MapFunction
> instead of at the Kafka source? At least, this solves the issue in the example.
>
> Best,
> Nico
>
> 2017-01-27 11:49 GMT+01:00 Aljoscha Krettek <al...@apache.org>:
>
> Now I see. What you're doing in this example is basically reassigning
> timestamps to other elements in your stateful MapFunction. Flink internally
> keeps track of the timestamp of an element. This can normally not be
> changed, except by using a TimestampAssigner, which you're doing. Now, the
> output from a MapFunction has the same timestamp as the input element. By
> keeping an element in state and emitting it when the next element arrives
> you emit it with the timestamp of that next element and that's the reason
> why they end up in the "wrong" windows.
>
> Does that help?
>
> -
> Aljoscha
>
> On Thu, 26 Jan 2017 at 19:17 Nico <ni...@gmail.com> wrote:
>
> Hi,
>
> can anyone help me with this problem? I don't get it. Forget the examples
> below, I've created a copy / paste example to reproduce the problem of
> incorrect results when using key-value state and the WindowOperator.
>
>
> public class StreamingJob {
>
>     public static void main(String[] args) throws Exception {
>         // set up the streaming execution environment
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         DataStream<Tuple2<String, Long>> stream = env.fromElements(
>                 new Tuple2<>("1", 1485446260994L),
>                 new Tuple2<>("1", 1485446266012L),
>                 new Tuple2<>("1", 1485446271031L),
>                 new Tuple2<>("1", 1485446276040L),
>                 new Tuple2<>("1", 1485446281045L),
>                 new Tuple2<>("1", 1485446286049L),
>                 new Tuple2<>("1", 1485446291062L),
>                 new Tuple2<>("1", 1485446296066L),
>                 new Tuple2<>("1", 1485446302019L)
>         );
>
>         stream
>                 .assignTimestampsAndWatermarks(
>                         new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(0)) {
>                             @Override
>                             public long extractTimestamp(Tuple2<String, Long> stringLongTuple2) {
>                                 return stringLongTuple2.f1;
>                             }
>                         })
>                 .keyBy("f0")
>                 .map(new MapTest())
>                 .keyBy("f0")
>                 .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>                 .apply(new WindowFunction<Tuple2<String, Long>, Object, Tuple, TimeWindow>() {
>                     @Override
>                     public void apply(Tuple tuple, TimeWindow timeWindow,
>                             Iterable<Tuple2<String, Long>> iterable,
>                             Collector<Object> collector) throws Exception {
>
>                         Set<Long> set = new HashSet<>();
>                         for (Tuple2<String, Long> t : iterable) {
>                             set.add(t.f1);
>                         }
>
>                         StringBuilder sb = new StringBuilder();
>                         sb.append("Window [" + timeWindow.getStart() + " " + timeWindow.getEnd() + "] ");
>                         sb.append("Set " + set.toString());
>                         System.out.println(sb.toString());
>                     }
>                 })
>                 .print();
>
>         // execute program
>         env.execute("Flink Streaming Java API Skeleton");
>     }
>
>     private static class MapTest extends
>             RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
>
>         private transient ValueState<Tuple2<String, Long>> state;
>
>         @Override
>         public Tuple2<String, Long> map(Tuple2<String, Long> stringLongTuple2)
>                 throws Exception {
>
>             Tuple2<String, Long> t = state.value();
>
>             state.update(stringLongTuple2);
>
>             if (t == null) return stringLongTuple2;
>
>             return t;
>         }
>
>         @Override
>         public void open(Configuration parameters) throws Exception {
>
>             ValueStateDescriptor<Tuple2<String, Long>> vsd = new ValueStateDescriptor<>(
>                     "lastEvent",
>                     TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}),
>                     null
>             );
>
>             state = getRuntimeContext().getState(vsd);
>         }
>     }
> }
>
>
> Output:
>
> Window [1485446260000 1485446280000] Set [1485446271031, 1485446260994,
> 1485446266012]
> Window [1485446280000 1485446300000] Set [1485446291062, 1485446281045,
> 1485446286049, 1485446276040]
> Window [1485446300000 1485446320000] Set [1485446296066]
>
> Best,
> Nico
>
> BTW ... I am using Flink 1.1.3.
>
>
> 2017-01-16 12:18 GMT+01:00 Nico <ni...@gmail.com>:
>
> Hi Aljoscha,
>
> I was able to identify the root cause of the problem. It is my first map
> function, which uses ValueState. But first,
> assignTimestampsAndWatermarks() is called right after the Kafka connector is
> created:
>
> FlinkKafkaConsumer09<Car> carFlinkKafkaConsumer09  =
>       new FlinkKafkaConsumer09<>("Traffic", new Car(), properties);
>
> // Extract the timestamps with a max. delay of 2s
> carFlinkKafkaConsumer09.assignTimestampsAndWatermarks(new TimestampGenerator(Time.seconds(0)));
>
> In the map function I try to calculate the direction between two GPS data points. For this, I store the last event in ValueState. The function looks like this:
>
> private static class BearingMap extends RichMapFunction<Car, Car> {
>
>    private transient ValueState<Car> state;
>    private final double maxdiff = 12; // in seconds
>
>    @Override
>    public Car map(Car destination) throws Exception {
>
>       Car origin = state.value();
>       double olat, olon, dlat, dlon;
>
>       /**
>        *  If the state is empty, compute no direction and only store the event in state
>        */
>       if (origin == null) {
>          state.update(destination);
>          // return the Car unchanged
>          return destination;
>       }
>
>       double diff = origin.getTimestamp() - destination.getTimestamp();
>
>       System.out.println("Differenz: " + diff);
>
>       if (Math.abs(diff) <= maxdiff * 1000) {
>
>          /*
>           * For late events that still fall within the allowed delay
>           */
>          if (diff > 0) {
>             Car tmp = destination;
>             destination = origin;
>             origin = tmp;
>          }
>
>          /*
>           * Car tmp is always the origin
>           */
>
>          double bearing = Helper.calculateBearing(
>                origin.getLat(), origin.getLon(), destination.getLat(), destination.getLon());
>
>          // update the state
>          state.update(destination);
>
>          origin.setDirection(bearing);
>          return origin;
>
>       }
>
>       // For events that are too late, keep the current state and return it without a direction
>       return origin;
>
>    }
>
>
>    @Override
>    public void open(Configuration parameters) throws Exception {
>
>       ValueStateDescriptor<Car> vsd = new ValueStateDescriptor<>(
>             "lastEvent",
>             Car.class,
>             null
>       );
>
>       state = getRuntimeContext().getState(vsd);
>    }
>
> }
>
> Together with the window function:
>
>
> private static class TimeWindowTest implements WindowFunction<Car, Tuple9<Double, Double, Double, Double, Double, Double, Double, Integer, List<String>>, Tuple, TimeWindow> {
>     @Override
>     public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Car> iterable, Collector<Tuple9<Double, Double, Double, Double, Double, Double, Double, Integer, List<String>>> collector) throws Exception {
>         String s = "Zeitfenster: " +timeWindow.getStart() +" - " + timeWindow.getEnd() +"\n";
>         Set<Long> timestamps = new HashSet<Long>();
>
>         for( Car c : iterable){
>             timestamps.add(c.getTimestamp());
>         }
>
>         System.out.println( s +timestamps +"\n\n");
>     }
> }
>
> For this pipeline:
>
> stream
>    .filter(new FilterFunction<Car>() {
>       @Override
>       public boolean filter(Car car) throws Exception {
>          return car.getId().equals("car.330");
>       }
>    })
>    .keyBy("id")
>    .map(new BearingMap())
>    .keyBy("id")
>    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
>    .apply(new TimeWindowTest());
>
> When an event e1 arrives at the map operator, it is stored in ValueState, and it is forwarded only after
> the next element e2 arrives, which is about 5 seconds later. This produces the following output: one element
> is always around 5 seconds before the start of the window.
>
> Differenz: -5013.0
> Differenz: -5014.0
> Zeitfenster: 1484564690000 - 1484564700000 (Window times start and end)
> [1484564686236, 1484564691260]
>
>
> Differenz: -5009.0
> Differenz: -5007.0
> Zeitfenster: 1484564700000 - 1484564710000
> [1484564696273, 1484564701287]
>
>
> Differenz: -5005.0
> Differenz: -5014.0
> Zeitfenster: 1484564710000 - 1484564720000
> [1484564706296, 1484564711303]
>
>
> Best,
>
> Nico
>
>
>
> 2017-01-09 16:10 GMT+01:00 Aljoscha Krettek <al...@apache.org>:
>
> Hi,
> I'm assuming you also have the call to assignTimestampsAndWatermarks()
> somewhere in there as well, as in:
>
> stream
>       // or somewhere else in the pipeline
>       .assignTimestampsAndWatermarks(new TimestampGenerator())
>       .keyBy("id")
>       .map(...)
>       .filter(...)
>       .map(...)
>       .keyBy("areaID")
>       .map(new KeyExtractor())
>       .keyBy("f1.areaID","f0.sinterval")
>       .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>       .apply(new TrafficInformation());
>
> Just checking, to make sure. If you have this, we might have to dig a
> little deeper. Could you also please try to produce the whole output of
> your apply() method in one go, i.e. collect all the output in a String and
> then make a single call to System.out.println()? It could be that the output
> in the terminal is not completely in order.
>
> Cheers,
> Aljoscha
>

Re: Events are assigned to wrong window

Posted by Nico <ni...@gmail.com>.
Hi Aljoscha,

got it!!! :) Thank you. So, in order to retain the "original" timestamps,
it would be necessary to assign the timestamps after the MapFunction
instead of at the Kafka source? At least, this solves the issue in the example.

Best,
Nico
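
The effect of moving the timestamp assignment behind the map can be sketched without Flink. The following plain-Java simulation is a sketch under stated assumptions: the class name is hypothetical, window starts are assumed to be timestamp - (timestamp % size), and re-assigning timestamps after the map is modeled by reading the timestamp from the emitted payload rather than from the incoming element. In a real job this would presumably correspond to calling assignTimestampsAndWatermarks() on the stream returned by map().

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java simulation, no Flink involved. Hypothetical class name.
// The map emits the previously stored element; the window is then chosen
// from the timestamp of the emitted payload (timestamps assigned after map).
class ReassignAfterMapSimulation {

    // Returns window start -> payload timestamps that land in that window.
    static Map<Long, TreeSet<Long>> simulate(long[] events, long windowSizeMillis) {
        Map<Long, TreeSet<Long>> windows = new TreeMap<>();
        Long state = null; // stands in for the "lastEvent" ValueState

        for (long incoming : events) {
            long payload = (state == null) ? incoming : state; // value returned by map()
            state = incoming;                                   // state.update(incoming)

            // Timestamp re-extracted from the emitted element itself.
            long recordTimestamp = payload;
            long start = recordTimestamp - (recordTimestamp % windowSizeMillis);
            windows.computeIfAbsent(start, k -> new TreeSet<>()).add(payload);
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] events = {
            1485446260994L, 1485446266012L, 1485446271031L,
            1485446276040L, 1485446281045L, 1485446286049L,
            1485446291062L, 1485446296066L, 1485446302019L
        };
        long size = 20_000L;
        simulate(events, size).forEach((start, payloads) ->
            System.out.println("Window [" + start + " " + (start + size) + "] Set " + payloads));
    }
}
```

In this simulation every payload lands in the window that matches its own timestamp. Two side effects of the buffering map remain visible: the first element is emitted twice (hidden here by the set), and the last element stays in state and is never emitted.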


Re: Events are assigned to wrong window

Posted by Aljoscha Krettek <al...@apache.org>.
Now I see. What you're doing in this example is basically reassigning
timestamps to other elements in your stateful MapFunction. Flink internally
keeps track of the timestamp of an element. This can normally not be
changed, except by using a TimestampAssigner, which you're doing. Now, the
output from a MapFunction has the same timestamp as the input element. By
keeping an element in state and emitting it when the next element arrives
you emit it with the timestamp of that next element and that's the reason
why they end up in the "wrong" windows.

Does that help?

-
Aljoscha
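
The mechanism described in this reply can be replayed without Flink. The following plain-Java simulation is a sketch, not Flink code: the class name is hypothetical, window starts are assumed to be timestamp - (timestamp % size), and the record timestamp is modeled as staying with the incoming element while the map returns the previously stored element as payload.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java simulation, no Flink involved. Hypothetical class name.
// The map returns the element kept in state, but the record keeps the
// timestamp of the element that triggered the call.
class DelayedEmitSimulation {

    // Returns window start -> payload timestamps that land in that window.
    static Map<Long, TreeSet<Long>> simulate(long[] events, long windowSizeMillis) {
        Map<Long, TreeSet<Long>> windows = new TreeMap<>();
        Long state = null; // stands in for the "lastEvent" ValueState

        for (long incoming : events) {
            long payload = (state == null) ? incoming : state; // value returned by map()
            state = incoming;                                   // state.update(incoming)

            // The internal timestamp is that of the input element, not of the payload.
            long recordTimestamp = incoming;
            long start = recordTimestamp - (recordTimestamp % windowSizeMillis);
            windows.computeIfAbsent(start, k -> new TreeSet<>()).add(payload);
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] events = {
            1485446260994L, 1485446266012L, 1485446271031L,
            1485446276040L, 1485446281045L, 1485446286049L,
            1485446291062L, 1485446296066L, 1485446302019L
        };
        long size = 20_000L;
        simulate(events, size).forEach((start, payloads) ->
            System.out.println("Window [" + start + " " + (start + size) + "] Set " + payloads));
    }
}
```

With the nine input elements of the reproduction example, the simulation yields the same three sets as the reported output (printed in sorted order here): 1485446276040 shows up in the window starting at 1485446280000, and 1485446296066 in the window starting at 1485446300000.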


Re: Events are assigned to wrong window

Posted by Nico <ni...@gmail.com>.
Hi,

can anyone help me with this problem? I don't get it. Forget the examples
below; I've created a copy/paste example that reproduces the problem of
incorrect results when using key-value state together with the WindowOperator.


public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Tuple2<String, Long>> stream = env.fromElements(
                new Tuple2<>("1", 1485446260994L),
                new Tuple2<>("1", 1485446266012L),
                new Tuple2<>("1", 1485446271031L),
                new Tuple2<>("1", 1485446276040L),
                new Tuple2<>("1", 1485446281045L),
                new Tuple2<>("1", 1485446286049L),
                new Tuple2<>("1", 1485446291062L),
                new Tuple2<>("1", 1485446296066L),
                new Tuple2<>("1", 1485446302019L)
        );

        stream
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> stringLongTuple2) {
                        return stringLongTuple2.f1;
                    }
                })
            .keyBy("f0")
            .map(new MapTest())
            .keyBy("f0")
            .window(TumblingEventTimeWindows.of(Time.seconds(20)))
            .apply(new WindowFunction<Tuple2<String, Long>, Object, Tuple, TimeWindow>() {
                @Override
                public void apply(Tuple tuple, TimeWindow timeWindow,
                        Iterable<Tuple2<String, Long>> iterable, Collector<Object> collector)
                        throws Exception {

                    Set<Long> set = new HashSet<>();
                    for (Tuple2<String, Long> t : iterable) {
                        set.add(t.f1);
                    }

                    StringBuilder sb = new StringBuilder();

                    sb.append("Window [" + timeWindow.getStart() + " " + timeWindow.getEnd() + "] ");
                    sb.append("Set " + set.toString());
                    System.out.println(sb.toString());
                }
            })
            .print();

        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }

    private static class MapTest extends
            RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        private transient ValueState<Tuple2<String, Long>> state;

        @Override
        public Tuple2<String, Long> map(Tuple2<String, Long> stringLongTuple2)
                throws Exception {

            Tuple2<String, Long> t = state.value();

            state.update(stringLongTuple2);

            if (t == null) return stringLongTuple2;

            return t;
        }

        @Override
        public void open(Configuration parameters) throws Exception {

            ValueStateDescriptor<Tuple2<String, Long>> vsd = new ValueStateDescriptor<>(
                    "lastEvent",
                    TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}),
                    null
            );

            state = getRuntimeContext().getState(vsd);
        }
    }
}


Output:

Window [1485446260000 1485446280000] Set [1485446271031, 1485446260994,
1485446266012]
Window [1485446280000 1485446300000] Set [1485446291062, 1485446281045,
1485446286049, 1485446276040]
Window [1485446300000 1485446320000] Set [1485446296066]
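
The grouping above can be reproduced without Flink. The following is only a sketch under one assumption: that the record leaving the map operator keeps the event-time timestamp of the element that triggered map(), even though MapTest returns the previously stored tuple. The class and method names (WindowShiftSketch, windowStart, simulate) are made up for illustration and are not Flink API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeSet;

// Plain-Java simulation, no Flink classes involved. All names are hypothetical.
final class WindowShiftSketch {

    // Start of the tumbling window containing the timestamp
    // (no window offset, non-negative timestamps).
    static long windowStart(long timestamp, long sizeMillis) {
        return timestamp - (timestamp % sizeMillis);
    }

    // Mimics MapTest followed by a tumbling window. The emitted payload is the
    // previously stored value, but the window is chosen from the timestamp of
    // the element currently being processed (the assumption stated above).
    static Map<Long, TreeSet<Long>> simulate(long[] events, long sizeMillis) {
        Map<Long, TreeSet<Long>> windows = new LinkedHashMap<>();
        Long previous = null; // stands in for the ValueState
        for (long current : events) {
            long payload = (previous == null) ? current : previous;
            previous = current;
            long start = windowStart(current, sizeMillis);
            windows.computeIfAbsent(start, k -> new TreeSet<>()).add(payload);
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] events = {
            1485446260994L, 1485446266012L, 1485446271031L,
            1485446276040L, 1485446281045L, 1485446286049L,
            1485446291062L, 1485446296066L, 1485446302019L
        };
        long size = 20_000L;
        simulate(events, size).forEach((start, payloads) ->
            System.out.println("Window [" + start + " " + (start + size) + "] Set " + payloads));
    }
}
```

Run on the nine input timestamps, this puts 1485446276040 into the window starting at 1485446280000 and 1485446296066 into the window starting at 1485446300000, which matches the three groups printed above (here in sorted order, since a TreeSet is used instead of a HashSet).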

Best,
Nico

BTW ... I am using Flink 1.1.3.



Re: Events are assigned to wrong window

Posted by Nico <ni...@gmail.com>.
Hi Aljoscha,

I was able to identify the root cause of the problem. It is my first map
function, which uses ValueState. But first: assignTimestampsAndWatermarks()
is called right after the Kafka connector is created:

FlinkKafkaConsumer09<Car> carFlinkKafkaConsumer09  =
      new FlinkKafkaConsumer09<>("Traffic", new Car(), properties);

// Extract the timestamps with a max. delay of 2s
carFlinkKafkaConsumer09.assignTimestampsAndWatermarks(new TimestampGenerator(Time.seconds(0)));

In the map function I try to calculate the direction between two GPS
data points. For this, I store the last event in ValueState. The
function looks like this:

private static class BearingMap extends RichMapFunction<Car, Car> {

   private transient ValueState<Car> state;
   private final double maxdiff = 12; // in seconds

   @Override
   public Car map(Car destination) throws Exception {

      Car origin = state.value();
      double olat, olon, dlat, dlon;

      /*
       * If the state is empty, do not compute a direction;
       * just store the event in state.
       */
      if (origin == null) {
         state.update(destination);
         // return the Car unchanged
         return destination;
      }

      double diff = origin.getTimestamp() - destination.getTimestamp();

      System.out.println("Differenz: " + diff);

      if (Math.abs(diff) <= maxdiff * 1000) {

         /*
          * For late events that still fall within the allowed delay
          */
         if (diff > 0) {
            Car tmp = destination;
            destination = origin;
            origin = tmp;
         }

         /*
          * Car tmp is always the origin
          */

         double bearing = Helper.calculateBearing(
               origin.getLat(), origin.getLon(), destination.getLat(), destination.getLon());

         // Update the state
         state.update(destination);

         origin.setDirection(bearing);
         return origin;

      }

      // For events that are too late, keep the current state and return it without a direction
      return origin;

   }


   @Override
   public void open(Configuration parameters) throws Exception {

      ValueStateDescriptor<Car> vsd = new ValueStateDescriptor<>(
            "lastEvent",
            Car.class,
            null
      );

      state = getRuntimeContext().getState(vsd);
   }

}

Together with the window function:


private static class TimeWindowTest implements WindowFunction<Car,
        Tuple9<Double, Double, Double, Double, Double, Double, Double, Integer, List<String>>,
        Tuple, TimeWindow> {
    @Override
    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Car> iterable,
            Collector<Tuple9<Double, Double, Double, Double, Double, Double, Double, Integer, List<String>>> collector)
            throws Exception {
        String s = "Zeitfenster: " + timeWindow.getStart() + " - " + timeWindow.getEnd() + "\n";
        Set<Long> timestamps = new HashSet<Long>();

        for (Car c : iterable) {
            timestamps.add(c.getTimestamp());
        }

        System.out.println(s + timestamps + "\n\n");
    }
}

For this pipeline:

stream
   .filter(new FilterFunction<Car>() {
      @Override
      public boolean filter(Car car) throws Exception {
         return car.getId().equals("car.330");
      }
   })
   .keyBy("id")
   .map(new BearingMap())
   .keyBy("id")
   .window(TumblingEventTimeWindows.of(Time.seconds(10)))
   .apply(new TimeWindowTest());

I get the output below. When an event e1 arrives at the map operator, it is
stored in ValueState and is only forwarded once the next element e2 arrives,
which is about 5 seconds later. As a result, one element in each window is
always around 5 seconds before the start of that window:

Differenz: -5013.0
Differenz: -5014.0
Zeitfenster: 1484564690000 - 1484564700000 (Window times start and end)
[1484564686236, 1484564691260]


Differenz: -5009.0
Differenz: -5007.0
Zeitfenster: 1484564700000 - 1484564710000
[1484564696273, 1484564701287]


Differenz: -5005.0
Differenz: -5014.0
Zeitfenster: 1484564710000 - 1484564720000
[1484564706296, 1484564711303]
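
One possible way to avoid the shift, sketched here in plain Java without Flink, is to always return the element that just arrived and use the stored element only as the origin for the bearing. Everything in this sketch is an assumption made for illustration: Point stands in for Car, the HashMap stands in for the keyed ValueState, and bearing() is a generic forward-azimuth formula, since the implementation of Helper.calculateBearing is not shown in this thread. The handling of out-of-order events from BearingMap is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch, no Flink classes involved. All names are hypothetical.
final class BearingSketch {

    // Minimal stand-in for the Car class.
    static final class Point {
        final String id;
        final long timestamp;
        final double lat;
        final double lon;
        double direction = Double.NaN; // unset until a previous point is known

        Point(String id, long timestamp, double lat, double lon) {
            this.id = id;
            this.timestamp = timestamp;
            this.lat = lat;
            this.lon = lon;
        }
    }

    // Initial bearing in degrees from point 1 to point 2 (0 = north, 90 = east).
    static double bearing(double lat1, double lon1, double lat2, double lon2) {
        double phi1 = Math.toRadians(lat1);
        double phi2 = Math.toRadians(lat2);
        double dLambda = Math.toRadians(lon2 - lon1);
        double y = Math.sin(dLambda) * Math.cos(phi2);
        double x = Math.cos(phi1) * Math.sin(phi2)
                 - Math.sin(phi1) * Math.cos(phi2) * Math.cos(dLambda);
        return (Math.toDegrees(Math.atan2(y, x)) + 360.0) % 360.0;
    }

    // Stands in for the per-key ValueState of the map function.
    private final Map<String, Point> last = new HashMap<>();

    // Returns the element that just arrived, so payload time and record time agree.
    Point map(Point current) {
        Point previous = last.put(current.id, current);
        if (previous != null) {
            current.direction =
                bearing(previous.lat, previous.lon, current.lat, current.lon);
        }
        return current;
    }
}
```

The trade-off is that the direction is attached to the later of the two points (the heading on arrival) rather than to the earlier one, which may or may not fit the use case.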


Best,

Nico




Re: Events are assigned to wrong window

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I'm assuming you also have the call to assignTimestampsAndWatermarks()
somewhere in there as well, as in:

stream
      .assignTimestampsAndWatermarks(new TimestampGenerator()) // or somewhere else in the pipeline
      .keyBy("id")
      .map(...)
      .filter(...)
      .map(...)
      .keyBy("areaID")
      .map(new KeyExtractor())
      .keyBy("f1.areaID","f0.sinterval")
      .window(TumblingEventTimeWindows.of(Time.seconds(20)))
      .apply(new TrafficInformation());

Just checking, to make sure. If you have this, we might have to dig a little
deeper. Could you also please try to print the whole output of your apply()
method in one go, i.e. collect all the output in a String and then make a
single call to System.out.println()? It could be that the output in the
terminal is not completely in order.
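
A minimal plain-Java sketch of that suggestion: build the whole report for one window in a StringBuilder and print it with a single call, so that lines from different windows cannot interleave. The class and method names (WindowReport, format) are hypothetical, and the line layout just follows the output posted earlier in this thread.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch, no Flink classes involved. All names are hypothetical.
final class WindowReport {

    // Collects the window bounds and all element timestamps into one String.
    static String format(long start, long end, List<Long> timestamps) {
        StringBuilder sb = new StringBuilder();
        sb.append("Start: ").append(start).append(" - End: ").append(end).append('\n');
        for (long ts : timestamps) {
            sb.append("timestamp=").append(ts).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // One print call per window instead of one call per line.
        System.out.print(format(1482332940000L, 1482332960000L,
                Arrays.asList(1482332952907L)));
    }
}
```

Inside apply(), the same idea would mean appending timeWindow.getStart(), timeWindow.getEnd() and each element to the builder and printing once at the end.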

Cheers,
Aljoscha

On Mon, 2 Jan 2017 at 15:04 Nico <ni...@gmail.com> wrote:

> Hi Aljoscha,
>
> thank you for having a look. Actually there is not too much code based on
> timestamps:
>
> stream
>       .keyBy("id")
>       .map(...)
>       .filter(...)
>       .map(...)
>       .keyBy("areaID")
>       .map(new KeyExtractor())
>       .keyBy("f1.areaID","f0.sinterval")
>       .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>       .apply(new TrafficInformation());
>
> The map functions only enrich the data and don't change anything related
> to the timestamp.
>
> the apply function is:
>
> @Override
> public void apply(
> Tuple key,
> TimeWindow timeWindow,
> Iterable<Tuple2<DirectionInterval, Car>> cars,
> Collector<Tuple3<String, Double, Double>> out) throws Exception {
>
> System.out.println("Start: " +timeWindow.getStart());
> System.out.println("End: " +timeWindow.getEnd());
>
> for(Tuple2<DirectionInterval, Car> t : cars){
> System.out.println(t.f1);
> }
>
> System.out.println(t.f1) prints all information about a car, in which the
> timestamp is embedded. The system gets the timestamp from the class:
>
> public class TimestampGenerator extends
> BoundedOutOfOrdernessTimestampExtractor <Car> {
>
>
>     public TimestampGenerator(Time maxOutOfOrderness){
>         super(maxOutOfOrderness);
>     }
>
>     @Override
>     public long extractTimestamp(Car car) {
>         return car.getTimestamp();
>     }
>
>
> Example output is presented in the previous post... it looks like the
> timestamp is rounded... I am confused :-/
>
> Best,
> Nico
>
> 2016-12-23 19:41 GMT+01:00 Aljoscha Krettek <al...@apache.org>:
>
> Hi,
> could you please share code (and example data) for producing this output.
> I'd like to have a look.
>
> Cheers,
> Aljoscha
>
> On Wed, 21 Dec 2016 at 16:29 Nico <ni...@gmail.com> wrote:
>
> Hi @all,
>
> I am using a TumblingEventTimeWindows.of(Time.seconds(20)) for testing.
> During this I found a strange behavior (at least for me) in the assignment
> of events.
>
> The first element of a new window actually always belongs to the old
> window. I thought the events were late, but then they would be dropped
> instead of being assigned to the new window. Even with an allowedLateness
> of 10s the behavior remains the same.
>
> I used timeWindow.getStart() and getEnd() to get the boundaries
> of the window.
>
> Can someone explain this?
>
> Best,
> Nico
>
>
> TimeWindows with Elements:
>
> Start: 1482332940000 - End: 1482332960000
> timestamp=1482332952907
>
> Start: 1482332960000 - End: 1482332980000
> timestamp=1482332958929
> timestamp=1482332963995
> timestamp=1482332969027
> timestamp=1482332974039
>
> Start: 1482332980000 - End: 1482333000000
> timestamp=1482332979059
> timestamp=1482332984072
> timestamp=1482332989081
> timestamp=1482332994089
>
> Start: 1482333000000 - End: 1482333020000
> timestamp=1482332999113
> timestamp=1482333004123
> timestamp=1482333009132
> timestamp=1482333014144
>
>
>