Posted to user@flink.apache.org by Robert Schmidtke <ro...@gmail.com> on 2017/04/07 08:20:14 UTC

Disk I/O in Flink

Hi,

I'm currently examining the I/O patterns of Flink, and I'd like to know
when/how Flink goes to disk. Let me give an introduction of what I have
done so far.

I am running TeraGen (from the Hadoop examples package) + TeraSort (
https://github.com/robert-schmidtke/terasort) on a 16 node cluster, each
node with 64 GiB of memory, 2x32 cores, and roughly half a terabyte of
disk. I'm using YARN and HDFS. The underlying file system is XFS.

Now before running TeraGen and TeraSort, I reset the XFS counters to zero,
and after TeraGen + TeraSort are finished, I dump the XFS counters again.
Accumulated over the entire cluster I get 3 TiB of writes and 3.2 TiB of
reads. What I'd have expected would be 2 TiB of writes (1 for TeraGen, 1
for TeraSort) and 1 TiB of reads (during TeraSort).

Unsatisfied by the coarseness of these numbers I developed an HDFS wrapper
that logs file system statistics for each call to hdfs://..., such as start
time/end time, no. of bytes read/written etc. I can plot these numbers and
see what I expect: during TeraGen I have 1 TiB of writes to hdfs://...,
during TeraSort I have 1 TiB of reads from and 1 TiB of writes to
hdfs://... So far, so good.

Now this still did not explain the disk I/O, so I added bytecode
instrumentation to a range of Java classes, like FileIn/OutputStream,
RandomAccessFile, FileChannel, ZipFile, multiple *Buffer classes for memory
mapped files etc., and have the same statistics: start/end of a read
from/write to disk, no. of bytes involved and such. I can plot these
numbers too and see that the HDFS JVMs write 1 TiB of data to disk during
TeraGen (expected) and read and write 1 TiB from and to disk during
TeraSort (expected).

Sorry for the enormous introduction, but now there's finally the
interesting part: Flink's JVMs read from and write to disk 1 TiB of data
each during TeraSort. I'm suspecting there is some sort of spilling
involved, potentially because I have not done the setup properly. But that
is not the crucial point: my statistics give a total of 3 TiB of writes to
disk (2 TiB for HDFS, 1 TiB for Flink), which agrees with the XFS counters
from above. However, my statistics only give 2 TiB of reads from disk (1
TiB for HDFS, 1 TiB for Flink), so I'm missing an entire TiB of reads from
disk somewhere. I have done the same with Hadoop TeraSort, and there I'm
not missing any data, meaning my statistics agree with XFS for TeraSort on
Hadoop, which is why I suspect there are some cases where Flink goes to
disk without me noticing it.
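
The accounting above can be written out as a quick check. This is only a sketch: the figures are the rounded TiB values quoted in this message, and the names (`xfs`, `instrumented`, `unexplained`) are made up for illustration.

```python
# Rounded cluster-wide totals from the message, in TiB.
xfs = {"writes": 3.0, "reads": 3.2}

# What the bytecode instrumentation attributes to each group of JVMs.
instrumented = {
    "hdfs": {"writes": 2.0, "reads": 1.0},   # TeraGen + TeraSort
    "flink": {"writes": 1.0, "reads": 1.0},  # TeraSort only
}


def unexplained(kind):
    """XFS counter minus the sum of instrumented I/O, for 'reads' or 'writes'."""
    seen = sum(per_jvm[kind] for per_jvm in instrumented.values())
    return round(xfs[kind] - seen, 1)


for kind in ("writes", "reads"):
    print(f"{kind}: {unexplained(kind)} TiB not covered by the instrumentation")
```

With these rounded inputs the writes balance exactly, while the reads leave a gap of about 1.2 TiB, which is roughly the size of the input set.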

So here, finally, is the question: in which cases does Flink go to disk,
and how does it do so (meaning precisely which Java classes are involved,
so I can check my bytecode instrumentation)? This would also include any
kind of resource distribution via HDFS/YARN, I guess (JAR files and the
like). Seeing that I'm missing an amount of data equal to the size of my
input set, I suspect there is some sort of shuffling/spilling at play
here, but I'm not sure. Maybe there is also some sort of remote I/O
involved, via sockets or similar, that I'm missing.

Any hints as to where Flink might incur disk I/O are greatly appreciated!
I'm also happy with doing the digging myself, once pointed to the proper
packages in the Apache Flink source tree (I have done my fair share of
inspection already, but could not be sure whether or not I have missed
something). Thanks a lot in advance!

Robert

-- 
My GPG Key ID: 336E2680

Re: Disk I/O in Flink

Posted by Robert Schmidtke <ro...@gmail.com>.
Minor update: I have executed the flink-runtime tests on XFS, Lustre and
DVS (Cray DataWarp), and I observe divergences on XFS and Lustre, but not
on DVS. It turns out that cached reads are reported by the file systems as
well, so I don't think caching is an issue here. There might still be some
threading issues which I do not cover, and maybe for some reason DVS
serializes access, which is why my statistics and DVS agree to 100%. I'll
get more experiments going and report back.

Robert

On Sat, Apr 29, 2017 at 4:53 PM, Robert Schmidtke <ro...@gmail.com>
wrote:

> Hey Martin,
>
> I'm still on it. I have switched to analyzing the flink-runtime tests, as
> I observe similar divergence there. I'm not sure how long it'll take, but
> if I find something I'll make sure to let you all know :)
>
> Robert
>
> On Sat, Apr 29, 2017 at 3:12 PM, Martin Eden <ma...@gmail.com>
> wrote:
>
>> Hi Robert,
>>
>> Any updates on the below for the community?
>>
>> Thanks,
>> M
>>
>> On Tue, Apr 25, 2017 at 8:50 AM, Robert Schmidtke <ro...@gmail.com>
>> wrote:
>>
>>> Hi Ufuk, thanks for coming back to me on this.
>>>
>>> The records are 100 bytes in size, the benchmark being TeraSort, so that
>>> should not be an issue. I have played around with the input size, and here
>>> are my observations:
>>>
>>> 128 GiB input: 0 Spilling in Flink.
>>> 256 GiB input: 88 GiB Spilling in Flink (so 88 GiB of reads, 88 GiB of
>>> writes), and my instrumentation covers all of it.
>>> 384 GiB input: 391 GiB Spilling in Flink, and I cover all of it.
>>> 512 GiB input: 522 GiB Spilling in Flink, but I miss 140 GiB of it.
>>> 640 GiB input: 653 GiB Spilling in Flink, but I miss 281 GiB of it.
>>> 768 GiB input: 784 GiB Spilling in Flink, but I miss 490 GiB of it.
>>> 896 GiB input: 914 GiB Spilling in Flink, but I miss 662 GiB of it.
>>> 1024 GiB input: 1045 GiB Spilling in Flink, but I miss 968 GiB of it.
>>>
>>> So regardless of how well configured my system is and whether spilling is
>>> even necessary, it seems that with larger spilling amounts, the way the
>>> data is spilled changes (and I start missing larger and larger portions
>>> of the I/O, until almost 100%).
>>> Now since I have written the instrumentation myself, I cannot guarantee
>>> that it is flawless and I might have missed something.
>>> I'm currently looking into how the file channels are being accessed in
>>> parallel by multiple threads, which I cover as well and my tests verify it,
>>> but maybe there are special access patterns here.
>>>
>>> Robert
>>>
>>> On Mon, Apr 24, 2017 at 2:25 PM, Ufuk Celebi <uc...@apache.org> wrote:
>>>
>>>> Hey Robert,
>>>>
>>>> for batch that should cover the relevant spilling code. If the records
>>>> are >= 5 MB, the SpillingAdaptiveSpanningRecordDeserializer will spill
>>>> incoming records as well. But that should be covered by the
>>>> FileChannel instrumentation as well?
>>>>
>>>> – Ufuk
>>>>
>>>>
>>>> On Tue, Apr 18, 2017 at 3:57 PM, Robert Schmidtke
>>>> <ro...@gmail.com> wrote:
>>>> > Hi,
>>>> >
>>>> > I have already looked at the UnilateralSortMerger, concluding that all
>>>> > I/O eventually goes via SegmentReadRequest and SegmentWriteRequest
>>>> > (which in turn use java.nio.channels.FileChannel) in
>>>> > AsynchronousFileIOChannel. Are there more interaction points between
>>>> > Flink and the underlying file system that I might want to consider?
>>>> >
>>>> > Thanks!
>>>> > Robert
>>>> >
>>>> > On Fri, Apr 7, 2017 at 5:02 PM, Kurt Young <yk...@gmail.com> wrote:
>>>> >>
>>>> >> Hi,
>>>> >>
>>>> >> You probably want to check out UnilateralSortMerger.java; this is the
>>>> >> class responsible for external sorting in Flink. Here is a short
>>>> >> description of how it works: three threads work together, one for
>>>> >> reading, one for sorting partial data in memory, and one for
>>>> >> spilling. Flink first figures out how much memory it can use for the
>>>> >> in-memory sort and manages it as MemorySegments. Once this memory
>>>> >> runs out, the sorting thread takes over the filled segments and sorts
>>>> >> them in memory (for more details about in-memory sorting, see
>>>> >> NormalizedKeySorter). After this, the spilling thread writes the
>>>> >> sorted data to disk and makes the memory available again for reading.
>>>> >> This is repeated until all data has been processed.
>>>> >> Normally, the data is read twice (once from the source and once from
>>>> >> disk) and written once, but if too many files have been spilled, Flink
>>>> >> first merges some of the files so that the final merge step does not
>>>> >> exceed a limit (default 128). Hope this helps.
>>>> >>
>>>> >> Best,
>>>> >> Kurt
>>>> >>
>>>> >>
>>>> >
>>>> >
>>>> >
>>>> > --
>>>> > My GPG Key ID: 336E2680
>>>>
>>>
>>>
>>>
>>> --
>>> My GPG Key ID: 336E2680
>>>
>>
>>
>
>
> --
> My GPG Key ID: 336E2680
>



-- 
My GPG Key ID: 336E2680
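
Kurt Young's quoted description of the external sort (fill memory, sort, spill a run, merge with a capped fan-in) can be sketched as a toy model. This is not Flink's code: `external_sort`, `memory_capacity` and the way runs are picked for intermediate merges are assumptions made for illustration, and "disk" is just a list of in-memory runs. The only figure taken from the thread is the default limit of 128.

```python
import heapq


def external_sort(records, memory_capacity, max_fan_in=128):
    """Toy external merge sort.

    Returns (sorted_records, records_written, records_read), where the two
    counters model spill I/O only (reading the source is not counted).
    """
    if max_fan_in < 2:
        raise ValueError("max_fan_in must be at least 2")
    if len(records) <= memory_capacity:
        # Everything fits in memory: no spilling at all.
        return sorted(records), 0, 0

    runs, written, read = [], 0, 0

    # Phase 1: fill the memory budget, sort it, spill it as one sorted run.
    for start in range(0, len(records), memory_capacity):
        run = sorted(records[start:start + memory_capacity])
        runs.append(run)
        written += len(run)

    # Phase 2: intermediate merges until the final merge fits the fan-in cap.
    # Every record touched here is read from and written back to "disk".
    while len(runs) > max_fan_in:
        group, runs = runs[:max_fan_in], runs[max_fan_in:]
        merged = list(heapq.merge(*group))
        read += len(merged)
        written += len(merged)
        runs.append(merged)

    # Phase 3: the final merge streams every remaining run back once.
    result = list(heapq.merge(*runs))
    read += len(result)
    return result, written, read


if __name__ == "__main__":
    data = list(range(1000, 0, -1))
    for fan_in in (128, 4):
        _, w, r = external_sort(data, memory_capacity=10, max_fan_in=fan_in)
        print(f"max_fan_in={fan_in}: wrote {w} records, read {r} records")
```

In this model, as long as the number of runs stays within the fan-in limit, each spilled record is written once and read back once. When intermediate merges kick in, spill I/O exceeds the input size. Whether that accounts for the spill volumes slightly above the input size reported in the quoted reply is not something this sketch can confirm.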

Re: Disk I/O in Flink

Posted by Robert Schmidtke <ro...@gmail.com>.
Hey Martin,

I'm still on it. I have switched to analyzing the flink-runtime tests, as I
observe similar divergence there. I'm not sure how long it'll take, but if
I find something I'll make sure to let you all know :)

Robert



-- 
My GPG Key ID: 336E2680

Re: Disk I/O in Flink

Posted by Martin Eden <ma...@gmail.com>.
Hi Robert,

Any updates on the below for the community?

Thanks,
M


Re: Disk I/O in Flink

Posted by Robert Schmidtke <ro...@gmail.com>.
Hi Ufuk, thanks for coming back to me on this.

The records are 100 bytes in size, the benchmark being TeraSort, so that
should not be an issue. I have played around with the input size, and here
are my observations:

128 GiB input: 0 Spilling in Flink.
256 GiB input: 88 GiB Spilling in Flink (so 88 GiB of reads, 88 GiB of
writes), and my instrumentation covers all of it.
384 GiB input: 391 GiB Spilling in Flink, and I cover all of it.
512 GiB input: 522 GiB Spilling in Flink, but I miss 140 GiB of it.
640 GiB input: 653 GiB Spilling in Flink, but I miss 281 GiB of it.
768 GiB input: 784 GiB Spilling in Flink, but I miss 490 GiB of it.
896 GiB input: 914 GiB Spilling in Flink, but I miss 662 GiB of it.
1024 GiB input: 1045 GiB Spilling in Flink, but I miss 968 GiB of it.

So regardless of how well configured my system is and whether spilling is
even necessary, it seems that with larger spilling amounts, the way the data
is spilled changes (and I start missing larger and larger portions of the
I/O, until almost 100%).
Now since I have written the instrumentation myself, I cannot guarantee
that it is flawless and I might have missed something.
I'm currently looking into how the file channels are being accessed in
parallel by multiple threads, which I cover as well and my tests verify it,
but maybe there are special access patterns here.
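
To make the trend explicit, the missed share can be computed per input size. A small sketch using only the numbers listed above; the names `observations` and `missed_fraction` are illustrative.

```python
# (input GiB, spilled GiB, missed GiB), as listed in the observations above.
observations = [
    (128, 0, 0),
    (256, 88, 0),
    (384, 391, 0),
    (512, 522, 140),
    (640, 653, 281),
    (768, 784, 490),
    (896, 914, 662),
    (1024, 1045, 968),
]


def missed_fraction(spilled, missed):
    """Share of spill I/O the instrumentation did not see (0.0 if nothing spilled)."""
    return missed / spilled if spilled else 0.0


for input_gib, spilled, missed in observations:
    share = missed_fraction(spilled, missed)
    print(f"{input_gib:5d} GiB input: {share:6.1%} of spill I/O missed")
```

The share stays at zero up to 384 GiB and then climbs steadily, from roughly 27% at 512 GiB to roughly 93% at 1024 GiB.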

Robert

On Mon, Apr 24, 2017 at 2:25 PM, Ufuk Celebi <uc...@apache.org> wrote:

> Hey Robert,
>
> for batch that should cover the relevant spilling code. If the records
> are >= 5 MB, the SpillingAdaptiveSpanningRecordDeserializer will spill
> incoming records as well. But that should be covered by the
> FileChannel instrumentation as well?
>
> – Ufuk
>
>
> On Tue, Apr 18, 2017 at 3:57 PM, Robert Schmidtke
> <ro...@gmail.com> wrote:
> > Hi,
> >
> > I have already looked at the UnilateralSortMerger, concluding that all
> > I/O eventually goes via SegmentReadRequest and SegmentWriteRequest (which
> > in turn use java.nio.channels.FileChannel) in AsynchronousFileIOChannel.
> > Are there more interaction points between Flink and the underlying file
> > system that I might want to consider?
> >
> > Thanks!
> > Robert
> >
> > On Fri, Apr 7, 2017 at 5:02 PM, Kurt Young <yk...@gmail.com> wrote:
> >>
> >> Hi,
> >>
> >> You probably want to check out UnilateralSortMerger.java; this is the
> >> class responsible for external sorting in Flink. Here is a short
> >> description of how it works: three threads work together, one for
> >> reading, one for sorting partial data in memory, and one for spilling.
> >> Flink first figures out how much memory it can use for the in-memory
> >> sort and manages it as MemorySegments. Once this memory runs out, the
> >> sorting thread takes over the filled segments and sorts them in memory
> >> (for more details about in-memory sorting, see NormalizedKeySorter).
> >> After this, the spilling thread writes the sorted data to disk and makes
> >> the memory available again for reading. This is repeated until all data
> >> has been processed.
> >> Normally, the data is read twice (once from the source and once from
> >> disk) and written once, but if too many files have been spilled, Flink
> >> first merges some of the files so that the final merge step does not
> >> exceed a limit (default 128). Hope this helps.
> >>
> >> Best,
> >> Kurt
> >>
> >> On Fri, Apr 7, 2017 at 4:20 PM, Robert Schmidtke <
> ro.schmidtke@gmail.com>
> >> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I'm currently examining the I/O patterns of Flink, and I'd like to know
> >>> when/how Flink goes to disk. Let me give an introduction of what I
> have done
> >>> so far.
> >>>
> >>> I am running TeraGen (from the Hadoop examples package) + TeraSort
> >>> (https://github.com/robert-schmidtke/terasort) on a 16 node cluster,
> each
> >>> node with 64 GiB of memory, 2x32 cores, and roughly half a terabyte of
> disk.
> >>> I'm using YARN and HDFS. The underlying file system is XFS.
> >>>
> >>> Now before running TeraGen and TeraSort, I reset the XFS counters to
> >>> zero, and after TeraGen + TeraSort are finished, I dump the XFS
> counters
> >>> again. Accumulated over the entire cluster I get 3 TiB of writes and
> 3.2 TiB
> >>> of reads. What I'd have expected would be 2 TiB of writes (1 for
> TeraGen, 1
> >>> for TeraSort) and 1 TiB of reads (during TeraSort).
> >>>
> >>> Unsatisfied by the coarseness of these numbers I developed an HDFS
> >>> wrapper that logs file system statistics for each call to hdfs://...,
> such
> >>> as start time/end time, no. of bytes read/written etc. I can plot these
> >>> numbers and see what I expect: during TeraGen I have 1 TiB of writes to
> >>> hdfs://..., during TeraSort I have 1 TiB of reads from and 1 TiB of
> writes
> >>> to hdfs://... So far, so good.
> >>>
> >>> Now this still did not explain the disk I/O, so I added bytecode
> >>> instrumentation to a range of Java classes, like FileIn/OutputStream,
> >>> RandomAccessFile, FileChannel, ZipFile, multiple *Buffer classes for
> memory
> >>> mapped files etc., and have the same statistics: start/end of a read
> >>> from/write to disk, no. of bytes involved and such. I can plot these
> numbers
> >>> too and see that the HDFS JVMs write 1 TiB of data to disk during
> TeraGen
> >>> (expected) and read and write 1 TiB from and to disk during TeraSort
> >>> (expected).
> >>>
> >>> Sorry for the enormous introduction, but now there's finally the
> >>> interesting part: Flink's JVMs read from and write to disk 1 TiB of
> data
> >>> each during TeraSort. I'm suspecting there is some sort of spilling
> >>> involved, potentially because I have not done the setup properly. But
> that
> >>> is not the crucial point: my statistics give a total of 3 TiB of
> writes to
> >>> disk (2 TiB for HDFS, 1 TiB for Flink), which agrees with the XFS
> counters
> >>> from above. However, my statistics only give 2 TiB of reads from disk
> (1 TiB
> >>> for HDFS, 1 TiB for Flink), so I'm missing an entire TiB of reads from
> disk
> >>> somewhere. I have done the same with Hadoop TeraSort, and there I'm not
> >>> missing any data, meaning my statistics agree with XFS for TeraSort on
> >>> Hadoop, which is why I suspect there are some cases where Flink goes
> to disk
> >>> without me noticing it.
> >>>
> >>> Therefore here finally the question: in which cases does Flink go to
> >>> disk, and how does it do so (meaning precisely which Java classes are
> >>> involved, so I can check my bytecode instrumentation)? This would also
> >>> include any kind of resource distribution via HDFS/YARN I guess (like
> JAR
> >>> files and I don't know what). Seeing that I'm missing an amount of data
> >>> equal to the size of my input set I'd suspect there must be some sort
> of
> >>> shuffling/spilling at play here, but I'm not sure. Maybe there is also
> some
> >>> sort of remote I/O involved via sockets or so that I'm missing.
> >>>
> >>> Any hints as to where Flink might incur disk I/O are greatly
> appreciated!
> >>> I'm also happy with doing the digging myself, once pointed to the
> proper
> >>> packages in the Apache Flink source tree (I have done my fair share of
> >>> inspection already, but could not be sure whether or not I have missed
> >>> something). Thanks a lot in advance!
> >>>
> >>> Robert
> >>>
> >>> --
> >>> My GPG Key ID: 336E2680
> >>
> >>
> >
> >
> >
> > --
> > My GPG Key ID: 336E2680
>



-- 
My GPG Key ID: 336E2680

Re: Disk I/O in Flink

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Robert,

for batch that should cover the relevant spilling code. If the records
are >= 5 MB, the SpillingAdaptiveSpanningRecordDeserializer will spill
incoming records as well. But that should be covered by the
FileChannel instrumentation as well?

– Ufuk



Re: Disk I/O in Flink

Posted by Robert Schmidtke <ro...@gmail.com>.
Hi,

I have already looked at the UnilateralSortMerger, concluding that all I/O
eventually goes via SegmentReadRequest and SegmentWriteRequest (which in
turn use java.nio.channels.FileChannel) in AsynchronousFileIOChannel. Are
there more interaction points between Flink and the underlying file system
that I might want to consider?

Thanks!
Robert

On Fri, Apr 7, 2017 at 5:02 PM, Kurt Young <yk...@gmail.com> wrote:

> Hi,
>
> You probably want to check out UnilateralSortMerger.java; this is the class
> responsible for external sorting in Flink. Here is a short description of
> how it works: there are three threads working together, one for reading,
> one for sorting partial data in memory, and one for spilling. Flink first
> figures out how much memory it can use for the in-memory sort and manages
> it as MemorySegments. Once this memory runs out, the sorting thread takes
> over the filled segments and sorts them in memory (for more details about
> in-memory sorting, see NormalizedKeySorter). After this, the spilling
> thread writes the sorted data to disk and makes the memory available again
> for reading. This is repeated until all data has been processed.
> Normally, the data is read twice (once from the source and once from disk)
> and written once, but if too many files were spilled, Flink first merges
> some of the files to make sure the last merge step does not exceed some
> limit (default 128). Hope this helps.
>
> Best,
> Kurt
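
The sort/spill/merge flow described above can be sketched in plain Java as
follows. This is not Flink's code: the three threads are collapsed into
sequential phases, spill files are simulated by in-memory lists, and the
class, method, and parameter names (ExternalSortSketch, memoryRecords,
maxFanIn) are made up for illustration. As a further simplification, the
intermediate step re-merges all runs in groups, whereas the description
above says only some of the files are merged.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

public class ExternalSortSketch {
    /** Records written to (simulated) spill files during the last sort call. */
    static long recordsSpilled = 0;

    static List<Integer> sort(List<Integer> input, int memoryRecords, int maxFanIn) {
        if (memoryRecords < 1 || maxFanIn < 2) {
            throw new IllegalArgumentException("need memoryRecords >= 1 and maxFanIn >= 2");
        }
        recordsSpilled = 0;

        // Phase 1: fill the memory budget, sort it, spill it as one sorted run.
        List<List<Integer>> runs = new ArrayList<>();
        for (int start = 0; start < input.size(); start += memoryRecords) {
            int end = Math.min(input.size(), start + memoryRecords);
            List<Integer> run = new ArrayList<>(input.subList(start, end));
            Collections.sort(run);
            runs.add(spill(run));
        }

        // Phase 2: too many runs for one merge, so merge groups and spill again.
        while (runs.size() > maxFanIn) {
            List<List<Integer>> next = new ArrayList<>();
            for (int i = 0; i < runs.size(); i += maxFanIn) {
                List<List<Integer>> group =
                    runs.subList(i, Math.min(runs.size(), i + maxFanIn));
                next.add(group.size() == 1 ? group.get(0) : spill(merge(group)));
            }
            runs = next;
        }

        // Phase 3: final merge of at most maxFanIn runs; the result is not spilled.
        return merge(runs);
    }

    /** k-way merge of sorted runs using a heap of {value, runIndex, position}. */
    static List<Integer> merge(List<List<Integer>> runs) {
        PriorityQueue<int[]> heap =
            new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int r = 0; r < runs.size(); r++) {
            if (!runs.get(r).isEmpty()) {
                heap.add(new int[] {runs.get(r).get(0), r, 0});
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] head = heap.poll();
            out.add(head[0]);
            List<Integer> run = runs.get(head[1]);
            int nextPos = head[2] + 1;
            if (nextPos < run.size()) {
                heap.add(new int[] {run.get(nextPos), head[1], nextPos});
            }
        }
        return out;
    }

    /** Stand-in for writing a run to disk: only counts the records. */
    static List<Integer> spill(List<Integer> run) {
        recordsSpilled += run.size();
        return run;
    }
}
```

With a fan-in limit larger than the number of runs, every record is spilled
exactly once, which matches the "written once" case above. With a small
limit, the intermediate merges spill some records again, so the spilled
volume exceeds the input size.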


-- 
My GPG Key ID: 336E2680