Posted to user@flink.apache.org by David Magalhães <sp...@gmail.com> on 2020/01/10 04:14:03 UTC

Custom File Sink using EventTime and defined custom file name for parquet file

Hi, I'm working with Flink for the first time and I'm trying to create a
solution that will store events from Kafka into Parquet files in S3. This
should also support re-injection of events from the Parquet files into a
Kafka topic.

Here <https://gist.github.com/speeddragon/18fbd570557da59d7f6a2c5822cc7ad4>
is the code with a simple usage of StreamingFileSink with a bulk encoder
that takes the events and stores them in Parquet files. The files are
partitioned by account_id and by year and month (yyyyMM). The issue with
this approach is that, when running a backfill from a certain point in
time, it is hard not to generate duplicated events, since we cannot
overwrite the same files: the filename is generated as
"*part-<subtask_id>-<sequential_number>*".
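
For reference, the sink setup is roughly the following (a simplified
sketch, not the exact gist code; the field names account_id and event_time,
the bucket path layout and the S3 path are assumptions):

import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneOffset}

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.fs.Path
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.{BucketAssigner, StreamingFileSink}

// Buckets each record into account_id=<id>/<yyyyMM>/ based on an epoch-millis event_time field.
class AccountMonthBucketAssigner extends BucketAssigner[GenericRecord, String] {
  // DateTimeFormatter is not serializable, so keep it transient and lazy.
  @transient private lazy val monthFormat = DateTimeFormatter.ofPattern("yyyyMM").withZone(ZoneOffset.UTC)

  override def getBucketId(record: GenericRecord, context: BucketAssigner.Context): String = {
    val accountId = record.get("account_id").toString
    val month = monthFormat.format(Instant.ofEpochMilli(record.get("event_time").asInstanceOf[Long]))
    s"account_id=$accountId/$month"
  }

  override def getSerializer(): SimpleVersionedSerializer[String] =
    SimpleVersionedStringSerializer.INSTANCE
}

def buildSink(schema: Schema): StreamingFileSink[GenericRecord] =
  StreamingFileSink
    .forBulkFormat(new Path("s3a://my-bucket/events"), ParquetAvroWriters.forGenericRecord(schema))
    .withBucketAssigner(new AccountMonthBucketAssigner)
    .build()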

To add predictability, I've used a tumbling window to aggregate multiple
GenericRecord instances, in order to write each Parquet file from a list of
them. For that I've created a custom file sink, but I'm not sure which
guarantees I lose compared to the StreamingFileSink. Here
<https://gist.github.com/speeddragon/6a98805d7f4aacff729f3d60b6a57ff8> is
the code. Still, this solution is missing a way to close a window after a
given timeout, so that the last events are written to the sink even if no
more events arrive.
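
The windowing part of that approach is roughly the following (again a
simplified sketch with assumed field names and window size, not the gist
code; the event-time characteristic and watermarks are assumed to be
configured elsewhere):

import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Collects all records of one account that fall into one event-time window into a
// single list, so the downstream sink writes them as one Parquet file.
def windowedRecords(events: DataStream[GenericRecord]): DataStream[List[GenericRecord]] =
  events
    .keyBy(record => record.get("account_id").toString)
    .window(TumblingEventTimeWindows.of(Time.minutes(15)))
    .process(new ProcessWindowFunction[GenericRecord, List[GenericRecord], String, TimeWindow] {
      override def process(key: String,
                           context: Context,
                           elements: Iterable[GenericRecord],
                           out: Collector[List[GenericRecord]]): Unit =
        out.collect(elements.toList)
    })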

Another workaround would be to create a StreamingFileSink with a row
encoder that receives a List of GenericRecord, and to write a custom
Encoder that uses *AvroParquetWriter* to write the file. This way I have
access to a custom rolling policy, but it looks truly inefficient. Here
<https://gist.github.com/speeddragon/ea19cb07569a52cd78fad8d4af8c9e68> is
the code.
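
The core of that workaround would look roughly like this (a sketch only;
the stream-backed OutputFile wrapper is hypothetical plumbing I would have
to write, and error handling is omitted):

import java.io.OutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.serialization.Encoder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.io.{OutputFile, PositionOutputStream}

// Adapts the part-file OutputStream so AvroParquetWriter can write into it.
class StreamOutputFile(stream: OutputStream) extends OutputFile {
  override def create(blockSizeHint: Long): PositionOutputStream = new PositionOutputStream {
    private var pos = 0L
    override def getPos(): Long = pos
    override def write(b: Int): Unit = { stream.write(b); pos += 1 }
    override def write(b: Array[Byte], off: Int, len: Int): Unit = { stream.write(b, off, len); pos += len }
  }
  override def createOrOverwrite(blockSizeHint: Long): PositionOutputStream = create(blockSizeHint)
  override def supportsBlockSize(): Boolean = false
  override def defaultBlockSize(): Long = 0L
}

// Writes one complete Parquet file per windowed List[GenericRecord].
class ParquetListEncoder(schemaString: String) extends Encoder[List[GenericRecord]] {
  // Parse the schema lazily so the encoder itself stays serializable.
  @transient private lazy val schema = new Schema.Parser().parse(schemaString)

  override def encode(records: List[GenericRecord], stream: OutputStream): Unit = {
    val writer = AvroParquetWriter
      .builder[GenericRecord](new StreamOutputFile(stream))
      .withSchema(schema)
      .build()
    records.foreach(writer.write)
    writer.close() // writes the Parquet footer; the Flink part-file stream itself stays open
  }
}

Opening and closing an AvroParquetWriter per encode() call is part of what
makes this feel inefficient.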

Am I overthinking this solution? I know there are some issues (recently
closed) about the StreamingFileSink supporting more custom rolling policies
with bulk encoders, like https://issues.apache.org/jira/browse/FLINK-13027,
but I only noticed that now.

Re: Custom File Sink using EventTime and defined custom file name for parquet file

Posted by Kostas Kloudas <kk...@apache.org>.
Oops, sorry for not sending the reply to everyone
and thanks David for reposting it here.
Great to hear that you solved your issue!

Kostas




Re: Custom File Sink using EventTime and defined custom file name for parquet file

Posted by David Magalhães <sp...@gmail.com>.
Sorry, I only saw the replies today.

Regarding my previous email,

> Still, this solution is missing a way to close a window after a given
> timeout, so that the last events are written to the sink even if no more
> events arrive.


I've fixed this using a custom trigger,

val flag = ctx.getPartitionedState(valueStateDescriptor).value()

// Flag only used to register one trigger per window. The flag is cleared
// when the FIRE action is executed.
if (!flag) {
  val delay = window.getEnd - window.getStart
  ctx.getPartitionedState(valueStateDescriptor).update(true)
  ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime + delay)
  ctx.registerEventTimeTimer(window.maxTimestamp())
}

TriggerResult.CONTINUE
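
Roughly, the whole trigger looks like this (a simplified sketch around the
snippet above; the class name, the TimeWindow type and the FIRE/cleanup
details are assumptions):

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Fires when the watermark passes the end of the window (event time) or after a
// processing-time timeout of one window length, whichever comes first.
class TimeoutTrigger[T] extends Trigger[T, TimeWindow] {

  private val valueStateDescriptor =
    new ValueStateDescriptor[java.lang.Boolean]("timersRegistered", classOf[java.lang.Boolean])

  override def onElement(element: T, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    val flag = ctx.getPartitionedState(valueStateDescriptor).value()
    // Flag only used to register one pair of timers per window.
    if (flag == null || !flag) {
      val delay = window.getEnd - window.getStart
      ctx.getPartitionedState(valueStateDescriptor).update(true)
      ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime + delay)
      ctx.registerEventTimeTimer(window.maxTimestamp())
    }
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult = {
    // Timeout path: flush the window even if no watermark has reached its end.
    ctx.getPartitionedState(valueStateDescriptor).clear()
    TriggerResult.FIRE
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult = {
    ctx.getPartitionedState(valueStateDescriptor).clear()
    TriggerResult.FIRE
  }

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    ctx.deleteEventTimeTimer(window.maxTimestamp())
    ctx.getPartitionedState(valueStateDescriptor).clear()
  }
}

It is attached to the windowed stream with .trigger(new TimeoutTrigger[GenericRecord]).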

Leonard, by "duplicated events" I mean store the same event on different
parquet files, since the file format was "part-X-Y". So, if I start to
process the same stream again (from a point in the past) I couldn't
overwrite the files with exactly the same name.

I think I've read a blog post about them (Pinterest); I will check the video.

Kostas replied only to me, so I'm adding his response here.

> Hi David,
> I skimmed through the solution with the window before the sink.
> If this solution fits your needs, I think you could:
> 1)  just specify a BucketAssigner instead of writing a custom sink,
> this will allow you to not lose any functionality from the
> StreamingFileSink
> 2)  for the timeout requirement, you could use a (keyed) process
> function with map state to hold your event-time windows. The key will
> be the window start (or interval) and you can register timers to fire
> at the end of the window or after a certain period of inactivity. I
> think that [1] can be a good starting point.
> I hope this helps,
> Kostas
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html


I think I can only define the partition name in the *BucketAssigner*,
because I don't want to have too many partitions (currently I have
*accountId* and *yyyyMM* (year and month)). I've checked that in Flink 1.10
[1] we will be able to configure a prefix and suffix for the filename, so I
could add the day and hour to the prefix. Then, when I need to store the
same events again, I could start from a specific time (probably matched
with a Kafka offset) and remove the files whose prefix date is newer than
that time.
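
Something like this (a sketch against the 1.10 API; the prefix value, the
bucket assigner and the schema are placeholders taken from the earlier
sketch in this thread):

import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.{OutputFileConfig, StreamingFileSink}

// Part files become "<prefix>-<subtask>-<counter><suffix>", e.g. "events-2020011512-0-3.parquet",
// so a later backfill can delete files whose prefix date is newer than the restart point.
val fileConfig = OutputFileConfig
  .builder()
  .withPartPrefix("events-2020011512") // day and hour of the events, assumed naming
  .withPartSuffix(".parquet")
  .build()

val sink = StreamingFileSink
  .forBulkFormat(new Path("s3a://my-bucket/events"), ParquetAvroWriters.forGenericRecord(schema))
  .withBucketAssigner(new AccountMonthBucketAssigner)
  .withOutputFileConfig(fileConfig)
  .build()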

The only scenario for this is when, for some reason, Flink is writing bad
files (events with wrong information) that need to be stored (processed)
again.

For 2), my implementation with the trigger solved this.

[1]
https://github.com/apache/flink/blob/master/docs/dev/connectors/streamfile_sink.md


Re: Custom File Sink using EventTime and defined custom file name for parquet file

Posted by Till Rohrmann <tr...@apache.org>.
Hi David,

I'm pulling in Kostas who worked on the StreamingFileSink and might be able
to answer some of your questions.

Cheers,
Till


Re: Custom File Sink using EventTime and defined custom file name for parquet file

Posted by Leonard Xu <xb...@gmail.com>.
Hi, David

Regarding your first description, I'm a little confused about the
duplicated records when backfilling; could you describe your usage
scenario/code in more detail?

I remember a backfill solution from Pinterest which is very similar to
yours and also uses Flink [1]; I hope it can help you.

Best,
Leonard

[1] https://www.youtube.com/watch?v=3-X6FJ5JS4E&list=PLDX4T_cnKjD207Aa8b5CsZjc7Z_KRezGz&index=64
