Posted to user@flink.apache.org by AndreaKinn <ki...@hotmail.it> on 2017/09/07 20:24:28 UTC

Handle event time

Hi,
I'm getting sensor data from a Kafka source and I absolutely need them to be
ordered by data generation time. I've implemented a custom deserializer and
employed an AscendingTimestampExtractor to handle event time.
Obviously I set EventTime as the stream time characteristic.
Unfortunately, when I print the stream I see that many records are
unordered. Am I doing something wrong?
I've attached proof of that:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(CHECKPOINT_TIME);
env.setParallelism(1);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", KAFKA_ADDRESS);
properties.setProperty("group.id", GROUP_ID);

DataStream<Tuple6<String, String, Date, String, String, Double>> stream = env
        .addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), properties))
        .assignTimestampsAndWatermarks(
                // AscendingTimestampExtractor expects monotonically ascending timestamps
                new AscendingTimestampExtractor<Tuple6<String, String, Date, String, String, Double>>() {
                    @Override
                    public long extractAscendingTimestamp(
                            Tuple6<String, String, Date, String, String, Double> element) {
                        return element.f2.getTime(); // event time from the Date field
                    }
                })
        .keyBy(0);

stream.print();

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t985/Screen_Shot_2017-09-07_at_21.png> 




Re: Handle event time

Posted by Xingcan Cui <xi...@gmail.com>.
Hi AndreaKinn,

Reordering in a streaming environment is quite costly. AFAIK, Flink doesn't
provide such a function internally.

The watermark is just one approach to dealing with the out-of-order problem.
IMO, it acts like a coarse-grained reordering: late records have to be
dropped *manually*. Maybe you can try changing your functions so that they
work on streams with such "coarse-grained" ordering. However, if a fully
ordered stream is necessary in your application, I'm afraid you must cache
and re-emit the records in a user-defined ProcessFunction.
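
Just to sketch the idea (untested, and assuming your
Tuple6<String, String, Date, String, String, Double> element type), such a
buffering function on a keyed stream could look like this:

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Buffers records per key until the watermark passes their timestamp and
// then re-emits them in timestamp order. Records arriving behind the
// watermark would still come out late; a real implementation must decide
// whether to drop them or divert them elsewhere.
public class SortingFunction extends ProcessFunction<
        Tuple6<String, String, Date, String, String, Double>,
        Tuple6<String, String, Date, String, String, Double>> {

    private transient MapState<Long, List<Tuple6<String, String, Date, String, String, Double>>> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "sort-buffer",
                TypeInformation.of(new TypeHint<Long>() {}),
                TypeInformation.of(
                        new TypeHint<List<Tuple6<String, String, Date, String, String, Double>>>() {})));
    }

    @Override
    public void processElement(
            Tuple6<String, String, Date, String, String, Double> value,
            Context ctx,
            Collector<Tuple6<String, String, Date, String, String, Double>> out) throws Exception {
        long ts = ctx.timestamp(); // assumes timestamps were assigned upstream
        List<Tuple6<String, String, Date, String, String, Double>> sameTs = buffer.get(ts);
        if (sameTs == null) {
            sameTs = new ArrayList<>();
        }
        sameTs.add(value);
        buffer.put(ts, sameTs);
        // event-time timers fire in timestamp order once the watermark passes ts
        ctx.timerService().registerEventTimeTimer(ts);
    }

    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Tuple6<String, String, Date, String, String, Double>> out) throws Exception {
        for (Tuple6<String, String, Date, String, String, Double> record : buffer.get(timestamp)) {
            out.collect(record);
        }
        buffer.remove(timestamp);
    }
}

You would apply it after the keyBy, e.g.
stream.keyBy(0).process(new SortingFunction()). Note that the output is only
ordered per key, and only as well as your watermarks track the actual disorder.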

Best,
Xingcan


On Tue, Sep 12, 2017 at 1:48 AM, Eron Wright <er...@gmail.com> wrote:

> [...]

Re: Handle event time

Posted by Eron Wright <er...@gmail.com>.
As mentioned earlier, the watermark is the basis for reasoning about the
overall progression of time.   Many operators use the watermark to
correctly organize records, e.g. into the correct time-based window.
Within that window the records may still be unordered.   That said, some
operators do take pains to reorder the records, notably the Flink CEP
operator to correctly detect temporal patterns.  Basically, the operator
buffers records until a watermark arrives; all buffered records older than
the watermark may then be sorted and processed.

It is tempting to write a standalone operator that simply reorders records
as described, but subsequent repartitioning to downstream operators would
reintroduce disorder.  Therefore one must ensure that subsequent processing
is done with a 'forward' partitioning strategy.
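
To make that concrete, a tiny sketch (assuming 'sorted' is the
DataStream<String> produced by such a reordering operator; forward
partitioning also requires the same parallelism downstream):

// Keep the hand-off one-to-one so the sorted order survives; a
// rebalance() or keyBy() here would interleave subtask outputs and
// reintroduce disorder.
sorted
        .forward()                 // forward partitioning, no redistribution
        .map(String::toUpperCase)  // hypothetical order-sensitive step
        .print();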

Hope this helps!
Eron

On Fri, Sep 8, 2017 at 3:50 AM, AndreaKinn <ki...@hotmail.it> wrote:

> [...]

Re: Handle event time

Posted by AndreaKinn <ki...@hotmail.it>.
Thank you. I actually also developed a simple custom watermark solution
based on the Flink docs, but I still see unordered printed streams.
I have a doubt about Flink's behaviour: if I understand correctly, Flink
doesn't automatically reorder the records in a stream, so what is Flink's
behaviour if, for instance, a record arrives late? The docs say that
elements arriving late are dropped (the allowed lateness default value is
0), but even with this watermark emitter:

import java.util.Date;

import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class CustomTimestampExtractor implements
        AssignerWithPeriodicWatermarks<Tuple6<String, String, Date, String, String, Double>> {

    private static final long serialVersionUID = 5448621759931440489L;
    private final long maxOutOfOrderness = 0; // tolerate no lateness at all
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(Tuple6<String, String, Date, String, String, Double> element,
            long previousElementTimestamp) {
        long timestamp = element.f2.getTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

With maxOutOfOrderness = 0 I still see unordered records in the stream.

What I want to obtain is a fully ordered stream; is there a way to
implement it?




Re: Handle event time

Posted by Xingcan Cui <xi...@gmail.com>.
Hi AndreaKinn,

The AscendingTimestampExtractor does not work the way you think. It is
meant for streams whose timestamps are naturally monotonically ascending.

Flink uses watermarks to deal with unordered data. When a watermark *t* is
received, it means there should be no more records whose timestamps are
less than or equal to *t*. However, you must implement your own watermark
generation policy. There are two basic watermark assigners:
AssignerWithPeriodicWatermarks, which generates watermarks periodically,
and AssignerWithPunctuatedWatermarks, which generates watermarks when
certain records are encountered.
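
For example, Flink ships a periodic assigner for streams with a bounded
delay. A minimal sketch with your Tuple6 element type (the 10-second bound
is only an assumption; tune it to your data):

import java.util.Date;

import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Watermarks trail the highest timestamp seen so far by 10 seconds, so
// records up to 10 seconds late are still treated as on time.
public class BoundedExtractor extends BoundedOutOfOrdernessTimestampExtractor<
        Tuple6<String, String, Date, String, String, Double>> {

    public BoundedExtractor() {
        super(Time.seconds(10)); // assumed upper bound on lateness
    }

    @Override
    public long extractTimestamp(Tuple6<String, String, Date, String, String, Double> element) {
        return element.f2.getTime();
    }
}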

For more information, please refer to [1] and [2].

Best,
Xingcan

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_time.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html

On Fri, Sep 8, 2017 at 4:24 AM, AndreaKinn <ki...@hotmail.it> wrote:

> [...]