Posted to user@flink.apache.org by Averell <lv...@gmail.com> on 2019/05/01 00:24:35 UTC

Re: Timestamp and key preservation over operators

Hi Fabian, Guowei

Thanks for the help. My flow is shown in the attached screenshot, where (1)
and (2) are the main data streams from file sources, while (3) and (4) are the
enrichment data, also from file sources.
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Screen_Shot_2019-05-01_at_08.png> 

(5) merges and parses (1) and (2). It consists of:
	A tumbling window function with an early trigger (based on the number of
records in the window: FIRE as soon as there is at least one msg from each of
streams 1 & 2, without waiting for the window end-time)
	A flat map function to parse the incoming msg
	A filter and a map
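
To make the early-trigger rule above concrete, here is a minimal sketch in plain Java (no Flink dependency). The class name, the Decision enum, and the integer source ids are hypothetical and only loosely model a custom Flink Trigger; in a real job, the "seen" set would presumably live in per-key, per-window trigger state rather than in a field.

```java
import java.util.HashSet;
import java.util.Set;

public class BothSourcesTriggerSketch {
    /** Hypothetical stand-in for a trigger decision (continue buffering or fire). */
    enum Decision { CONTINUE, FIRE }

    // Ids of the sources (1 or 2) that have contributed to the current window.
    private final Set<Integer> seenSources = new HashSet<>();

    /** Called once per element; fires as soon as both sources have been seen. */
    Decision onElement(int sourceId) {
        seenSources.add(sourceId);
        boolean bothSeen = seenSources.contains(1) && seenSources.contains(2);
        return bothSeen ? Decision.FIRE : Decision.CONTINUE;
    }

    /** Resets the bookkeeping, e.g. when the window is purged. */
    void clear() {
        seenSources.clear();
    }

    public static void main(String[] args) {
        BothSourcesTriggerSketch trigger = new BothSourcesTriggerSketch();
        System.out.println(trigger.onElement(1)); // CONTINUE
        System.out.println(trigger.onElement(1)); // CONTINUE
        System.out.println(trigger.onElement(2)); // FIRE
    }
}
```

Note that the decision depends only on which sources have been seen, not on event time, which is why the output can be emitted long before the window end-time.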

(6) works as a data enricher: it enriches the output of (5) with data from (3)
and (4). As (4) is broadcast, my implementation for (6) looks like:
	/stream5.union(stream3).keyBy(key2).connect(stream4).process(MyFunction6
extends KeyedBroadcastProcessFunction)/
In this KeyedBroadcastProcessFunction, one msg from (5) triggers one
output, while a msg from (3) or (4) does not emit any records and only updates
the state.

Regarding message types:
	Outputs of (1) and (2) are of the same type, EventType1.
	Output of (3) is of type EventType2_1 extends EventType2
	Output of (5) is of type EventType2_2 extends EventType2
	Input of (6) is of type EventType2 (from the unioned keyed stream) and of
type Type3 (from the broadcast stream)
	Output of (6) is of type EventType2_3, which is mapped from EventType2_1

As seen in my screenshot, only (5) showed a watermark; (6) and (7) did not. I
noticed the problem because my (7) didn't work as expected. When I put
an eventTimeExtractor between (6) and (7), (7) worked.

Having typed all of this, I think I now know where my issue comes from: I have
not assigned timestamps/watermarks for (3) and (4), because I thought that
they were just idle sources of enrichment data.

Because of this, I have another question:
I read the text regarding idling sources [1], but I am not sure how to
implement that for my file sources. Could you please recommend a solution or
good practice here?

I have one more question about the recommendation [2] to emit timestamps and
watermarks from within the source function. Is there any way to do that with
the file sources?

Thanks and best regards,
Averell

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#idling-sources
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_timestamps_watermarks.html#source-functions-with-timestamps-and-watermarks



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Timestamp and key preservation over operators

Posted by Averell <lv...@gmail.com>.
Thank you Fabian.




Re: Timestamp and key preservation over operators

Posted by Fabian Hueske <fh...@gmail.com>.
The window operator cannot be configured to use the max timestamp of the
events in the window as the timestamp of the output record.
The reason is that such a behavior can produce late records.

If you want to do that, you have to track the max timestamp and assign it
yourself with a timestamp assigner.
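
As an illustration of why a max-timestamp output can be late, here is a small plain-Java sketch (no Flink dependency). All names and the concrete times are hypothetical: it assumes a window [10:00, 10:15) holding elements at 10:03 and 10:07, and a downstream operator whose watermark has already reached 10:12 when the window result arrives. A record counts as late here when its timestamp is not greater than that watermark.

```java
public class MaxTimestampSketch {
    /** Milliseconds since midnight for a given hour and minute. */
    static long millisOfDay(int hour, int minute) {
        return (hour * 60L + minute) * 60_000L;
    }

    /** Largest event timestamp among the elements of a window. */
    static long maxTimestamp(long... elementTimestamps) {
        long max = Long.MIN_VALUE;
        for (long ts : elementTimestamps) {
            max = Math.max(max, ts);
        }
        return max;
    }

    /** Simplified lateness check: a watermark W promises no more records with timestamp <= W. */
    static boolean isLate(long recordTimestamp, long downstreamWatermark) {
        return recordTimestamp <= downstreamWatermark;
    }

    public static void main(String[] args) {
        long windowEnd = millisOfDay(10, 15);
        long maxElementTs = maxTimestamp(millisOfDay(10, 3), millisOfDay(10, 7));
        long downstreamWatermark = millisOfDay(10, 12); // assumed value

        // Default behaviour: the result carries windowEnd - 1 and stays ahead of the watermark.
        System.out.println(isLate(windowEnd - 1, downstreamWatermark)); // false
        // Re-stamped with the max element timestamp, the result falls behind the watermark.
        System.out.println(isLate(maxElementTs, downstreamWatermark));  // true
    }
}
```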

Best, Fabian

On Fri, 3 May 2019 at 09:54, Averell <lv...@gmail.com> wrote:

> Thank you Fabian.
>
> One more question from me on this topic: as I send out early messages in my
> window function, the timestamp assigned by the window function (the end-time
> of the window) is not what I expect. I want it to be the time of the
> (last) message that triggered the output.
>
> Is there any way to accomplish that?
> Currently, I have an assignTimestampsAndWatermarks after my window
> function, but, as you said, that goes against best practice.
>
> Thanks and regards,
> Averell

Re: Timestamp and key preservation over operators

Posted by Averell <lv...@gmail.com>.
Thank you Fabian.

One more question from me on this topic: as I send out early messages in my
window function, the timestamp assigned by the window function (the end-time
of the window) is not what I expect. I want it to be the time of the
(last) message that triggered the output.

Is there any way to accomplish that?
Currently, I have an assignTimestampsAndWatermarks after my window function,
but, as you said, that goes against best practice.

Thanks and regards,
Averell




Re: Timestamp and key preservation over operators

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Averell,

Yes, timestamps and watermarks do not (completely) move together.
The watermark should always be lower than the timestamps of the currently
processed records.
Otherwise, the records might be processed as late records (depending on the
logic).

The easiest way to check the timestamp of a message is using a
ProcessFunction.
The Context of the processElement() method has a timestamp() method that
returns the timestamp of the current record.

Best, Fabian

On Fri, 3 May 2019 at 06:08, Averell <lv...@gmail.com> wrote:

> Thank you Fabian.
>
> I have one more question about timestamps:
> In the previous email, you asked how I checked the timestamp. I don't
> have an answer: at the time I only checked the watermark, not the timestamp,
> under the (wrong) assumption that watermarks advance along with timestamps.
> Today I played with that early-trigger window, putting the output into a
> table, and found that the timestamp is set to the window's end-time, but
> the watermark apparently is not. (My window is [10:00-10:15), and my incoming
> msgs both have a timestamp of 10:00. They trigger one early output with
> timestamp 10:14:59.999, but the watermark stays at 10:00.)
>
> Thus, my question: what is the easiest way to check the timestamp of a
> message?
>
> Thanks and regards,
> Averell

Re: Timestamp and key preservation over operators

Posted by Averell <lv...@gmail.com>.
Thank you Fabian.

I have one more question about timestamps:
In the previous email, you asked how I checked the timestamp. I don't
have an answer: at the time I only checked the watermark, not the timestamp,
under the (wrong) assumption that watermarks advance along with timestamps.
Today I played with that early-trigger window, putting the output into a
table, and found that the timestamp is set to the window's end-time, but the
watermark apparently is not. (My window is [10:00-10:15), and my incoming msgs
both have a timestamp of 10:00. They trigger one early output with timestamp
10:14:59.999, but the watermark stays at 10:00.)
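
The 10:14:59.999 value follows from how a tumbling window's bounds are derived. The plain-Java sketch below (no Flink dependency) uses the start formula that, as far as I recall, Flink's TimeWindow applies, and takes the window's max timestamp as end minus one millisecond. The class and method names are hypothetical, and times are expressed as milliseconds since midnight for readability.

```java
public class TumblingWindowBounds {
    /** Start of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    /** Largest timestamp that still belongs to the window: end - 1 ms. */
    static long maxTimestamp(long windowStart, long windowSize) {
        return windowStart + windowSize - 1;
    }

    public static void main(String[] args) {
        long fifteenMinutes = 15 * 60_000L;
        long tenOClock = 10 * 60 * 60_000L; // 10:00 as millis since midnight

        long start = windowStart(tenOClock, 0L, fifteenMinutes);
        long max = maxTimestamp(start, fifteenMinutes);

        System.out.println(start); // 36000000, i.e. 10:00:00.000
        System.out.println(max);   // 36899999, i.e. 10:14:59.999
    }
}
```

The window result is stamped with this max timestamp no matter when the trigger fires, whereas the watermark only advances with the watermarks coming from the inputs.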

Thus, my question: what is the easiest way to check the timestamp of a
message?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Timestamp and key preservation over operators

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Averell,

The watermark of a stream is always the low watermark of all its input
streams. If one of the input streams does not have watermarks, Flink does
not compute a watermark for the merged stream.
If you do not need time-based operations on streams 3 and 4, setting the
watermark to MAX_WATERMARK should be a good solution.
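
The low-watermark rule can be modelled in a few lines of plain Java (no Flink dependency). This is a simplified sketch with hypothetical names: it assumes an input that has not emitted any watermark yet is tracked as Long.MIN_VALUE, and that MAX_WATERMARK corresponds to Long.MAX_VALUE.

```java
import java.util.Arrays;

public class CombinedWatermarkSketch {
    // Assumed sentinel for an input that has not produced a watermark yet.
    static final long NO_WATERMARK_YET = Long.MIN_VALUE;
    // Assumed numeric value behind MAX_WATERMARK.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    /** Watermark of a merged stream: the minimum over all of its inputs. */
    static long combine(long... inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(NO_WATERMARK_YET);
    }

    public static void main(String[] args) {
        long mainStreamWatermark = 1_000L;

        // An enrichment input without watermarks holds the merged stream back.
        System.out.println(combine(mainStreamWatermark, NO_WATERMARK_YET) == NO_WATERMARK_YET); // true

        // With MAX_WATERMARK on the enrichment input, the main stream alone decides.
        System.out.println(combine(mainStreamWatermark, MAX_WATERMARK) == mainStreamWatermark); // true
    }
}
```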

Best, Fabian

On Wed, 1 May 2019 at 08:50, Averell <lv...@gmail.com> wrote:

> Hi Fabian, Guowei,
>
> I have some updates:
> 1. I added a timestamp & watermark extractor on all of my remaining sources
> (3 & 4), and the watermark now propagates to my final operator.
> 2. As I could not find a way to mark my file sources as IDLE, I tried to
> tweak the class ContinuousFileReaderOperator to be always IDLE:
>         nextElement = format.nextRecord(nextElement);
>         if (nextElement != null) {
>                 readerContext.collect(nextElement);
>                 // tweak: mark the source idle right after a record from <myPath>
>                 if (this.format.getFilePaths()[0].getPath().contains("<myPath>"))
>                         readerContext.markAsTemporarilyIdle();
>         } else {
> The result was that there was no watermark at all for that stream, and the
> IDLE status did not seem to be taken into account (my CEP operator didn't
> generate any output). So I do not understand what the IDLE StreamStatus is
> for.
> My temporary solution, for now, is to use MAX_WATERMARK for those idle
> sources. Is doing that recommended?
>
> Thanks for your help.
> Regards,
> Averell

Re: Timestamp and key preservation over operators

Posted by Averell <lv...@gmail.com>.
Hi Fabian, Guowei,

I have some updates:
1. I added a timestamp & watermark extractor on all of my remaining sources
(3 & 4), and the watermark now propagates to my final operator.
2. As I could not find a way to mark my file sources as IDLE, I tried to
tweak the class ContinuousFileReaderOperator to be always IDLE:
	nextElement = format.nextRecord(nextElement);
	if (nextElement != null) {
		readerContext.collect(nextElement);
		// tweak: mark the source idle right after a record from <myPath>
		if (this.format.getFilePaths()[0].getPath().contains("<myPath>"))
			readerContext.markAsTemporarilyIdle();
	} else {
The result was that there was no watermark at all for that stream, and the
IDLE status did not seem to be taken into account (my CEP operator didn't
generate any output). So I do not understand what the IDLE StreamStatus is
for.
My temporary solution, for now, is to use MAX_WATERMARK for those idle
sources. Is doing that recommended?

Thanks for your help.
Regards,
Averell




