Posted to user@beam.apache.org by Jozef Vilcek <jo...@gmail.com> on 2018/07/19 09:42:58 UTC

Write bulks files from streaming app

Hey,

I am looking for advice.

I am doing stream processing with Beam on the Flink runner. I read data
from Kafka, do some processing on it (not important here), and at the same
time want to store the consumed data in HDFS as history storage for
archiving and reprocessing.

The part that writes batches to HDFS is giving me a hard time. Logically,
I want to do:

fileIO = FileIO.writeDynamic()
        .by(destinationFn)
        .via(AvroIO.sink(avroClass))
        .to(path)
        .withNaming(namingFn)
        .withTempDirectory(tmp)
        .withNumShards(shards)

data
   .withFixedWindow(1H, afterWatermarkTrigger, discardFiredPanes)
   .saveTo(fileIO)


This write generates 3 operators in the Flink execution graph, which I do
not fully understand yet.

Now, the problem is that I am not able to run this at scale.

If I want to write files big enough to avoid having lots of files on HDFS,
I keep running into OOM. With Flink, I use the RocksDB state backend, and I
was warned about this JIRA, which is probably related to my OOM:
https://issues.apache.org/jira/browse/FLINK-8297
Therefore, I need to trigger more often, with smaller batches, which leads
to too many files on HDFS.

The question is whether there is some path I do not see to make this work
(writing bulks of data of my choosing to HDFS without running into memory
trouble). Also, keeping the whole window's data in state until it is written
out to the filesystem involves more IO.

Thanks for any thoughts and guidelines,
Jozef

Re: Write bulks files from streaming app

Posted by Jozef Vilcek <jo...@gmail.com>.
Just to share my findings with others ...

I noticed that elements forwarded from
WriteFilesResult.getPerDestinationOutputFilenames() always have pane index 0
and the correct timing of the written window (such as EARLY, LATE).
I am not sure what is going on, but a subsequent Window.into() was not
working.

Instead, the solution for me is to use a custom function with state and
timers, similar to what GroupIntoBatches.ofSize() does.
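
A minimal sketch of that state-and-timers idea, written in plain Java rather
than against the Beam API so that it runs without dependencies. The names
`BatchBuffer`, `add`, `onTimer` and `demo` are hypothetical; in a real
stateful DoFn the buffer would presumably live in per-key state and
`onTimer` would be a timer callback. It illustrates the batching logic only
and is not the code used in the pipeline discussed here.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model of size-or-timer batching. All names are hypothetical;
 * in a Beam DoFn the buffer would be per-key state and onTimer() a timer callback.
 */
class BatchBuffer<T> {
    private final int maxSize;
    private final List<T> buffer = new ArrayList<>();        // stands in for bag state
    private final List<List<T>> emitted = new ArrayList<>(); // stands in for the output

    BatchBuffer(int maxSize) {
        if (maxSize < 1) {
            throw new IllegalArgumentException("maxSize must be >= 1");
        }
        this.maxSize = maxSize;
    }

    /** Buffers one element and emits a batch as soon as maxSize is reached. */
    void add(T element) {
        buffer.add(element);
        if (buffer.size() >= maxSize) {
            flush();
        }
    }

    /** Models the timer firing (for example at end of window): emit the remainder. */
    void onTimer() {
        flush();
    }

    private void flush() {
        if (!buffer.isEmpty()) {
            emitted.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    List<List<T>> emitted() {
        return emitted;
    }

    /** Small demo: seven elements with maxSize 3, then the timer fires. */
    static List<List<Integer>> demo() {
        BatchBuffer<Integer> batches = new BatchBuffer<>(3);
        for (int i = 1; i <= 7; i++) {
            batches.add(i);
        }
        batches.onTimer();
        return batches.emitted();
    }
}
```

With a batch size of 3 and seven inputs, two full batches are emitted on
arrival and the remaining element is emitted only when the timer fires, so
the number of outputs is bounded by size and by time rather than by one
output per input.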

>>> On Fri, Jul 20, 2018 at 9:06 PM Raghu Angadi <ra...@google.com> wrote:
>>>
>>>> On Fri, Jul 20, 2018 at 2:58 AM Jozef Vilcek <jo...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hm, that is an interesting idea, to make the write composite and
>>>>> merge files later. I do not know Beam well yet.
>>>>> I will look into it and learn about the Wait.on() transform (I wonder
>>>>> how it will work with late fires). Thanks!
>>>>>
>>>>> But it keeps me thinking...
>>>>> Does it make sense to have support for this in the SDK?
>>>>> Is my use case that uncommon? Not a fit for Beam? How do others out
>>>>> there do similar things?
>>>>>
>>>>
>>>> The SDK does allow it. It looks like you are running into scaling and
>>>> memory limits with the amount of state stored in large windows. This is
>>>> something that will improve. I am not familiar enough with the Flink
>>>> runner to comment on specifics. I was mainly thinking of a workaround.
>>>>
>>>> Raghu.
>>>>
>>>>
>>>>>
>>>>> On Thu, Jul 19, 2018 at 11:21 PM Raghu Angadi <ra...@google.com>
>>>>> wrote:
>>>>>
>>>>>> One option (but it requires more code): write to smaller files with
>>>>>> frequent triggers to directory_X and, once the window properly closes,
>>>>>> copy all the files to a single file in your own DoFn. This is certainly
>>>>>> more code on your part, but might be worth it. You can use the Wait.on()
>>>>>> transform to run your finalizer DoFn right after the window that writes
>>>>>> the smaller files closes.

Re: Write bulks files from streaming app

Posted by Jozef Vilcek <jo...@gmail.com>.
Here is pseudocode (sorry) of what I am doing right now:

PCollection<KV<String, String>> writtenFiles = dataStream
   .withFixedWindow(
         duration = 1H,
         trigger = AfterWatermark.pastEndOfWindow()
             .withLateFirings(AfterFirst.of(
                 AfterPane.elementCountAtLeast(lateCount),
                 AfterProcessingTime.pastFirstElementInPane().plusDelayOf(5 minutes)))
             .withEarlyFirings(AfterPane.elementCountAtLeast(maxBulkSize)),
         discardFiredPanes,
         lateness = 1 day)
   .saveWithDynamicDestinationsTo(fileIO)
   .getPerDestinationOutputFilenames()

writtenFiles
    .groupBy(x -> x.getKey())
    .withFixedWindow(
         duration = 1H,
         trigger = AfterWatermark.pastEndOfWindow()
             .withLateFirings(AfterPane.elementCountAtLeast(1)),
         discardFiredPanes,
         lateness = 1 day)
    .map(x -> println(x); x)

But in the second window, over the written files, I observe EARLY fires
where the group-by value iterator always holds exactly one file (propagated
from the write-files result). ON_TIME fires are always empty.

What am I missing here? How can I avoid getting early fires one by one and
instead get all written files at the ON_TIME window fire?


Re: Write bulks files from streaming app

Posted by Stephan Ewen <se...@apache.org>.
For what it's worth, in Flink directly we found that this pattern generally
does not work well: windowing data in large windows in order to perform
large bulk writes.

Instead, the sinks (to file systems) continuously write files (possibly
across different destination files), ensure persistence at checkpoints, and
can roll back the output to the previous checkpoint in file-system-specific
ways. That way, there is no data buffering in state (memory, RocksDB, etc.)
at all, only metadata tracking.

For bulk encoders (like Parquet), one needs an additional step to
encode/compress when the specific destination file is done (in Hadoop
terms, that would be the "commit" step).
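
To make the "only metadata tracking" point concrete, here is a rough sketch
in plain Java against a local file. It is not Flink's sink implementation,
and the class and method names (`CheckpointedFileWriter`, `checkpoint`,
`restore`, `demo`) are made up for illustration. It assumes a file system
that supports truncation; how rollback is done on HDFS or other stores is
file-system specific, as noted above. Records are written as they arrive,
and the only thing remembered at a checkpoint is a file length.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

/** Hypothetical local-file model of a sink that keeps only an offset as state. */
class CheckpointedFileWriter implements AutoCloseable {
    private final FileChannel channel;
    private long committedLength = 0; // metadata only: bytes covered by the last checkpoint

    CheckpointedFileWriter(Path file) throws IOException {
        channel = FileChannel.open(file,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING);
    }

    /** Appends one record immediately; nothing is buffered in state. */
    void write(String record) throws IOException {
        ByteBuffer bytes = ByteBuffer.wrap((record + "\n").getBytes(StandardCharsets.UTF_8));
        while (bytes.hasRemaining()) {
            channel.write(bytes);
        }
    }

    /** Flushes to storage and remembers how much of the file is now durable. */
    long checkpoint() throws IOException {
        channel.force(true);
        committedLength = channel.size();
        return committedLength;
    }

    /** Rolls the file back to the last checkpoint by dropping later bytes. */
    void restore() throws IOException {
        channel.truncate(committedLength);
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }

    /** Demo: "c" is written after the checkpoint and is lost on rollback. */
    static List<String> demo() {
        try {
            Path file = Files.createTempFile("sink-demo", ".txt");
            try (CheckpointedFileWriter writer = new CheckpointedFileWriter(file)) {
                writer.write("a");
                writer.write("b");
                writer.checkpoint();
                writer.write("c"); // not covered by any checkpoint
                writer.restore();  // simulated failure and recovery
                writer.write("d"); // writing continues from the checkpointed length
            }
            List<String> lines = Files.readAllLines(file);
            Files.delete(file);
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The state that has to survive a failure here is a single long per open
file, independent of how much data the file holds, which is the contrast
with buffering a whole window of records in state before writing.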


Re: Write bulks files from streaming app

Posted by Jozef Vilcek <jo...@gmail.com>.
I looked into Wait.on(), but the docs say it waits until the window is
completely done, so it is not quite a fit for my case: my lateness can be a
day or two, and I would like to compact and publish hourly data sooner.

What I am thinking of is to write the trigger firings to a different location
than the target. I will have lots of EARLY fires for the main data and then
some LATE fires. What I would like to do is observe all EARLY fires for a
window (ideally at ON_TIME in event time) in one group and move those files to
the target dir by merging them. Observed LATE fires would just be moved
immediately, because there are not many of them and it does not hurt to keep
them fragmented for now.

The question is whether this makes sense and can be done with Beam. FileIO
returns a WriteFilesResult on which I can call
`getPerDestinationOutputFilenames()`, which gives me a collection of KVs, with
the key being the destination and the value being a file that was written. I
tried to window it again with different triggers (no early trigger) and group
by key, but so far no luck: it never yields a collection of the files that
were emitted as EARLY in the first window.
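
To make the intended routing concrete, here is a minimal sketch in plain Java
with no Beam dependency. Everything in it is hypothetical: the class
`PaneFileRouter`, its `Timing` enum and the `accept` method are illustration
names, the key is assumed to already encode destination plus window, and each
file is assumed to arrive tagged with the timing of the pane that wrote it,
which the pipeline would have to supply somehow.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: decides which written files should be published now. */
final class PaneFileRouter {
    /** Assumed tag for the pane that produced a file. */
    enum Timing { EARLY, ON_TIME, LATE }

    // key (destination + window) -> files from EARLY panes not yet published
    private final Map<String, List<String>> pending = new HashMap<>();

    /**
     * Returns the files to publish right now:
     * nothing for EARLY (the file is only buffered),
     * the whole buffered batch plus this file for ON_TIME (to be merged),
     * and just this file for LATE (to be moved as-is).
     */
    List<String> accept(String key, String file, Timing timing) {
        switch (timing) {
            case EARLY:
                pending.computeIfAbsent(key, k -> new ArrayList<>()).add(file);
                return Collections.emptyList();
            case ON_TIME: {
                List<String> batch = pending.remove(key);
                if (batch == null) {
                    batch = new ArrayList<>();
                }
                batch.add(file);
                return batch;
            }
            default:
                // LATE: small and rare, so it is published without merging.
                return Collections.singletonList(file);
        }
    }
}
```

In a real pipeline the buffer would have to live in runner-managed state
rather than in a HashMap; the sketch only shows the decision per file.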


On Fri, Jul 20, 2018 at 9:06 PM Raghu Angadi <ra...@google.com> wrote:

> On Fri, Jul 20, 2018 at 2:58 AM Jozef Vilcek <jo...@gmail.com>
> wrote:
>
>> Hm, that is interesting idea to make the write composite and merge files
>> later. Do not know Beam well yet.
>> I will look into it and learn about Wait.on() transform (wonder how it
>> will work with late fires). Thanks!
>>
>> But keeps me thinking...
>> Does it make sense to have support from SDK?
>> Is my use case that uncommon? Not fit for Beam? How do others out there
>> do similar things?
>>
>
> SDK does allow it. Looks like you are running into scaling and memory
> limits with amount of state stored in large windows. This is something that
> will improve. I am not familiar enough with Flink runner to comment on
> specifics. I was mainly thinking of a workaround.
>
> Raghu.
>
>
>>
>> On Thu, Jul 19, 2018 at 11:21 PM Raghu Angadi <ra...@google.com> wrote:
>>
>>> One option (but requires more code): Write to smaller files with
>>> frequent triggers to directory_X and once the window properly closes, copy
>>> all the files to a single file in your own DoFn. This is certainly more
>>> code on your part, but might be worth it. You can use Wait.on() transform
>>> to run your finalizer DoFn right after the window that writes smaller files
>>> closes.
>>>
>>>
>>> On Thu, Jul 19, 2018 at 2:43 AM Jozef Vilcek <jo...@gmail.com>
>>> wrote:
>>>
>>>> Hey,
>>>>
>>>> I am looking for the advice.
>>>>
>>>> I am trying to do a stream processing with Beam on Flink runtime.
>>>> Reading data from Kafka, doing some processing with it which is not
>>>> important here and in the same time want to store consumed data to history
>>>> storage for archive and reprocessing, which is HDFS.
>>>>
>>>> Now, the part of writing batches to HDFS is giving me hard time.
>>>> Logically, I want to do:
>>>>
>>>> fileIO = FileIO.writeDynamic()
>>>>         .by(destinationFn)
>>>>         .via(AvroIO.sink(avroClass))
>>>>         .to(path)
>>>>         .withNaming(namingFn)
>>>>         .withTempDirectory(tmp)
>>>>         .withNumShards(shards)
>>>>
>>>> data
>>>>    .withFixedWindow(1H, afterWatermarkTrigger, discardFiredPanes)
>>>>    .saveTo(fileIO)
>>>>
>>>>
>>>> This write generates in Flink execution graph 3 operators, which I do
>>>> not full understand yet.
>>>>
>>>> Now, the problem is, that I am not able to run this at scale.
>>>>
>>>> If I want to write big enough files to not to have lots of files on
>>>> HDFS, I keep running into the OOM. With Flink, I use rocksdb state backend
>>>> and I was warned about this JIRA which is probably related to my OOM
>>>> https://issues.apache.org/jira/browse/FLINK-8297
>>>> Therefore, I need to trigger more often and small batches which leads
>>>> to too many files on HDFS.
>>>>
>>>> Question here is, if there is some path I do not see how to make this
>>>> work ( write bulks of data to HDFS of my choosing without running to memory
>>>> troubles ). Also, keeping whole window data which is designated for write
>>>> to output to filesystem in state involves more IO.
>>>>
>>>> Thanks for any thoughts and guidelines,
>>>> Jozef
>>>>
>>>>

Re: Write bulks files from streaming app

Posted by Raghu Angadi <ra...@google.com>.
On Fri, Jul 20, 2018 at 2:58 AM Jozef Vilcek <jo...@gmail.com> wrote:

> Hm, that is interesting idea to make the write composite and merge files
> later. Do not know Beam well yet.
> I will look into it and learn about Wait.on() transform (wonder how it
> will work with late fires). Thanks!
>
> But keeps me thinking...
> Does it make sense to have support from SDK?
> Is my use case that uncommon? Not fit for Beam? How do others out there
> do similar things?
>

The SDK does allow it. It looks like you are running into scaling and memory
limits with the amount of state stored in large windows. This is something that
will improve. I am not familiar enough with the Flink runner to comment on
specifics. I was mainly thinking of a workaround.

Raghu.


>
> On Thu, Jul 19, 2018 at 11:21 PM Raghu Angadi <ra...@google.com> wrote:
>
>> One option (but requires more code): Write to smaller files with frequent
>> triggers to directory_X and once the window properly closes, copy all the
>> files to a single file in your own DoFn. This is certainly more code on
>> your part, but might be worth it. You can use Wait.on() transform to run
>> your finalizer DoFn right after the window that writes smaller files closes.
>>
>>
>> On Thu, Jul 19, 2018 at 2:43 AM Jozef Vilcek <jo...@gmail.com>
>> wrote:
>>
>>> Hey,
>>>
>>> I am looking for the advice.
>>>
>>> I am trying to do a stream processing with Beam on Flink runtime.
>>> Reading data from Kafka, doing some processing with it which is not
>>> important here and in the same time want to store consumed data to history
>>> storage for archive and reprocessing, which is HDFS.
>>>
>>> Now, the part of writing batches to HDFS is giving me hard time.
>>> Logically, I want to do:
>>>
>>> fileIO = FileIO.writeDynamic()
>>>         .by(destinationFn)
>>>         .via(AvroIO.sink(avroClass))
>>>         .to(path)
>>>         .withNaming(namingFn)
>>>         .withTempDirectory(tmp)
>>>         .withNumShards(shards)
>>>
>>> data
>>>    .withFixedWindow(1H, afterWatermarkTrigger, discardFiredPanes)
>>>    .saveTo(fileIO)
>>>
>>>
>>> This write generates in Flink execution graph 3 operators, which I do
>>> not full understand yet.
>>>
>>> Now, the problem is, that I am not able to run this at scale.
>>>
>>> If I want to write big enough files to not to have lots of files on
>>> HDFS, I keep running into the OOM. With Flink, I use rocksdb state backend
>>> and I was warned about this JIRA which is probably related to my OOM
>>> https://issues.apache.org/jira/browse/FLINK-8297
>>> Therefore, I need to trigger more often and small batches which leads to
>>> too many files on HDFS.
>>>
>>> Question here is, if there is some path I do not see how to make this
>>> work ( write bulks of data to HDFS of my choosing without running to memory
>>> troubles ). Also, keeping whole window data which is designated for write
>>> to output to filesystem in state involves more IO.
>>>
>>> Thanks for any thoughts and guidelines,
>>> Jozef
>>>
>>>

Re: Write bulks files from streaming app

Posted by Jozef Vilcek <jo...@gmail.com>.
Hm, that is an interesting idea, to make the write composite and merge files
later. I do not know Beam well yet.
I will look into it and learn about the Wait.on() transform (I wonder how it
will work with late fires). Thanks!

But it keeps me thinking...
Does it make sense to have support for this in the SDK?
Is my use case that uncommon? Not a fit for Beam? How do others out there
do similar things?


On Thu, Jul 19, 2018 at 11:21 PM Raghu Angadi <ra...@google.com> wrote:

> One option (but requires more code): Write to smaller files with frequent
> triggers to directory_X and once the window properly closes, copy all the
> files to a single file in your own DoFn. This is certainly more code on
> your part, but might be worth it. You can use Wait.on() transform to run
> your finalizer DoFn right after the window that writes smaller files closes.
>
>
> On Thu, Jul 19, 2018 at 2:43 AM Jozef Vilcek <jo...@gmail.com>
> wrote:
>
>> Hey,
>>
>> I am looking for the advice.
>>
>> I am trying to do a stream processing with Beam on Flink runtime. Reading
>> data from Kafka, doing some processing with it which is not important here
>> and in the same time want to store consumed data to history storage for
>> archive and reprocessing, which is HDFS.
>>
>> Now, the part of writing batches to HDFS is giving me hard time.
>> Logically, I want to do:
>>
>> fileIO = FileIO.writeDynamic()
>>         .by(destinationFn)
>>         .via(AvroIO.sink(avroClass))
>>         .to(path)
>>         .withNaming(namingFn)
>>         .withTempDirectory(tmp)
>>         .withNumShards(shards)
>>
>> data
>>    .withFixedWindow(1H, afterWatermarkTrigger, discardFiredPanes)
>>    .saveTo(fileIO)
>>
>>
>> This write generates in Flink execution graph 3 operators, which I do not
>> full understand yet.
>>
>> Now, the problem is, that I am not able to run this at scale.
>>
>> If I want to write big enough files to not to have lots of files on HDFS,
>> I keep running into the OOM. With Flink, I use rocksdb state backend and I
>> was warned about this JIRA which is probably related to my OOM
>> https://issues.apache.org/jira/browse/FLINK-8297
>> Therefore, I need to trigger more often and small batches which leads to
>> too many files on HDFS.
>>
>> Question here is, if there is some path I do not see how to make this
>> work ( write bulks of data to HDFS of my choosing without running to memory
>> troubles ). Also, keeping whole window data which is designated for write
>> to output to filesystem in state involves more IO.
>>
>> Thanks for any thoughts and guidelines,
>> Jozef
>>
>>

Re: Write bulks files from streaming app

Posted by Raghu Angadi <ra...@google.com>.
One option (but it requires more code): write to smaller files with frequent
triggers to directory_X and, once the window properly closes, copy all the
files to a single file in your own DoFn. This is certainly more code on
your part, but might be worth it. You can use the Wait.on() transform to run
your finalizer DoFn right after the window that writes the smaller files closes.
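
The merge step itself could look roughly like the sketch below. It is plain
Java on java.nio with no Beam dependency, and the class `PartFileMerger` and
its `mergeParts` method are hypothetical names for illustration only. It
assumes the part files can simply be concatenated byte for byte (for example
newline-delimited text) and that the target lies outside the parts directory.
Avro container files each carry their own header, so for Avro the records
would need to be re-written through an Avro writer instead.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical finalizer logic: merge the small files of one window into one file. */
final class PartFileMerger {
    private PartFileMerger() {}

    /**
     * Concatenates every regular file in partsDir (sorted by name) into target,
     * deletes the parts afterwards, and returns how many parts were merged.
     */
    static int mergeParts(Path partsDir, Path target) throws IOException {
        List<Path> parts;
        try (Stream<Path> listing = Files.list(partsDir)) {
            parts = listing.filter(Files::isRegularFile)
                           .sorted()
                           .collect(Collectors.toList());
        }
        // Write to a temporary sibling first so readers never see a half-written target.
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try (OutputStream out = Files.newOutputStream(tmp)) {
            for (Path part : parts) {
                Files.copy(part, out);
            }
        }
        Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
        // Only remove the small files once the merged file is in place.
        for (Path part : parts) {
            Files.delete(part);
        }
        return parts.size();
    }
}
```

Inside a pipeline this logic would sit in the finalizer DoFn and would
presumably go through Beam's FileSystems abstraction rather than java.nio, so
that HDFS paths resolve.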


On Thu, Jul 19, 2018 at 2:43 AM Jozef Vilcek <jo...@gmail.com> wrote:

> Hey,
>
> I am looking for the advice.
>
> I am trying to do a stream processing with Beam on Flink runtime. Reading
> data from Kafka, doing some processing with it which is not important here
> and in the same time want to store consumed data to history storage for
> archive and reprocessing, which is HDFS.
>
> Now, the part of writing batches to HDFS is giving me hard time.
> Logically, I want to do:
>
> fileIO = FileIO.writeDynamic()
>         .by(destinationFn)
>         .via(AvroIO.sink(avroClass))
>         .to(path)
>         .withNaming(namingFn)
>         .withTempDirectory(tmp)
>         .withNumShards(shards)
>
> data
>    .withFixedWindow(1H, afterWatermarkTrigger, discardFiredPanes)
>    .saveTo(fileIO)
>
>
> This write generates in Flink execution graph 3 operators, which I do not
> full understand yet.
>
> Now, the problem is, that I am not able to run this at scale.
>
> If I want to write big enough files to not to have lots of files on HDFS,
> I keep running into the OOM. With Flink, I use rocksdb state backend and I
> was warned about this JIRA which is probably related to my OOM
> https://issues.apache.org/jira/browse/FLINK-8297
> Therefore, I need to trigger more often and small batches which leads to
> too many files on HDFS.
>
> Question here is, if there is some path I do not see how to make this work
> ( write bulks of data to HDFS of my choosing without running to memory
> troubles ). Also, keeping whole window data which is designated for write
> to output to filesystem in state involves more IO.
>
> Thanks for any thoughts and guidelines,
> Jozef
>
>