Posted to user@flink.apache.org by Benjamin Kadish <be...@adfin.com> on 2016/03/10 16:52:27 UTC

Fwd: Flink loading an S3 File out of order

I am trying to read a file from S3 in the correct order. It seems that
Flink is downloading the file out of order, or at least it's constructing
the DataSet out of order. I tried using Hadoop to download the file, and it
seemed to download it in order.
I am able to reproduce the problem with the following line:

env.readTextFileWithValue(conf.options.get(S3FileName).get)
   .writeAsText(s"${conf.output}/output", writeMode = FileSystem.WriteMode.OVERWRITE)

The output looks something like

line 1001
line 1002
...
line 1304
line 1

Is there a way to guarantee order?

-- 
Benjamin Kadish
(260) 441-6159

Re: Flink loading an S3 File out of order

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Benjamin,

Flink usually reads data in parallel. This is done by splitting the input
(e.g., a file) into several input splits, each of which is processed
independently. Since splits are usually processed concurrently by more than
one task, Flink does not preserve the input order by default: the order of
the result depends on which splits happen to be processed when.
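To make the effect concrete, here is a small, Flink-free Scala model of split-based reading. It is a simplification under stated assumptions: a split is modeled as a contiguous range of lines (Flink's file splits are byte ranges), and the names `SplitReadingModel`, `makeSplits`, `emit` and `restoreOrder` are hypothetical. Records come out in whatever order the splits finish; keeping each line's position lets a later sort recover the file order. Note that `readTextFile` and `readTextFileWithValue` return only the line text, so a real job would need an input format that attaches a position, for example Hadoop's `TextInputFormat`, whose keys are byte offsets.

```scala
// Hypothetical, Flink-free model of split-based reading. Simplification: a split
// here is a contiguous range of lines, whereas Flink's file splits are byte ranges.
object SplitReadingModel {
  // `start` is the 0-based position of the split's first line in the whole file.
  final case class Split(id: Int, start: Int, lines: Vector[String])

  // Cut the input into at most `n` contiguous splits of roughly equal size.
  def makeSplits(lines: Vector[String], n: Int): Vector[Split] = {
    val size = math.max(1, math.ceil(lines.size.toDouble / n).toInt)
    lines.grouped(size).zipWithIndex.map { case (chunk, i) =>
      Split(i, i * size, chunk)
    }.toVector
  }

  // Emit records in the order the splits finish, as concurrent tasks might.
  // Each record carries its global position so the file order can be recovered.
  def emit(splits: Vector[Split], finishOrder: Seq[Int]): Vector[(Long, String)] =
    finishOrder.toVector.flatMap { id =>
      val s = splits(id)
      s.lines.zipWithIndex.map { case (line, j) => ((s.start + j).toLong, line) }
    }

  // Restore file order by sorting on the position key.
  def restoreOrder(records: Vector[(Long, String)]): Vector[String] =
    records.sortBy(_._1).map(_._2)
}
```

With 1304 lines cut into four splits and split 3 finishing first, the first emitted line is `line 979` rather than `line 1`, the same kind of rotation as in the original report; `restoreOrder` puts the lines back in file order.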

You can implement a special InputFormat that uses a custom
InputSplitAssigner to ensure that splits are handed out in order.
This would require a bit of coding, though.
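The hand-out policy itself can be sketched without Flink. The class below, `OrderedSplitAssigner`, is a hypothetical stand-in: it does not implement Flink's `InputSplitAssigner` interface and only models the rule "always hand out the lowest-numbered remaining split". One caveat: handing out splits in order yields ordered output only if a single task reads them one after another, since several tasks still finish their splits at different times.

```scala
import scala.collection.mutable

// Hypothetical stand-in, NOT an implementation of Flink's InputSplitAssigner:
// it only models the policy "hand out the lowest-numbered pending split next".
final class OrderedSplitAssigner(splitNumbers: Seq[Int]) {
  // Min-heap on split number (PriorityQueue is a max-heap, so reverse the ordering).
  private val pending = mutable.PriorityQueue.empty[Int](Ordering[Int].reverse)
  pending ++= splitNumbers

  // Called whenever a reading task asks for work; None means every split is taken.
  // synchronized because several tasks may ask concurrently.
  def nextSplit(): Option[Int] = synchronized {
    if (pending.isEmpty) None else Some(pending.dequeue())
  }
}
```

A priority queue keeps the rule independent of the order in which splits were registered, so splits created as 2, 0, 3, 1 are still handed out as 0, 1, 2, 3.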

A DataSet is usually distributed among multiple partitions/tasks and also
has no concept of (complete) order. It is possible to sort the data
of a data set within each individual partition by calling
DataSet.sortPartition(key, order). If you do that with a parallelism of one
(DataSet.sortPartition().setParallelism(1)), you'll get a fully ordered
data set, but only on one machine.
Flink also supports range partitioning (DataSet.partitionByRange()) in
case you want to sort the data in parallel. In either case, restoring the
original file order requires a sort key that reflects each line's position
in the file, such as a line number or byte offset.
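Both sort-based options can be modeled in plain Scala. This hypothetical `RangeSortModel` assumes each record is a `(position, line)` pair and that the range boundaries are given by hand (Flink determines them itself). Records are routed to the partition whose key range contains them, each partition is sorted independently, as `sortPartition` does per task, and reading the partitions in range order yields a total order. With no boundaries there is a single partition, which corresponds to the parallelism-of-one case.

```scala
// Hypothetical, Flink-free model of range partitioning followed by per-partition sorting.
object RangeSortModel {
  // Route each record to the partition whose key range contains it.
  // `bounds` must be ascending: partition i holds keys <= bounds(i), the last one the rest.
  def rangePartition[V](records: Seq[(Long, V)], bounds: Vector[Long]): Vector[Vector[(Long, V)]] = {
    val parts = Vector.fill(bounds.size + 1)(Vector.newBuilder[(Long, V)])
    for (r <- records) {
      val i = bounds.indexWhere(r._1 <= _) match {
        case -1 => bounds.size // key is above every bound
        case j  => j
      }
      parts(i) += r
    }
    parts.map(_.result())
  }

  // Sort every partition independently, like a per-task partition sort.
  def sortEachPartition[V](parts: Vector[Vector[(Long, V)]]): Vector[Vector[(Long, V)]] =
    parts.map(_.sortBy(_._1))

  // The ranges are ordered, so concatenating partitions in range order is totally ordered.
  def concat[V](parts: Vector[Vector[(Long, V)]]): Vector[(Long, V)] = parts.flatten
}
```

The design point is that ordering comes from two cheap guarantees combined: ranges are disjoint and ascending, and each partition is sorted locally, so no single task ever has to see the whole data set.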

Best, Fabian
