Posted to user@flink.apache.org by Robert Schmidtke <ro...@gmail.com> on 2017/05/18 08:01:26 UTC

Re: Disk I/O in Flink

Minor update: I have executed the flink-runtime tests on XFS, Lustre and
DVS (Cray DataWarp), and I observe divergences on XFS and Lustre, but not
on DVS. It turns out that cached reads are reported by the file systems as
well, so I don't think caching is an issue here. There might still be some
threading issues which I do not cover, and maybe for some reason DVS
serializes access, which is why my statistics and DVS agree to 100%. I'll
get more experiments going and report back.

Robert

On Sat, Apr 29, 2017 at 4:53 PM, Robert Schmidtke <ro...@gmail.com>
wrote:

> Hey Martin,
>
> I'm still on it. I have switched to analyzing the flink-runtime tests, as
> I observe similar divergence there. I'm not sure how long it'll take, but
> if I find something I'll make sure to let you all know :)
>
> Robert
>
> On Sat, Apr 29, 2017 at 3:12 PM, Martin Eden <ma...@gmail.com>
> wrote:
>
>> Hi Robert,
>>
>> Any updates on the below for the community?
>>
>> Thanks,
>> M
>>
>> On Tue, Apr 25, 2017 at 8:50 AM, Robert Schmidtke <ro.schmidtke@gmail.com> wrote:
>>
>>> Hi Ufuk, thanks for coming back to me on this.
>>>
>>> The records are 100 bytes in size, the benchmark being TeraSort, so that
>>> should not be an issue. I have played around with the input size, and here
>>> are my observations:
>>>
>>> 128 GiB input: 0 Spilling in Flink.
>>> 256 GiB input: 88 GiB Spilling in Flink (so 88 GiB of reads, 88 GiB of
>>> writes), and my instrumentation covers all of it.
>>> 384 GiB input: 391 GiB Spilling in Flink, and I cover all of it.
>>> 512 GiB input: 522 GiB Spilling in Flink, but I miss 140 GiB of it.
>>> 640 GiB input: 653 GiB Spilling in Flink, but I miss 281 GiB of it.
>>> 768 GiB input: 784 GiB Spilling in Flink, but I miss 490 GiB of it.
>>> 896 GiB input: 914 GiB Spilling in Flink, but I miss 662 GiB of it.
>>> 1024 GiB input: 1045 GiB Spilling in Flink, but I miss 968 GiB of it.
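
To make the trend in the list above easier to see, here is a minimal sketch that turns the quoted numbers into the share of spill I/O that the instrumentation misses. The figures are copied from the list; reading "I miss X GiB of it" as a share of the reported spill volume is an assumption, and the names OBSERVATIONS and missed_percent are purely illustrative.

```python
# (input GiB, spilled GiB, missed GiB), copied from the list above.
OBSERVATIONS = [
    (128, 0, 0),
    (256, 88, 0),
    (384, 391, 0),
    (512, 522, 140),
    (640, 653, 281),
    (768, 784, 490),
    (896, 914, 662),
    (1024, 1045, 968),
]


def missed_percent(spilled_gib, missed_gib):
    """Return the missed share of the spill volume in percent."""
    if spilled_gib == 0:
        # Nothing was spilled, so nothing can have been missed.
        return 0.0
    return 100.0 * missed_gib / spilled_gib


for input_gib, spilled_gib, missed_gib in OBSERVATIONS:
    share = missed_percent(spilled_gib, missed_gib)
    print(f"{input_gib:>5} GiB input: {share:5.1f}% of spill I/O missed")
```

Under that reading, the missed share grows from roughly a quarter at 512 GiB of input to more than 90% at 1024 GiB.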
>>>
>>> So regardless of how well my system is configured and whether spilling
>>> is even necessary, it seems that as the amount of spilled data grows, the
>>> way the data is spilled changes (and I start missing larger and larger
>>> portions of the I/O, up to almost 100%).
>>> Now since I have written the instrumentation myself, I cannot guarantee
>>> that it is flawless, and I might have missed something.
>>> I'm currently looking into how the file channels are being accessed in
>>> parallel by multiple threads. My instrumentation covers this case and my
>>> tests verify it, but maybe there are special access patterns here.
>>>
>>> Robert
>>>
>>> On Mon, Apr 24, 2017 at 2:25 PM, Ufuk Celebi <uc...@apache.org> wrote:
>>>
>>>> Hey Robert,
>>>>
>>>> for batch that should cover the relevant spilling code. If the records
>>>> are >= 5 MB, the SpillingAdaptiveSpanningRecordDeserializer will spill
>>>> incoming records as well. But that should be covered by the
>>>> FileChannel instrumentation as well?
>>>>
>>>> – Ufuk
>>>>
>>>>
>>>> On Tue, Apr 18, 2017 at 3:57 PM, Robert Schmidtke
>>>> <ro...@gmail.com> wrote:
>>>> > Hi,
>>>> >
>>>> > I have already looked at the UnilateralSortMerger, concluding that all
>>>> > I/O eventually goes via SegmentReadRequest and SegmentWriteRequest
>>>> > (which in turn use java.nio.channels.FileChannel) in
>>>> > AsynchronousFileIOChannel. Are there more interaction points between
>>>> > Flink and the underlying file system that I might want to consider?
>>>> >
>>>> > Thanks!
>>>> > Robert
>>>> >
>>>> > On Fri, Apr 7, 2017 at 5:02 PM, Kurt Young <yk...@gmail.com> wrote:
>>>> >>
>>>> >> Hi,
>>>> >>
>>>> >> You probably want to check out UnilateralSortMerger.java, which is the
>>>> >> class responsible for external sorting in Flink. Here is a short
>>>> >> description of how it works: there are three threads working together,
>>>> >> one for reading, one for sorting partial data in memory, and one for
>>>> >> spilling. Flink first figures out how much memory it can use for the
>>>> >> in-memory sort and manages it as MemorySegments. Once this memory runs
>>>> >> out, the sorting thread takes over these segments and does the
>>>> >> in-memory sort (for more details about in-memory sorting, see
>>>> >> NormalizedKeySorter). After this, the spilling thread writes the
>>>> >> sorted data to disk and makes the memory available again for reading.
>>>> >> This is repeated until all data has been processed.
>>>> >> Normally, the data is read twice (once from the source and once from
>>>> >> disk) and written once, but if too many files were spilled, Flink
>>>> >> first merges some of the files to make sure the last merge step does
>>>> >> not exceed some limit (default 128). Hope this helps.
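
The description above can be sketched as a toy external sort. This is a single-threaded simplification under stated assumptions, not Flink's UnilateralSortMerger: Python lists stand in for spill files, run_size stands in for the memory budget, the function name external_sort is made up, and every intermediate pass merges all runs in groups, which may do more work than Flink's actual merge strategy. It only illustrates why the spilled volume, counted here in records, can exceed the input size once the number of runs is larger than the merge fan-in.

```python
import heapq


def external_sort(records, run_size, max_fan_in=128):
    """Sort records via sorted runs and merges with a bounded fan-in.

    Returns (sorted_records, spill_writes, spill_reads), counted in records.
    """
    if run_size < 1 or max_fan_in < 2:
        raise ValueError("run_size must be >= 1 and max_fan_in >= 2")

    # Phase 1: fill "memory", sort it, and spill it as one sorted run.
    runs = [sorted(records[i:i + run_size])
            for i in range(0, len(records), run_size)]
    writes = sum(len(run) for run in runs)
    reads = 0

    # Phase 2: intermediate merges until the final merge fits the fan-in.
    while len(runs) > max_fan_in:
        next_runs = []
        for i in range(0, len(runs), max_fan_in):
            group = runs[i:i + max_fan_in]
            if len(group) == 1:
                # A lone run needs no merging; carry it over untouched.
                next_runs.append(group[0])
                continue
            merged = list(heapq.merge(*group))
            reads += len(merged)   # every record is read back from disk
            writes += len(merged)  # and written out again as a larger run
            next_runs.append(merged)
        runs = next_runs

    # Phase 3: the final merge reads each remaining run exactly once.
    result = list(heapq.merge(*runs))
    reads += len(result)
    return result, writes, reads
```

With few runs, each record is spilled once and read back once. With more runs than max_fan_in, the intermediate passes add further reads and writes on top of that.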
>>>> >>
>>>> >> Best,
>>>> >> Kurt
>>>> >>
>>>> >> On Fri, Apr 7, 2017 at 4:20 PM, Robert Schmidtke <ro.schmidtke@gmail.com> wrote:
>>>> >>>
>>>> >>> Hi,
>>>> >>>
>>>> >>> I'm currently examining the I/O patterns of Flink, and I'd like to
>>>> >>> know when/how Flink goes to disk. Let me give an introduction of what
>>>> >>> I have done so far.
>>>> >>>
>>>> >>> I am running TeraGen (from the Hadoop examples package) + TeraSort
>>>> >>> (https://github.com/robert-schmidtke/terasort) on a 16 node cluster,
>>>> >>> each node with 64 GiB of memory, 2x32 cores, and roughly half a
>>>> >>> terabyte of disk. I'm using YARN and HDFS. The underlying file system
>>>> >>> is XFS.
>>>> >>>
>>>> >>> Now before running TeraGen and TeraSort, I reset the XFS counters to
>>>> >>> zero, and after TeraGen + TeraSort are finished, I dump the XFS
>>>> >>> counters again. Accumulated over the entire cluster I get 3 TiB of
>>>> >>> writes and 3.2 TiB of reads. What I'd have expected would be 2 TiB of
>>>> >>> writes (1 for TeraGen, 1 for TeraSort) and 1 TiB of reads (during
>>>> >>> TeraSort).
>>>> >>>
>>>> >>> Unsatisfied with the coarseness of these numbers, I developed an HDFS
>>>> >>> wrapper that logs file system statistics for each call to hdfs://...,
>>>> >>> such as start time/end time, no. of bytes read/written etc. I can plot
>>>> >>> these numbers and see what I expect: during TeraGen I have 1 TiB of
>>>> >>> writes to hdfs://..., during TeraSort I have 1 TiB of reads from and
>>>> >>> 1 TiB of writes to hdfs://... So far, so good.
>>>> >>>
>>>> >>> Now this still did not explain the disk I/O, so I added bytecode
>>>> >>> instrumentation to a range of Java classes, like FileIn/OutputStream,
>>>> >>> RandomAccessFile, FileChannel, ZipFile, multiple *Buffer classes for
>>>> >>> memory mapped files etc., and have the same statistics: start/end of a
>>>> >>> read from/write to disk, no. of bytes involved and such. I can plot
>>>> >>> these numbers too and see that the HDFS JVMs write 1 TiB of data to
>>>> >>> disk during TeraGen (expected) and read and write 1 TiB from and to
>>>> >>> disk during TeraSort (expected).
>>>> >>>
>>>> >>> Sorry for the enormous introduction, but now there's finally the
>>>> >>> interesting part: Flink's JVMs read from and write to disk 1 TiB of
>>>> >>> data each during TeraSort. I'm suspecting there is some sort of
>>>> >>> spilling involved, potentially because I have not done the setup
>>>> >>> properly. But that is not the crucial point: my statistics give a
>>>> >>> total of 3 TiB of writes to disk (2 TiB for HDFS, 1 TiB for Flink),
>>>> >>> which agrees with the XFS counters from above. However, my statistics
>>>> >>> only give 2 TiB of reads from disk (1 TiB for HDFS, 1 TiB for Flink),
>>>> >>> so I'm missing an entire TiB of reads from disk somewhere. I have done
>>>> >>> the same with Hadoop TeraSort, and there I'm not missing any data,
>>>> >>> meaning my statistics agree with XFS for TeraSort on Hadoop, which is
>>>> >>> why I suspect there are some cases where Flink goes to disk without me
>>>> >>> noticing it.
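
The bookkeeping in this paragraph can be written out as a small sketch. The totals are the rounded cluster-wide figures quoted in the message, in TiB; the dictionary layout and the helper name unexplained are illustrative assumptions and not part of the actual instrumentation.

```python
from decimal import Decimal

# Cluster-wide totals reported by the XFS counters, in TiB.
xfs = {"writes": Decimal("3"), "reads": Decimal("3.2")}

# Totals seen by the bytecode instrumentation, per JVM group, in TiB.
instrumented = {
    "writes": {"hdfs": Decimal("2"), "flink": Decimal("1")},
    "reads": {"hdfs": Decimal("1"), "flink": Decimal("1")},
}


def unexplained(kind):
    """Return the XFS total minus the instrumented total for 'reads' or 'writes'."""
    return xfs[kind] - sum(instrumented[kind].values())


for kind in ("writes", "reads"):
    print(f"{kind}: {unexplained(kind)} TiB not covered by the instrumentation")
```

With these rounded inputs the writes balance, while the reads leave a gap of roughly one TiB, which is the discrepancy the question is about.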
>>>> >>>
>>>> >>> So here, finally, is the question: in which cases does Flink go to
>>>> >>> disk, and how does it do so (meaning precisely which Java classes are
>>>> >>> involved, so I can check my bytecode instrumentation)? This would also
>>>> >>> include any kind of resource distribution via HDFS/YARN I guess (like
>>>> >>> JAR files and I don't know what). Seeing that I'm missing an amount of
>>>> >>> data equal to the size of my input set, I'd suspect there must be some
>>>> >>> sort of shuffling/spilling at play here, but I'm not sure. Maybe there
>>>> >>> is also some sort of remote I/O involved via sockets or so that I'm
>>>> >>> missing.
>>>> >>>
>>>> >>> Any hints as to where Flink might incur disk I/O are greatly
>>>> >>> appreciated! I'm also happy with doing the digging myself, once
>>>> >>> pointed to the proper packages in the Apache Flink source tree (I have
>>>> >>> done my fair share of inspection already, but could not be sure
>>>> >>> whether or not I have missed something). Thanks a lot in advance!
>>>> >>>
>>>> >>> Robert
>>>> >>>
>>>> >>> --
>>>> >>> My GPG Key ID: 336E2680

-- 
My GPG Key ID: 336E2680