Posted to dev@flink.apache.org by Dominik Wosiński <wo...@gmail.com> on 2021/03/01 16:32:36 UTC

Watermarks when reading from file

Hey,
I have a question regarding a DataStream created from multiple files in S3.
I have several files in AWS S3, say the path is s3://files/, and then there
are several folders for different days, so in the end the full paths look
like: s3://files/day=1/file.parquet, s3://files/day=2/file.parquet. I
wanted to read all the files and sort the records by some specific value.

I thought that I could use the fact that a Long.MAX watermark is generated
once the input is exhausted, so I've decided to use an event-time window
larger than the time range of the data in the files.

So, I have something like:

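import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Read every Parquet file under the bucket, including the day=N subfolders.
val inputFormat = new ParquetAvroInputFormat[TestData](new Path("s3a://files/"))
inputFormat.setNestedFileEnumeration(true)
val ste = StreamExecutionEnvironment.createLocalEnvironment(1)
ste.createInput(inputFormat)
  .assignTimestampsAndWatermarks(
    new OutOfOrdernessWatermarkStrategy[TestData](3000, _.getTimestamp))
  .keyBy(_.getKey)
  // Window longer than the data range, expected to fire only on the final watermark.
  .timeWindow(Time.days(90))
  .sideOutputLateData(sideOutput)
  .process(new ProcessWindowFunction[TestData, TestData, String, TimeWindow] {
    override def process(key: String, context: Context,
                         elements: Iterable[TestData],
                         out: Collector[TestData]): Unit = {
      println("Processing: " + elements.size + " for key:" + key)
      // Emit the window contents ordered by event timestamp.
      elements.toSeq.sortBy(_.getTimestamp)
        .foreach(out.collect(_))
    }
  })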

*ParquetAvroInputFormat and OutOfOrdernessWatermarkStrategy are my classes
but those do not cause the issue described here.*
The data in the files spans 30 days, so there is no way the window will be
closed before all the files are read and the *Long.MAX* watermark is generated.

Now, the problem I am observing is that I would expect to see one message
printed per key, since the parallelism is one. But for some reason, for
some of the keys (most of them, really) two windows are created. I have
30 unique keys and each key contains around 1M records. The output I see
looks more or less like this:

1. Several messages about Switching to Random IO seek policy
2. Print for most of the keys present in the dataset (but the counts are
quite small, most of them around 100k, some as small as a few hundred)
3. More Switching to Random IO seek policy
4. Print again for some keys, but now the counts are much higher.

So, the total count of all processed values is correct. I am just
interested in why the window gets invoked twice.

Thanks in advance,
Best Regards,
Dom.

Re: Watermarks when reading from file

Posted by Dominik Wosiński <wo...@gmail.com>.
Hey Till,
You were obviously right, my bad here. My math was incorrect. The correct
reasoning is that the first 5 days of October are indeed added to window
number 1 and the rest of the days end up in the second window.
Solved!

Thanks a lot,
Best Regards,
Dom.

Re: Watermarks when reading from file

Posted by Dominik Wosiński <wo...@gmail.com>.
Hey,
Thanks for the answer. As I've mentioned in the email, the data range is
only 30 days. For the tests I've used the data from October, so I basically
have timestamps starting at midnight on 1 October 2020 and finishing at
23:59 on 30 October 2020. If I understand correctly, this shouldn't cause
the double windowing, but correct me if I am wrong here.

Best Regards,
Dom.

Re: Watermarks when reading from file

Posted by Till Rohrmann <tr...@apache.org>.
Hi Dominik,

I think the problem could be that TumblingTimeWindows don't start with the
timestamp of the first arriving event but start at a multiple of the window
length, counted from the epoch. So when defining a 90 day tumbling window
you define windows covering days 0 - 89, 90 - 179, .... If your data ranges
from day 79 - 109, then it would fall into two windows.
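
To make the alignment concrete, here is a minimal sketch in plain Scala (no
Flink dependency) of the arithmetic, as far as I understand what the tumbling
window assigner does. The names windowStart and utcMillis are made up for
illustration, and it assumes the timestamps are epoch milliseconds in UTC:

```scala
import java.time.{Instant, LocalDateTime, ZoneOffset}

object WindowAlignment {
  // 90-day tumbling window, in milliseconds.
  val WindowSizeMs: Long = 90L * 24 * 60 * 60 * 1000

  // Start of the tumbling window that contains `timestampMs`.
  // Windows are aligned to multiples of the window size counted from the
  // Unix epoch (shifted by `offsetMs`), not to the first event seen.
  def windowStart(timestampMs: Long,
                  offsetMs: Long = 0L,
                  sizeMs: Long = WindowSizeMs): Long =
    timestampMs - Math.floorMod(timestampMs - offsetMs, sizeMs)

  // Assumption: event timestamps are epoch milliseconds in UTC.
  def utcMillis(dateTime: String): Long =
    LocalDateTime.parse(dateTime).toInstant(ZoneOffset.UTC).toEpochMilli

  def main(args: Array[String]): Unit = {
    val first = utcMillis("2020-10-01T00:00:00")
    val last  = utcMillis("2020-10-30T23:59:00")
    println(Instant.ofEpochMilli(windowStart(first))) // 2020-07-07T00:00:00Z
    println(Instant.ofEpochMilli(windowStart(last)))  // 2020-10-05T00:00:00Z
  }
}
```

Under those assumptions a window boundary lands on 5 October 2020, so data
covering all of October is split across two 90 day windows. If the
timestamps are in another time zone, the boundary shifts by the zone offset.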

Cheers,
Till
