You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Marek Maj <ma...@gmail.com> on 2020/09/10 15:09:37 UTC

Streaming data to parquet

Hello Flink Community,

When designing our data pipelines, we very often encounter the requirement
to stream traffic (usually from kafka) to external distributed file system
(usually HDFS or S3). This data is typically meant to be queried from
hive/presto or similar tools. Preferably data sits in columnar format like
parquet.

Currently, using flink, it is possible to leverage StreamingFileSink to
achieve what we want to some extent. It satisfies our requirements to
partition data by event time, ensure exactly-once semantics and
fault-tolerance with checkpointing. Unfortunately, when using bulk writer
like PaquetWriter, that comes with a price of producing a big number of
files which degrades the performance of queries.

I believe that many companies struggle with similar use cases. I know that
some of them have already approached that problem. Solutions like Alibaba
Hologres or Netflix solution with Iceberg described during FF 2019 emerged.
Given that full transition to real-time data warehouse may take a
significant amount of time and effort, I would like to primarily focus on
solutions for tools like hive/presto backed up by a distributed file
system. Usually those are the systems that we are integrating with.

So what options do we have? Maybe I missed some existing open source tool?

Currently, I can come up with two approaches using flink exclusively:
1. Cache incoming traffic in flink state until trigger fires according to
rolling strategy, probably with some late events special strategy and then
output data with StreamingFileSink. Solution is not perfect as it may
introduce additional latency and queries will still be less performant
compared to fully compacted files (late events problem). And the biggest
issue I am afraid of is actually a performance drop while releasing data
from flink state and its peak character
2. Focus on implementing batch rewrite job that will compact data offline.
Source for the job could be both kafka or small files produced by another
job that uses plain StreamingFileSink. The drawback is that whole system
gets more complex, additional maintenance is needed and, maybe what is more
troubling, we enter to batch world again (how could we know that no more
late data will come and we can safely run the job)

I would really love to hear what are community thoughts on that.

Kind regards
Marek

Re: Streaming data to parquet

Posted by Senthil Kumar <se...@vmware.com>.
Arvid, Jan and Ayush,

Thanks for the ideas! -Kumar

From: Jan Lukavský <je...@seznam.cz>
Date: Monday, September 14, 2020 at 6:23 AM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Streaming data to parquet


Hi,

I'd like to mention another approach, which might not be as "flinkish", but removes the source of issues which arise when writing bulk files. The actual cause of issues here is that when creating bulk output, the most efficient option is to have _reversed flow of commit_. That is to say - on contrary of Flink's checkpoint barrier flowing from sources to sinks - the optimal performance in bulk case is to let the sink commit source once it finishes the bulk write (with whatever period). This is currently impossible to achieve with Flink, but what works for me the best is to use Flink sinks to write streaming commit log (e.g. Kafka) and then have independent processes (Kafka consumers or equivalent) to read output topics, pack them and push to bulk store, once the write is finished, the Kafka topic is committed. It requires deployment of additional application, but that is low overhead in deployments like k8s.

Moreover, this solves the dilemma between quick commits (for real-time data) and large files, because one can read data from both streaming (real real-time) source and do a union with batch data stored at bulk store. Both these techniques are implemented in [1] (disclaimer: I'm one of the core developers of that platform).

Jan

[1] https://github.com/O2-Czech-Republic/proxima-platform<https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2FO2-Czech-Republic%2Fproxima-platform&data=02%7C01%7Csenthilku%40vmware.com%7C63bdd93d8fb9422d323d08d858a90584%7Cb39138ca3cee4b4aa4d6cd83d9dd62f0%7C0%7C0%7C637356830252891966&sdata=8rveOaZCcqKeHHFqbaNkVhYooHQiY3oFdwnC1yGgca8%3D&reserved=0>
On 9/14/20 2:03 PM, Arvid Heise wrote:
Hi Kumar,

for late events, I have seen two approaches:

* Initial compaction every day, repeated compaction after two days, and after 1 week.
* Using something like delta lake [1], which is a set of specially structured parquet files. Usually you also compact them after some time (e.g. 1 week in your case), but you can query them efficiently in the meantime.

However, I'm not aware of some out-of-the-box delta lake solution for Flink. This might be something that we could put on the community agenda if there is a general interest.

[1] https://slacker.ro/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log/<https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fslacker.ro%2F2019%2F08%2F21%2Fdiving-into-delta-lake-unpacking-the-transaction-log%2F&data=02%7C01%7Csenthilku%40vmware.com%7C63bdd93d8fb9422d323d08d858a90584%7Cb39138ca3cee4b4aa4d6cd83d9dd62f0%7C0%7C0%7C637356830252901961&sdata=1aYcV%2F93blapUS2ml2iHcF%2F8XVxCnJLwuV0H6VKMaRI%3D&reserved=0>

On Fri, Sep 11, 2020 at 5:16 PM Senthil Kumar <se...@vmware.com>> wrote:
Hello Ayush,

I am interesting in knowing about your “really simple” implementation.

So assuming the streaming parquet output goes to S3 bucket: Initial (partitioned by event time)

Do you write another Flink batch application (step 2) which partitions the data from Initial in larger “event time” chunks
and writes it out to another S3 bucket?

In our case, we are getting straggling records with event times which might be up to 1 week old.
One approach is to simply write the batch job after 1 week, but then we lose the ability to query the recent data in an efficient fashion.

I would appreciate any ideas etc.

Cheers
Kumar

From: Ayush Verma <ay...@gmail.com>>
Date: Friday, September 11, 2020 at 8:14 AM
To: Robert Metzger <rm...@apache.org>>
Cc: Marek Maj <ma...@gmail.com>>, user <us...@flink.apache.org>>
Subject: Re: Streaming data to parquet

Hi,

Looking at the problem broadly, file size is directly tied up with how often you commit. No matter which system you use, this variable will always be there. If you commit frequently, you will be close to realtime, but you will have numerous small files. If you commit after long intervals, you will have larger files, but this is as good as a "batch world". We solved this problem at my company by having 2 systems. One to commit the files at small intervals, thus bringing data into durable storage reliably, and one to roll up these small files. It's actually really simple to implement this if you don't try to do it in a single job.

Best
Ayush

On Fri, Sep 11, 2020 at 2:22 PM Robert Metzger <rm...@apache.org>> wrote:
Hi Marek,

what you are describing is a known problem in Flink. There are some thoughts on how to address this in https://issues.apache.org/jira/browse/FLINK-11499<https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FFLINK-11499&data=02%7C01%7Csenthilku%40vmware.com%7C63bdd93d8fb9422d323d08d858a90584%7Cb39138ca3cee4b4aa4d6cd83d9dd62f0%7C0%7C0%7C637356830252901961&sdata=h5MMTvI5KMVFTCQdR1YMQjFgtA4Zv06k%2B24iyNsamEo%3D&reserved=0> and https://issues.apache.org/jira/browse/FLINK-17505<https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FFLINK-17505&data=02%7C01%7Csenthilku%40vmware.com%7C63bdd93d8fb9422d323d08d858a90584%7Cb39138ca3cee4b4aa4d6cd83d9dd62f0%7C0%7C0%7C637356830252911955&sdata=aRpstPxp7u3GSKAzzOpp3VWbcbTLt6lKqWAfoVBVq0c%3D&reserved=0>
Maybe some ideas there help you already for your current problem (use long checkpoint intervals).

A related idea to (2) is to write your data with the Avro format, and then regularly use a batch job to transform your data from Avro to Parquet.

I hope these are some helpful pointers. I don't have a good overview over other potential open source solutions.

Best,
Robert


On Thu, Sep 10, 2020 at 5:10 PM Marek Maj <ma...@gmail.com>> wrote:
Hello Flink Community,

When designing our data pipelines, we very often encounter the requirement to stream traffic (usually from kafka) to external distributed file system (usually HDFS or S3). This data is typically meant to be queried from hive/presto or similar tools. Preferably data sits in columnar format like parquet.

Currently, using flink, it is possible to leverage StreamingFileSink to achieve what we want to some extent. It satisfies our requirements to partition data by event time, ensure exactly-once semantics and fault-tolerance with checkpointing. Unfortunately, when using bulk writer like PaquetWriter, that comes with a price of producing a big number of files which degrades the performance of queries.

I believe that many companies struggle with similar use cases. I know that some of them have already approached that problem. Solutions like Alibaba Hologres or Netflix solution with Iceberg described during FF 2019 emerged. Given that full transition to real-time data warehouse may take a significant amount of time and effort, I would like to primarily focus on solutions for tools like hive/presto backed up by a distributed file system. Usually those are the systems that we are integrating with.

So what options do we have? Maybe I missed some existing open source tool?

Currently, I can come up with two approaches using flink exclusively:
1. Cache incoming traffic in flink state until trigger fires according to rolling strategy, probably with some late events special strategy and then output data with StreamingFileSink. Solution is not perfect as it may introduce additional latency and queries will still be less performant compared to fully compacted files (late events problem). And the biggest issue I am afraid of is actually a performance drop while releasing data from flink state and its peak character
2. Focus on implementing batch rewrite job that will compact data offline. Source for the job could be both kafka or small files produced by another job that uses plain StreamingFileSink. The drawback is that whole system gets more complex, additional maintenance is needed and, maybe what is more troubling, we enter to batch world again (how could we know that no more late data will come and we can safely run the job)

I would really love to hear what are community thoughts on that.

Kind regards
Marek


--

Arvid Heise | Senior Java Developer

[Image removed by sender.]<https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fwww.ververica.com%2F&data=02%7C01%7Csenthilku%40vmware.com%7C63bdd93d8fb9422d323d08d858a90584%7Cb39138ca3cee4b4aa4d6cd83d9dd62f0%7C0%7C0%7C637356830252911955&sdata=3xHA4vOY9n3fSCGvWUvndXHJ%2FsdfuJ12P0b3fb1JpE8%3D&reserved=0>


Follow us @VervericaData

--

Join Flink Forward<https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fflink-forward.org%2F&data=02%7C01%7Csenthilku%40vmware.com%7C63bdd93d8fb9422d323d08d858a90584%7Cb39138ca3cee4b4aa4d6cd83d9dd62f0%7C0%7C0%7C637356830252911955&sdata=8Y5BxUg%2B9GFyCznkdv3zGJuofHc7aTe98vMxL2zax0A%3D&reserved=0> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng

Re: Streaming data to parquet

Posted by Jan Lukavský <je...@seznam.cz>.
Hi,

I'd like to mention another approach, which might not be as "flinkish", 
but removes the source of issues which arise when writing bulk files. 
The actual cause of issues here is that when creating bulk output, the 
most efficient option is to have _reversed flow of commit_. That is to 
say - on contrary of Flink's checkpoint barrier flowing from sources to 
sinks - the optimal performance in bulk case is to let the sink commit 
source once it finishes the bulk write (with whatever period). This is 
currently impossible to achieve with Flink, but what works for me the 
best is to use Flink sinks to write streaming commit log (e.g. Kafka) 
and then have independent processes (Kafka consumers or equivalent) to 
read output topics, pack them and push to bulk store, once the write is 
finished, the Kafka topic is committed. It requires deployment of 
additional application, but that is low overhead in deployments like k8s.

Moreover, this solves the dilemma between quick commits (for real-time 
data) and large files, because one can read data from both streaming 
(real real-time) source and do a union with batch data stored at bulk 
store. Both these techniques are implemented in [1] (disclaimer: I'm one 
of the core developers of that platform).

Jan

[1] https://github.com/O2-Czech-Republic/proxima-platform

On 9/14/20 2:03 PM, Arvid Heise wrote:
> Hi Kumar,
>
> for late events, I have seen two approaches:
>
> * Initial compaction every day, repeated compaction after two days, 
> and after 1 week.
> * Using something like delta lake [1], which is a set of specially 
> structured parquet files. Usually you also compact them after some 
> time (e.g. 1 week in your case), but you can query them efficiently in 
> the meantime.
>
> However, I'm not aware of some out-of-the-box delta lake solution for 
> Flink. This might be something that we could put on the community 
> agenda if there is a general interest.
>
> [1] 
> https://slacker.ro/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log/
>
> On Fri, Sep 11, 2020 at 5:16 PM Senthil Kumar <senthilku@vmware.com 
> <ma...@vmware.com>> wrote:
>
>     Hello Ayush,
>
>     I am interesting in knowing about your “really simple” implementation.
>
>     So assuming the streaming parquet output goes to S3 bucket:
>     Initial (partitioned by event time)
>
>     Do you write another Flink batch application (step 2) which
>     partitions the data from Initial in larger “event time” chunks
>
>     and writes it out to another S3 bucket?
>
>     In our case, we are getting straggling records with event times
>     which might be up to 1 week old.
>
>     One approach is to simply write the batch job after 1 week, but
>     then we lose the ability to query the recent data in an efficient
>     fashion.
>
>     I would appreciate any ideas etc.
>
>     Cheers
>
>     Kumar
>
>     *From: *Ayush Verma <ayushverma5@gmail.com
>     <ma...@gmail.com>>
>     *Date: *Friday, September 11, 2020 at 8:14 AM
>     *To: *Robert Metzger <rmetzger@apache.org
>     <ma...@apache.org>>
>     *Cc: *Marek Maj <marekmaj2@gmail.com
>     <ma...@gmail.com>>, user <user@flink.apache.org
>     <ma...@flink.apache.org>>
>     *Subject: *Re: Streaming data to parquet
>
>     Hi,
>
>     Looking at the problem broadly, file size is directly tied up with
>     how often you commit. No matter which system you use, this
>     variable will always be there. If you commit frequently, you will
>     be close to realtime, but you will have numerous small files. If
>     you commit after long intervals, you will have larger files, but
>     this is as good as a "batch world". We solved this problem at my
>     company by having 2 systems. One to commit the files at small
>     intervals, thus bringing data into durable storage reliably, and
>     one to roll up these small files. It's actually really simple to
>     implement this if you don't try to do it in a single job.
>
>     Best
>
>     Ayush
>
>     On Fri, Sep 11, 2020 at 2:22 PM Robert Metzger
>     <rmetzger@apache.org <ma...@apache.org>> wrote:
>
>         Hi Marek,
>
>         what you are describing is a known problem in Flink. There are
>         some thoughts on how to address this in
>         https://issues.apache.org/jira/browse/FLINK-11499
>         <https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FFLINK-11499&data=02%7C01%7Csenthilku%40vmware.com%7C41d91d190b1b451e84b308d8565d0f21%7Cb39138ca3cee4b4aa4d6cd83d9dd62f0%7C0%7C1%7C637354304977869315&sdata=u8QY%2FedTNZcUH2%2BYDBAadHKEgN%2BpA2QBxKqywA7xbUA%3D&reserved=0>
>         and https://issues.apache.org/jira/browse/FLINK-17505
>         <https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FFLINK-17505&data=02%7C01%7Csenthilku%40vmware.com%7C41d91d190b1b451e84b308d8565d0f21%7Cb39138ca3cee4b4aa4d6cd83d9dd62f0%7C0%7C1%7C637354304977879306&sdata=Jy%2FR4bPXjYx1bM1XMg6QDKzu61vtn291b3MchT6O7N8%3D&reserved=0>
>
>         Maybe some ideas there help you already for your current
>         problem (use long checkpoint intervals).
>
>         A related idea to (2) is to write your data with the Avro
>         format, and then regularly use a batch job to transform your
>         data from Avro to Parquet.
>
>
>         I hope these are some helpful pointers. I don't have a good
>         overview over other potential open source solutions.
>
>         Best,
>
>         Robert
>
>         On Thu, Sep 10, 2020 at 5:10 PM Marek Maj <marekmaj2@gmail.com
>         <ma...@gmail.com>> wrote:
>
>             Hello Flink Community,
>
>             When designing our data pipelines, we very often encounter
>             the requirement to stream traffic (usually from kafka) to
>             external distributed file system (usually HDFS or S3).
>             This data is typically meant to be queried from
>             hive/presto or similar tools. Preferably data sits in
>             columnar format like parquet.
>
>             Currently, using flink, it is possible to leverage
>             StreamingFileSink to achieve what we want to some extent.
>             It satisfies our requirements to partition data by event
>             time, ensure exactly-once semantics and fault-tolerance
>             with checkpointing. Unfortunately, when using bulk writer
>             like PaquetWriter, that comes with a price of producing a
>             big number of files which degrades the performance of
>             queries.
>
>             I believe that many companies struggle with similar use
>             cases. I know that some of them have already approached
>             that problem. Solutions like Alibaba Hologres or Netflix
>             solution with Iceberg described during FF 2019 emerged.
>             Given that full transition to real-time data warehouse may
>             take a significant amount of time and effort, I would like
>             to primarily focus on solutions for tools like hive/presto
>             backed up by a distributed file system. Usually those are
>             the systems that we are integrating with.
>
>             So what options do we have? Maybe I missed some existing
>             open source tool?
>
>             Currently, I can come up with two approaches using flink
>             exclusively:
>
>             1. Cache incoming traffic in flink state until trigger
>             fires according to rolling strategy, probably with some
>             late events special strategy and then output data with
>             StreamingFileSink. Solution is not perfect as it may
>             introduce additional latency and queries will still be
>             less performant compared to fully compacted files (late
>             events problem). And the biggest issue I am afraid of is
>             actually a performance drop while releasing data from
>             flink state and its peak character
>
>             2. Focus on implementing batch rewrite job that will
>             compact data offline. Source for the job could be both
>             kafka or small files produced by another job that uses
>             plain StreamingFileSink. The drawback is that whole system
>             gets more complex, additional maintenance is needed and,
>             maybe what is more troubling, we enter to batch world
>             again (how could we know that no more late data will come
>             and we can safely run the job)
>
>
>             I would really love to hear what are community thoughts on
>             that.
>
>             Kind regards
>             Marek
>
>
>
> -- 
>
> Arvid Heise| Senior Java Developer
>
> <https://www.ververica.com/>
>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/>- The Apache 
> FlinkConference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbHRegistered at Amtsgericht Charlottenburg: HRB 158244 
> BManaging Directors: Timothy Alexander Steinert, Yip Park Tung Jason, 
> Ji (Toni) Cheng

Re: Streaming data to parquet

Posted by Arvid Heise <ar...@ververica.com>.
Hi Kumar,

for late events, I have seen two approaches:

* Initial compaction every day, repeated compaction after two days, and
after 1 week.
* Using something like delta lake [1], which is a set of specially
structured parquet files. Usually you also compact them after some time
(e.g. 1 week in your case), but you can query them efficiently in the
meantime.

However, I'm not aware of some out-of-the-box delta lake solution for
Flink. This might be something that we could put on the community agenda if
there is a general interest.

[1]
https://slacker.ro/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log/

On Fri, Sep 11, 2020 at 5:16 PM Senthil Kumar <se...@vmware.com> wrote:

> Hello Ayush,
>
>
>
> I am interesting in knowing about your “really simple” implementation.
>
>
>
> So assuming the streaming parquet output goes to S3 bucket: Initial
> (partitioned by event time)
>
>
>
> Do you write another Flink batch application (step 2) which partitions the
> data from Initial in larger “event time” chunks
>
> and writes it out to another S3 bucket?
>
>
>
> In our case, we are getting straggling records with event times which
> might be up to 1 week old.
>
> One approach is to simply write the batch job after 1 week, but then we
> lose the ability to query the recent data in an efficient fashion.
>
>
>
> I would appreciate any ideas etc.
>
>
>
> Cheers
>
> Kumar
>
>
>
> *From: *Ayush Verma <ay...@gmail.com>
> *Date: *Friday, September 11, 2020 at 8:14 AM
> *To: *Robert Metzger <rm...@apache.org>
> *Cc: *Marek Maj <ma...@gmail.com>, user <us...@flink.apache.org>
> *Subject: *Re: Streaming data to parquet
>
>
>
> Hi,
>
>
>
> Looking at the problem broadly, file size is directly tied up with how
> often you commit. No matter which system you use, this variable will always
> be there. If you commit frequently, you will be close to realtime, but you
> will have numerous small files. If you commit after long intervals, you
> will have larger files, but this is as good as a "batch world". We solved
> this problem at my company by having 2 systems. One to commit the files at
> small intervals, thus bringing data into durable storage reliably, and one
> to roll up these small files. It's actually really simple to implement this
> if you don't try to do it in a single job.
>
>
>
> Best
>
> Ayush
>
>
>
> On Fri, Sep 11, 2020 at 2:22 PM Robert Metzger <rm...@apache.org>
> wrote:
>
> Hi Marek,
>
>
>
> what you are describing is a known problem in Flink. There are some
> thoughts on how to address this in
> https://issues.apache.org/jira/browse/FLINK-11499
> <https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FFLINK-11499&data=02%7C01%7Csenthilku%40vmware.com%7C41d91d190b1b451e84b308d8565d0f21%7Cb39138ca3cee4b4aa4d6cd83d9dd62f0%7C0%7C1%7C637354304977869315&sdata=u8QY%2FedTNZcUH2%2BYDBAadHKEgN%2BpA2QBxKqywA7xbUA%3D&reserved=0>
> and https://issues.apache.org/jira/browse/FLINK-17505
> <https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FFLINK-17505&data=02%7C01%7Csenthilku%40vmware.com%7C41d91d190b1b451e84b308d8565d0f21%7Cb39138ca3cee4b4aa4d6cd83d9dd62f0%7C0%7C1%7C637354304977879306&sdata=Jy%2FR4bPXjYx1bM1XMg6QDKzu61vtn291b3MchT6O7N8%3D&reserved=0>
>
> Maybe some ideas there help you already for your current problem (use long
> checkpoint intervals).
>
>
>
> A related idea to (2) is to write your data with the Avro format, and then
> regularly use a batch job to transform your data from Avro to Parquet.
>
>
> I hope these are some helpful pointers. I don't have a good overview over
> other potential open source solutions.
>
>
>
> Best,
>
> Robert
>
>
>
>
>
> On Thu, Sep 10, 2020 at 5:10 PM Marek Maj <ma...@gmail.com> wrote:
>
> Hello Flink Community,
>
>
>
> When designing our data pipelines, we very often encounter the requirement
> to stream traffic (usually from kafka) to external distributed file system
> (usually HDFS or S3). This data is typically meant to be queried from
> hive/presto or similar tools. Preferably data sits in columnar format like
> parquet.
>
> Currently, using flink, it is possible to leverage StreamingFileSink to
> achieve what we want to some extent. It satisfies our requirements to
> partition data by event time, ensure exactly-once semantics and
> fault-tolerance with checkpointing. Unfortunately, when using bulk writer
> like PaquetWriter, that comes with a price of producing a big number of
> files which degrades the performance of queries.
>
> I believe that many companies struggle with similar use cases. I know that
> some of them have already approached that problem. Solutions like Alibaba
> Hologres or Netflix solution with Iceberg described during FF 2019 emerged.
> Given that full transition to real-time data warehouse may take a
> significant amount of time and effort, I would like to primarily focus on
> solutions for tools like hive/presto backed up by a distributed file
> system. Usually those are the systems that we are integrating with.
>
> So what options do we have? Maybe I missed some existing open source tool?
>
> Currently, I can come up with two approaches using flink exclusively:
>
> 1. Cache incoming traffic in flink state until trigger fires according to
> rolling strategy, probably with some late events special strategy and then
> output data with StreamingFileSink. Solution is not perfect as it may
> introduce additional latency and queries will still be less performant
> compared to fully compacted files (late events problem). And the biggest
> issue I am afraid of is actually a performance drop while releasing data
> from flink state and its peak character
>
> 2. Focus on implementing batch rewrite job that will compact data offline.
> Source for the job could be both kafka or small files produced by another
> job that uses plain StreamingFileSink. The drawback is that whole system
> gets more complex, additional maintenance is needed and, maybe what is more
> troubling, we enter to batch world again (how could we know that no more
> late data will come and we can safely run the job)
>
>
> I would really love to hear what are community thoughts on that.
>
> Kind regards
> Marek
>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: Streaming data to parquet

Posted by Senthil Kumar <se...@vmware.com>.
Hello Ayush,

I am interesting in knowing about your “really simple” implementation.

So assuming the streaming parquet output goes to S3 bucket: Initial (partitioned by event time)

Do you write another Flink batch application (step 2) which partitions the data from Initial in larger “event time” chunks
and writes it out to another S3 bucket?

In our case, we are getting straggling records with event times which might be up to 1 week old.
One approach is to simply write the batch job after 1 week, but then we lose the ability to query the recent data in an efficient fashion.

I would appreciate any ideas etc.

Cheers
Kumar

From: Ayush Verma <ay...@gmail.com>
Date: Friday, September 11, 2020 at 8:14 AM
To: Robert Metzger <rm...@apache.org>
Cc: Marek Maj <ma...@gmail.com>, user <us...@flink.apache.org>
Subject: Re: Streaming data to parquet

Hi,

Looking at the problem broadly, file size is directly tied up with how often you commit. No matter which system you use, this variable will always be there. If you commit frequently, you will be close to realtime, but you will have numerous small files. If you commit after long intervals, you will have larger files, but this is as good as a "batch world". We solved this problem at my company by having 2 systems. One to commit the files at small intervals, thus bringing data into durable storage reliably, and one to roll up these small files. It's actually really simple to implement this if you don't try to do it in a single job.

Best
Ayush

On Fri, Sep 11, 2020 at 2:22 PM Robert Metzger <rm...@apache.org>> wrote:
Hi Marek,

what you are describing is a known problem in Flink. There are some thoughts on how to address this in https://issues.apache.org/jira/browse/FLINK-11499<https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FFLINK-11499&data=02%7C01%7Csenthilku%40vmware.com%7C41d91d190b1b451e84b308d8565d0f21%7Cb39138ca3cee4b4aa4d6cd83d9dd62f0%7C0%7C1%7C637354304977869315&sdata=u8QY%2FedTNZcUH2%2BYDBAadHKEgN%2BpA2QBxKqywA7xbUA%3D&reserved=0> and https://issues.apache.org/jira/browse/FLINK-17505<https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FFLINK-17505&data=02%7C01%7Csenthilku%40vmware.com%7C41d91d190b1b451e84b308d8565d0f21%7Cb39138ca3cee4b4aa4d6cd83d9dd62f0%7C0%7C1%7C637354304977879306&sdata=Jy%2FR4bPXjYx1bM1XMg6QDKzu61vtn291b3MchT6O7N8%3D&reserved=0>
Maybe some ideas there help you already for your current problem (use long checkpoint intervals).

A related idea to (2) is to write your data with the Avro format, and then regularly use a batch job to transform your data from Avro to Parquet.

I hope these are some helpful pointers. I don't have a good overview over other potential open source solutions.

Best,
Robert


On Thu, Sep 10, 2020 at 5:10 PM Marek Maj <ma...@gmail.com>> wrote:
Hello Flink Community,

When designing our data pipelines, we very often encounter the requirement to stream traffic (usually from kafka) to external distributed file system (usually HDFS or S3). This data is typically meant to be queried from hive/presto or similar tools. Preferably data sits in columnar format like parquet.

Currently, using flink, it is possible to leverage StreamingFileSink to achieve what we want to some extent. It satisfies our requirements to partition data by event time, ensure exactly-once semantics and fault-tolerance with checkpointing. Unfortunately, when using bulk writer like PaquetWriter, that comes with a price of producing a big number of files which degrades the performance of queries.

I believe that many companies struggle with similar use cases. I know that some of them have already approached that problem. Solutions like Alibaba Hologres or Netflix solution with Iceberg described during FF 2019 emerged. Given that full transition to real-time data warehouse may take a significant amount of time and effort, I would like to primarily focus on solutions for tools like hive/presto backed up by a distributed file system. Usually those are the systems that we are integrating with.

So what options do we have? Maybe I missed some existing open source tool?

Currently, I can come up with two approaches using flink exclusively:
1. Cache incoming traffic in flink state until trigger fires according to rolling strategy, probably with some late events special strategy and then output data with StreamingFileSink. Solution is not perfect as it may introduce additional latency and queries will still be less performant compared to fully compacted files (late events problem). And the biggest issue I am afraid of is actually a performance drop while releasing data from flink state and its peak character
2. Focus on implementing batch rewrite job that will compact data offline. Source for the job could be both kafka or small files produced by another job that uses plain StreamingFileSink. The drawback is that whole system gets more complex, additional maintenance is needed and, maybe what is more troubling, we enter to batch world again (how could we know that no more late data will come and we can safely run the job)

I would really love to hear what are community thoughts on that.

Kind regards
Marek

Re: Streaming data to parquet

Posted by Ayush Verma <ay...@gmail.com>.
Hi,

Looking at the problem broadly, file size is directly tied up with how
often you commit. No matter which system you use, this variable will always
be there. If you commit frequently, you will be close to realtime, but you
will have numerous small files. If you commit after long intervals, you
will have larger files, but this is as good as a "batch world". We solved
this problem at my company by having 2 systems. One to commit the files at
small intervals, thus bringing data into durable storage reliably, and one
to roll up these small files. It's actually really simple to implement this
if you don't try to do it in a single job.

Best
Ayush

On Fri, Sep 11, 2020 at 2:22 PM Robert Metzger <rm...@apache.org> wrote:

> Hi Marek,
>
> what you are describing is a known problem in Flink. There are some
> thoughts on how to address this in
> https://issues.apache.org/jira/browse/FLINK-11499 and
> https://issues.apache.org/jira/browse/FLINK-17505
> Maybe some ideas there help you already for your current problem (use long
> checkpoint intervals).
>
> A related idea to (2) is to write your data with the Avro format, and then
> regularly use a batch job to transform your data from Avro to Parquet.
>
> I hope these are some helpful pointers. I don't have a good overview over
> other potential open source solutions.
>
> Best,
> Robert
>
>
> On Thu, Sep 10, 2020 at 5:10 PM Marek Maj <ma...@gmail.com> wrote:
>
>> Hello Flink Community,
>>
>> When designing our data pipelines, we very often encounter the
>> requirement to stream traffic (usually from kafka) to external distributed
>> file system (usually HDFS or S3). This data is typically meant to be
>> queried from hive/presto or similar tools. Preferably data sits in columnar
>> format like parquet.
>>
>> Currently, using flink, it is possible to leverage StreamingFileSink to
>> achieve what we want to some extent. It satisfies our requirements to
>> partition data by event time, ensure exactly-once semantics and
>> fault-tolerance with checkpointing. Unfortunately, when using bulk writer
>> like PaquetWriter, that comes with a price of producing a big number of
>> files which degrades the performance of queries.
>>
>> I believe that many companies struggle with similar use cases. I know
>> that some of them have already approached that problem. Solutions like
>> Alibaba Hologres or Netflix solution with Iceberg described during FF 2019
>> emerged. Given that full transition to real-time data warehouse may take a
>> significant amount of time and effort, I would like to primarily focus on
>> solutions for tools like hive/presto backed up by a distributed file
>> system. Usually those are the systems that we are integrating with.
>>
>> So what options do we have? Maybe I missed some existing open source
>> tool?
>>
>> Currently, I can come up with two approaches using flink exclusively:
>> 1. Cache incoming traffic in flink state until trigger fires according to
>> rolling strategy, probably with some late events special strategy and then
>> output data with StreamingFileSink. Solution is not perfect as it may
>> introduce additional latency and queries will still be less performant
>> compared to fully compacted files (late events problem). And the biggest
>> issue I am afraid of is actually a performance drop while releasing data
>> from flink state and its peak character
>> 2. Focus on implementing batch rewrite job that will compact data
>> offline. Source for the job could be both kafka or small files produced by
>> another job that uses plain StreamingFileSink. The drawback is that whole
>> system gets more complex, additional maintenance is needed and, maybe what
>> is more troubling, we enter to batch world again (how could we know that no
>> more late data will come and we can safely run the job)
>>
>> I would really love to hear what are community thoughts on that.
>>
>> Kind regards
>> Marek
>>
>

Re: Streaming data to parquet

Posted by Robert Metzger <rm...@apache.org>.
Hi Marek,

what you are describing is a known problem in Flink. There are some
thoughts on how to address this in
https://issues.apache.org/jira/browse/FLINK-11499 and
https://issues.apache.org/jira/browse/FLINK-17505
Maybe some ideas there help you already for your current problem (use long
checkpoint intervals).

A related idea to (2) is to write your data with the Avro format, and then
regularly use a batch job to transform your data from Avro to Parquet.

I hope these are some helpful pointers. I don't have a good overview over
other potential open source solutions.

Best,
Robert


On Thu, Sep 10, 2020 at 5:10 PM Marek Maj <ma...@gmail.com> wrote:

> Hello Flink Community,
>
> When designing our data pipelines, we very often encounter the requirement
> to stream traffic (usually from kafka) to external distributed file system
> (usually HDFS or S3). This data is typically meant to be queried from
> hive/presto or similar tools. Preferably data sits in columnar format like
> parquet.
>
> Currently, using flink, it is possible to leverage StreamingFileSink to
> achieve what we want to some extent. It satisfies our requirements to
> partition data by event time, ensure exactly-once semantics and
> fault-tolerance with checkpointing. Unfortunately, when using bulk writer
> like PaquetWriter, that comes with a price of producing a big number of
> files which degrades the performance of queries.
>
> I believe that many companies struggle with similar use cases. I know that
> some of them have already approached that problem. Solutions like Alibaba
> Hologres or Netflix solution with Iceberg described during FF 2019 emerged.
> Given that full transition to real-time data warehouse may take a
> significant amount of time and effort, I would like to primarily focus on
> solutions for tools like hive/presto backed up by a distributed file
> system. Usually those are the systems that we are integrating with.
>
> So what options do we have? Maybe I missed some existing open source tool?
>
> Currently, I can come up with two approaches using flink exclusively:
> 1. Cache incoming traffic in flink state until trigger fires according to
> rolling strategy, probably with some late events special strategy and then
> output data with StreamingFileSink. Solution is not perfect as it may
> introduce additional latency and queries will still be less performant
> compared to fully compacted files (late events problem). And the biggest
> issue I am afraid of is actually a performance drop while releasing data
> from flink state and its peak character
> 2. Focus on implementing batch rewrite job that will compact data offline.
> Source for the job could be both kafka or small files produced by another
> job that uses plain StreamingFileSink. The drawback is that whole system
> gets more complex, additional maintenance is needed and, maybe what is more
> troubling, we enter to batch world again (how could we know that no more
> late data will come and we can safely run the job)
>
> I would really love to hear what are community thoughts on that.
>
> Kind regards
> Marek
>