Posted to user@flink.apache.org by Puneet Kinra <pu...@customercentria.com> on 2018/03/22 08:17:30 UTC

Query regarding the ContinuousFileMonitoring operator

If I set the parallelism to 1, it still creates multiple splits while
processing.

-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com
<pu...@customercentria.com>*

*e-mail :puneet.kinra@customercentria.com
<pu...@customercentria.com>*

Re: Query regarding the ContinuousFileMonitoring operator

Posted by Puneet Kinra <pu...@customercentria.com>.
Hi Kostas,

Thanks for the reply. Yes, I am planning to implement exactly that.





On Mon, Mar 26, 2018 at 7:53 PM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:

> Hi Puneet,
>
> If you mean that after processing a file, you want to move it to
> another directory outside the one containing the data to be processed,
> then I am afraid that this is currently not possible. This is because
> the whole logic of how to treat files is included in your
> FileInputFormat.
>
> It may be possible to do it if you implement your custom
> FileInputFormat that creates the splits, moves the file, and modifies
> the splits to point to the new location of the file before shipping
> them downstream to be read (but I have not done it).
>
> Keep in mind that if you do not change the contents of the file, then
> it will not be reprocessed.
>
> Cheers,
> Kostas

Re: Query regarding the ContinuousFileMonitoring operator

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Puneet,

If you mean that after processing a file, you want to move it to another directory outside the one containing 
the data to be processed, then I am afraid that this is currently not possible. This is because the whole logic 
of how to treat files is included in your FileInputFormat. 

It may be possible to do it if you implement your custom FileInputFormat that creates the splits, moves the file, 
and modifies the splits to point to the new location of the file before shipping them downstream to be read 
(but I have not done it).
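
As a rough illustration of the file-moving step only, the sketch below relocates a file into a directory outside the monitored one using plain java.nio. The class and method names (ProcessedFileMover, targetFor, moveToArchive) are hypothetical. Wiring this into a custom FileInputFormat so that the splits point to the new location is not shown here and remains untested.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper: relocates a file into an archive directory outside the monitored one. */
final class ProcessedFileMover {

    /** Computes where a file would land inside the archive directory (pure, no I/O). */
    public static Path targetFor(Path file, Path archiveDir) {
        return archiveDir.resolve(file.getFileName());
    }

    /** Moves the file and returns its new location, which a split could then point to. */
    public static Path moveToArchive(Path file, Path archiveDir) {
        try {
            // Make sure the archive directory exists before moving into it.
            Files.createDirectories(archiveDir);
            return Files.move(file, targetFor(file, archiveDir),
                    StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            // Rethrow unchecked so callers are not forced to declare IOException.
            throw new UncheckedIOException(e);
        }
    }
}
```

A custom format could call moveToArchive(...) once per file and build its splits from the returned path. Whether that interacts well with the monitoring source is an open question.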

Keep in mind that if you do not change the contents of the file, then it will not be reprocessed.
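
To picture why an unchanged file is not picked up again, here is a toy model. It assumes, purely for illustration, that change detection compares each file's modification timestamp against the newest one seen so far. ModificationTimeTracker and selectNew are made-up names, and this is not Flink's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified, hypothetical model of timestamp-based change detection. */
final class ModificationTimeTracker {
    // Highest modification timestamp selected in any earlier scan.
    private long maxSeenModTime = Long.MIN_VALUE;

    /** Returns the files whose modification time is newer than anything seen in earlier scans. */
    public List<String> selectNew(Map<String, Long> modTimesByFile) {
        List<String> selected = new ArrayList<>();
        long newMax = maxSeenModTime;
        // TreeMap gives a deterministic (sorted by name) iteration order.
        for (Map.Entry<String, Long> e : new TreeMap<>(modTimesByFile).entrySet()) {
            if (e.getValue() > maxSeenModTime) {
                selected.add(e.getKey());
                newMax = Math.max(newMax, e.getValue());
            }
        }
        maxSeenModTime = newMax;
        return selected;
    }
}
```

In this model, scanning the same directory twice returns a file only the first time. It is returned again only after its modification time has increased.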

Cheers,
Kostas

> On Mar 26, 2018, at 12:18 PM, Puneet Kinra <pu...@customercentria.com> wrote:
> 
> Hi Timo,
>
>     FileInputFormat<String> fileInputFormat = new TextInputFormat(new Path(fileSystem + this.path));
>     fileInputFormat.setNestedFileEnumeration(true);
>     fileInputFormat.setFilesFilter(new UnicaFileFilter(".csv"));
>     // The last argument is the directory monitoring interval in milliseconds.
>     DataStream<String> value = this.execEnv.readFile(fileInputFormat,
>             fileSystem + this.path,
>             FileProcessingMode.PROCESS_CONTINUOUSLY, 2L).setParallelism(1);
>
> 1) Now, if I set the parallelism to 1, the files are processed sequentially.
> 2) Modified splits are processed sequentially on the same task manager.
> 3) I want to move the files after they have been processed. How can I achieve this?


Re: Query regarding the ContinuousFileMonitoring operator

Posted by Puneet Kinra <pu...@customercentria.com>.
Hi Timo,

    FileInputFormat<String> fileInputFormat = new TextInputFormat(new Path(fileSystem + this.path));
    fileInputFormat.setNestedFileEnumeration(true);
    fileInputFormat.setFilesFilter(new UnicaFileFilter(".csv"));
    // The last argument is the directory monitoring interval in milliseconds.
    DataStream<String> value = this.execEnv.readFile(fileInputFormat,
            fileSystem + this.path,
            FileProcessingMode.PROCESS_CONTINUOUSLY, 2L).setParallelism(1);

1) Now, if I set the parallelism to 1, the files are processed sequentially.
2) Modified splits are processed sequentially on the same task manager.
3) I want to move the files after they have been processed. How can I achieve this?




On Mon, Mar 26, 2018 at 3:27 PM, Timo Walther <tw...@apache.org> wrote:

> Hi Puneet,
>
> Can you share a small code example with us? I could not reproduce your
> problem.
>
> Keep in mind that setParallelism() only affects the last operation, i.e.
> the operator it is called on. If you want to change the default
> parallelism of the entire pipeline, you have to set it on the
> StreamExecutionEnvironment. Otherwise, every subsequent operator will
> again run with the full parallelism, which leads to a shuffle operation
> after your source.
>
> I hope this helps.
>
> Regards,
> Timo

Re: Query regarding the ContinuousFileMonitoring operator

Posted by Timo Walther <tw...@apache.org>.
Hi Puneet,

Can you share a small code example with us? I could not reproduce your
problem.

Keep in mind that setParallelism() only affects the last operation, i.e.
the operator it is called on. If you want to change the default
parallelism of the entire pipeline, you have to set it on the
StreamExecutionEnvironment. Otherwise, every subsequent operator will
again run with the full parallelism, which leads to a shuffle operation
after your source.

I hope this helps.

Regards,
Timo


On 22.03.18 at 09:17, Puneet Kinra wrote:
> If I set the parallelism to 1, it still creates multiple splits while
> processing.