Posted to user@flink.apache.org by Darshan Singh <da...@gmail.com> on 2018/02/18 19:11:33 UTC

Need to understand the execution model of the Flink

Hi, I would like to understand the execution model.

1. I have a CSV file which is, say, 10 GB.
2. I created a table from this file.

3. I then created filtered tables on top of it, say 10 of them.
4. I then created a writeToSink for each of these 10 filtered tables.

My first question: will these 10 filtered tables be written in parallel
(suppose I have 40 cores and set the parallelism to 40 as well)?

My next question: the table I created from the CSV file, which is common to
all of them, won't be persisted by Flink internally, right? Rather, for each
of the 10 filtered tables it will read the CSV file, apply the filter, and
write to the sink.

I think that for the 10 filtered tables it will read the CSV again and
again, so in this case it will be read 10 times. Is my understanding
correct, or am I missing something?

What if in step 2 I convert the table to a DataSet and back?

Thanks
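
For concreteness, a minimal sketch of the setup described above, assuming the
Java batch Table API of the Flink 1.4/1.5 line; the file path, schema, and
filter predicates are invented for illustration, and writeToSink here stands
in for the "writetosink" the poster mentions.

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sources.CsvTableSource;

public class FilterFanOut {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(40);  // 40 cores -> parallelism 40, as in the question
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Steps 1+2: register the 10 GB CSV file as a table (schema is made up).
        CsvTableSource csv = new CsvTableSource(
                "/data/input.csv",
                new String[]{"id", "category", "amount"},
                new TypeInformation<?>[]{
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.DOUBLE_TYPE_INFO});
        tEnv.registerTableSource("lines", csv);
        Table all = tEnv.scan("lines");

        // Steps 3+4: ten filtered tables, each written to its own sink.
        for (int i = 0; i < 10; i++) {
            Table filtered = all.filter("amount > " + (i * 10));
            filtered.writeToSink(new CsvTableSink("/data/out-" + i, "|"));
        }
        env.execute("ten filters");
    }
}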

Re: Need to understand the execution model of the Flink

Posted by Niclas Hedhman <ni...@apache.org>.
Do you really need the large single table created in step 2?

If not, what you typically do is have the CSV source perform the common
transformations first. Then, depending on whether the 10 outputs have
different processing paths or not, you either do a split() for individual
processing based on some criteria, or you just have the sink put each
record into a separate table.
You have full control, at each step along the transformation path, over
whether it can be parallelized or not, and if there are no sequential
constraints in your model, you can easily keep all cores on all hosts busy.

Even if you do need the step 2 table, I would still just treat that as a
split(): a branch ending in a sink that does the storage there. There is no
need to read records from the file over and over again, nor to store them
first in the step 2 table and read them out again.
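
A rough sketch of that branching idea in the batch DataSet API (split() itself
belongs to the DataStream API; in a DataSet program you simply attach several
filters and sinks to the same DataSet). Field names, predicates, and paths are
invented for illustration.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

public class BranchAfterCommonWork {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the file once and apply the common transformation up front.
        DataSet<Tuple3<Integer, String, Double>> records = env
                .readCsvFile("/data/input.csv")
                .types(Integer.class, String.class, Double.class)
                .filter(r -> r.f2 >= 0.0);  // shared cleanup step, applied once

        // Branch: each output gets its own filter and its own sink.
        records.filter(r -> "a".equals(r.f1)).writeAsCsv("/data/out-a");
        records.filter(r -> "b".equals(r.f1)).writeAsCsv("/data/out-b");
        records.filter(r -> "c".equals(r.f1)).writeAsCsv("/data/out-c");

        env.execute("branching job");
    }
}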

Don't ask *me* about what happens in failure scenarios... I have myself not
figured that out yet.

HTH
Niclas


-- 
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java

Re: Need to understand the execution model of the Flink

Posted by Fabian Hueske <fh...@gmail.com>.
Cardinality and size estimation are fundamental requirements for cost-based
query optimization.
I hope we will work on this at some point but right now it is not on the
roadmap.

In case of very complex plans, it might make sense to write an intermediate
result to persistent storage and start another query.
I don't think there's a good rule of thumb for this because there are many
factors that need to be considered (data size, compute resources,
operators, etc.). You'd have to experiment yourself.
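
A hedged sketch of that staging pattern in the batch DataSet API (paths and
schema are invented): persist the intermediate result, execute, and then
continue from the persisted data in a second, simpler plan.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;

public class StagedJobs {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stage 1: the complex part of the plan, persisted to durable storage.
        DataSet<Tuple2<String, Double>> intermediate = env
                .readCsvFile("/data/input.csv")
                .types(String.class, Double.class)
                .groupBy(0)
                .sum(1);
        intermediate.writeAsCsv("/data/intermediate", FileSystem.WriteMode.OVERWRITE);
        env.execute("stage 1");

        // Stage 2: a fresh plan that starts from the persisted result.
        DataSet<Tuple2<String, Double>> staged = env
                .readCsvFile("/data/intermediate")
                .types(String.class, Double.class);
        staged.filter(t -> t.f1 > 100.0).writeAsCsv("/data/large-groups");
        env.execute("stage 2");
    }
}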

Best, Fabian


Re: Need to understand the execution model of the Flink

Posted by Darshan Singh <da...@gmail.com>.
Are there any plans for this in the future? I can look at the execution
plans, but without these stats I am a bit lost as to what to look for, e.g.
where the pain points are. I can see some very obvious things, but not much
more from these plans.

My question is: is there a guide or document which describes what your plans
should look like and what to look for in them?

Also, I would like to know: if there is a very complex execution plan (maybe
not expensive, but very complex), is it usually beneficial to save the
intermediate datasets/tables and read them back for the next steps?

Thanks


Re: Need to understand the execution model of the Flink

Posted by Fabian Hueske <fh...@gmail.com>.
No, there is no size or cardinality estimation happening at the moment.

Best, Fabian


Re: Need to understand the execution model of the Flink

Posted by Darshan Singh <da...@gmail.com>.
Thanks. Is there a metric or some other way to know how much space each
task/job is taking? Does the execution plan have these details?

Thanks


Re: Need to understand the execution model of the Flink

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

that's a difficult question without knowing the details of your job.
A NoSpaceLeftOnDevice error occurs when a file system is full.

This can happen if:
- A Flink algorithm writes to disk, e.g., an external sort or the hash
table of a hybrid hash join. This can happen for GroupBy, Join, Distinct,
or any other operation that requires grouping or joining data. Filters will
never spill to disk.
- An OutputFormat writes to disk.

The data is written to a temp directory, which can be configured in the
./conf/flink-conf.yaml file.
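
For reference, a hedged example of that configuration for Flink releases of
that time; the key is taskmanager.tmp.dirs and the path is a placeholder.

# conf/flink-conf.yaml
# Directory used for TaskManager spill files such as external sorts and
# hash tables; defaults to the system temp directory (e.g. /tmp).
taskmanager.tmp.dirs: /mnt/bigdisk/flink-tmp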

Did you check how the tasks are distributed across the task managers?
The web UI can help to diagnose such problems.

Best, Fabian


Re: Need to understand the execution model of the Flink

Posted by Darshan Singh <da...@gmail.com>.
Thanks Fabian for such a detailed explanation.

I am using a DataSet in between, so I guess the CSV is read once. Now to my
real issue: I have 6 task managers, each with 4 cores, and I have 2 slots
per task manager.

My CSV file is just 1 GB; I create a table, transform it to a DataSet, and
then run 15 different filters plus extra processing, which all run almost
in parallel.

However, it fails with a "no space left on device" error on one of the task
managers. Each task manager has 32 GB of space in /tmp, so I am not sure why
it is running out of space. I do use some joins with other tables, but those
are a few megabytes.

So I was assuming that somehow all the parallel executions were storing data
in /tmp and filling it.

I would like to know what could be filling that space.

Thanks


Re: Need to understand the execution model of the Flink

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

this works as follows.

- Table API and SQL queries are translated into regular DataSet jobs
(assuming you are running in a batch ExecutionEnvironment).
- A query is translated into a sequence of DataSet operators when you
1) transform the Table into a DataSet or 2) write it to a TableSink. In
both cases, the optimizer is invoked and recursively traces the
converted/emitted Table back to its roots, i.e., a TableSource or a
DataSet.

This means that if you create a Table from a TableSource, apply multiple
filters to it, and write each filter to a TableSink, the CSV file will be
read 10 times, filtered 10 times, and written 10 times. This is not
efficient, because you could also just read the file once and apply all
filters in parallel.
You can do this by converting the Table that you read from the TableSource
into a DataSet and registering that DataSet again as a Table. In that case,
the translations of all TableSinks will stop at the DataSet and not include
the TableSource which reads the file.

The following figures illustrate the difference:

1) Without DataSet in the middle:

TableSource -> Filter1 -> TableSink1
TableSource -> Filter2 -> TableSink2
TableSource -> Filter3 -> TableSink3

2) With DataSet in the middle:

              /-> Filter1 -> TableSink1
TableSource --+-> Filter2 -> TableSink2
              \-> Filter3 -> TableSink3
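
A hedged sketch of variant 2) in the Java batch Table API of that era; the
registered table name, schema, and predicates are invented (see the sketch
after the first post for the assumed CsvTableSource registration).

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;

public class DataSetInTheMiddle {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // "lines" is assumed to be a registered CsvTableSource.
        Table fromSource = tEnv.scan("lines");

        // Break the translation at a DataSet so each sink's plan stops here
        // instead of reaching back to the TableSource.
        DataSet<Row> materialized = tEnv.toDataSet(fromSource, Row.class);
        tEnv.registerDataSet("linesOnce", materialized);
        Table shared = tEnv.scan("linesOnce");

        // The filters now share the single read of the CSV file.
        shared.filter("amount > 10").writeToSink(new CsvTableSink("/data/out-1", "|"));
        shared.filter("amount > 100").writeToSink(new CsvTableSink("/data/out-2", "|"));
        shared.filter("amount > 1000").writeToSink(new CsvTableSink("/data/out-3", "|"));

        env.execute("shared source");
    }
}

With this arrangement the translation of every sink stops at the registered
DataSet, so the file is scanned once per execution rather than once per sink.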

I'll likely add a feature to internally translate an intermediate Table to
make this a bit easier.
The underlying problem is that the SQL optimizer cannot translate queries
with multiple sinks.
Instead, each sink is individually translated and the optimizer does not
know that common execution paths could be shared.

Best,
Fabian


Re: Need to understand the execution model of the Flink

Posted by Darshan Singh <da...@gmail.com>.
Thanks for the reply.

I guess I am not looking for an alternative. I am trying to understand what
Flink does in this scenario: if 10 tasks are going in parallel, I am sure
they will each be reading the CSV, as there is no other way.

Thanks
