Posted to user@flink.apache.org by Robert Schmidtke <ro...@gmail.com> on 2017/01/04 15:38:26 UTC

Re: Reading worker-local input files

Hi Fabian,

thanks for your directions! They worked flawlessly. I am aware of the
reduced robustness, but then again my input is only available on each
worker and not replicated. In case anyone is wondering, here is how I did
it:
https://github.com/robert-schmidtke/hdfs-statistics-adapter/tree/2a456a832c9d84b324a966c431171f761f3444f5

Thanks again!
Robert

On Tue, Dec 27, 2016 at 4:36 PM, Fabian Hueske <fh...@gmail.com> wrote:

>
> Hi Robert,
>
> this is indeed a bit tricky to do. The problem lies mostly in the
> generation of the input splits, the setup of Flink, and the scheduling of
> tasks.
>
> 1) You have to ensure that at least one DataSource task is scheduled on
> each worker. The easiest way to do this is a bare-metal setup (no YARN)
> with a single TaskManager per worker. Each TM should have the same number
> of slots, and the DataSource should have a parallelism of #TMs * slots.
> This ensures that the same number of DataSource tasks is started on each
> worker.
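>
> For illustration, a minimal sketch of the job setup. WorkerLocalInputFormat,
> tmHosts, and slotsPerTm are assumed names for the custom format described in
> the next steps, not existing Flink API:
>
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>     // one DataSource task per slot, i.e. #TMs * slots in total
>     int parallelism = tmHosts.size() * slotsPerTm;
>
>     env.createInput(new WorkerLocalInputFormat(tmHosts, slotsPerTm))
>        .setParallelism(parallelism)
>        .print();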
>
> 2) You need to tweak the input split generation. Flink's FileInputFormat
> assumes that it can access all files to be read via a distributed file
> system. Your InputFormat should instead take the list of TaskManagers
> (hostnames or IP addresses) and the number of slots per TM as parameters.
> InputFormat.createInputSplits() should create one input split for each
> parallel task, and each split should carry a (hostname, local index) pair.
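>
> A rough, untested sketch of what that could look like. WorkerLocalSplit is a
> made-up name; tmHosts and slotsPerTm are assumed fields of the format:
>
>     // A split that pins itself to one host and one local slot index.
>     public class WorkerLocalSplit extends LocatableInputSplit {
>
>         private final int localIndex; // which of the host's tasks owns this split
>
>         public WorkerLocalSplit(int splitNumber, String host, int localIndex) {
>             super(splitNumber, host);
>             this.localIndex = localIndex;
>         }
>
>         public int getLocalIndex() {
>             return localIndex;
>         }
>
>         public String getHost() {
>             return getHostnames()[0];
>         }
>     }
>
>     // In the InputFormat: exactly one split per parallel task.
>     @Override
>     public WorkerLocalSplit[] createInputSplits(int minNumSplits) {
>         WorkerLocalSplit[] splits =
>             new WorkerLocalSplit[tmHosts.size() * slotsPerTm];
>         int splitNumber = 0;
>         for (String host : tmHosts) {
>             for (int localIndex = 0; localIndex < slotsPerTm; localIndex++) {
>                 splits[splitNumber] =
>                     new WorkerLocalSplit(splitNumber, host, localIndex);
>                 splitNumber++;
>             }
>         }
>         return splits;
>     }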
>
> 3) You need to tweak the input split assignment. You have to provide a
> custom input split assigner that ensures that splits are only assigned to
> the correct TaskManager. Otherwise it might happen that a TaskManager
> processes the split of another TM, so that some data is read twice while
> other data is not read at all.
>
> 4) Once a split is assigned to a task, the InputFormat.open() method is
> called. Based on the local index, the task must decide which files (or
> parts of files) it needs to read. This decision must be deterministic
> (depend only on the local index) and ensure that all data (files / parts of
> files) is read exactly once (you'll need the number of slots per host for
> that).
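>
> A sketch of what open() could do, assuming each task reads whole files from
> a fixed directory (/local here; myFiles and slotsPerTm are assumed fields of
> the format):
>
>     @Override
>     public void open(WorkerLocalSplit split) throws IOException {
>         // Every task on this host sees the same directory, so a sorted
>         // listing gives a deterministic file-to-task mapping.
>         File[] files = new File("/local").listFiles();
>         Arrays.sort(files);
>
>         this.myFiles = new ArrayList<>();
>         for (int i = 0; i < files.length; i++) {
>             // Round-robin by local index: each file is picked up by exactly
>             // one of the slotsPerTm tasks on this host.
>             if (i % slotsPerTm == split.getLocalIndex()) {
>                 myFiles.add(files[i]);
>             }
>         }
>         // nextRecord() / reachedEnd() then iterate over myFiles
>     }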
>
> You see, this is not trivial. Moreover, such a setup is not flexible,
> quite fragile, and not fault tolerant.
> Since you need to read local files that are not available anywhere else,
> your job will fail if a TM goes down.
>
> If possible, I would recommend moving the data into a distributed file
> system.
>
> Best,
> Fabian
>
> 2016-12-27 13:04 GMT+01:00 Robert Schmidtke <ro...@gmail.com>:
>
>> Hi everyone,
>>
>> I'm using Flink and/or Hadoop on my cluster, and I'm having them generate
>> log data in each worker node's /local folder (regular mount point). Now I
>> would like to process these files using Flink, but I'm not quite sure how I
>> could tell Flink to use each worker node's /local folder as input path,
>> because I'd expect Flink to look in the /local folder of the submitting
>> node only. Do I have to put these files into HDFS, or is there a way to tell
>> Flink that the file:///local URI refers to worker-local data? Thanks in
>> advance for any hints. Best,
>>
>> Robert
>>
>> --
>> My GPG Key ID: 336E2680
>>
>
>


-- 
My GPG Key ID: 336E2680
