You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Samir Vasani <sa...@gmail.com> on 2021/09/01 09:55:45 UTC

Re: Move already processed file from one folder to another folder in flink

Hi

I did not understand why you are using table when we are working on a
program?

On Mon, Jul 26, 2021, 7:20 AM Caizhi Weng <ts...@gmail.com> wrote:

> Hi!
>
> For the UDF solution, you can add a "file name" column to your csv file
> like this:
> id,value,filename
> 1,100,
> 2,200,
> 3,300,test.csv
>
> Only the filename of the last record of the csv file is filled, so that
> this indicates the end of file.
>
> Then write a UDF like this:
>
> public class MyUDF extends ScalarFunction {
>   public String eval(String filename) {
>     if (filename != null && filename.length > 0) {
>       // do the file renaming here
>     }
>     return filename
>   }
> }
>
> Suppose your original SQL is like
> SELECT id, value FROM csvSource;
>
> Then you can change your SQL to be
> SELECT id, value, MyUDF(filename) FROM csvSource;
>
> This will, of course, add in a "useless" column and you'll have to deal
> with it in your result file.
>
> For more on UDF, see
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/
>
> Samir Vasani <sa...@gmail.com> 于2021年7月24日周六 下午9:24写道:
>
>> Hi,
>> Let me know if you have any idea as this is very critical for my project.
>>
>> Thanks & Regards,
>> Samir Vasani
>>
>>
>>
>> On Fri, Jul 23, 2021 at 1:26 PM Samir Vasani <sa...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Can you elaborate more on UDF as I did not understand it.
>>>
>>> Thanks & Regards,
>>> Samir Vasani
>>>
>>>
>>>
>>> On Fri, Jul 23, 2021 at 1:22 PM Caizhi Weng <ts...@gmail.com>
>>> wrote:
>>>
>>>> Hi!
>>>>
>>>> In this case it won't work, as JobListener#onJobExecuted will only be
>>>> called when the job finishes, successfully or unsuccessfully.
>>>>
>>>> For a forever-running job I would suggest adding a UDF right after the
>>>> source and adding a special "EOF" record in each of the csv file. This UDF
>>>> monitors the data flowing through it, and if it gets the EOF record it
>>>> moves the file.
>>>>
>>>> Samir Vasani <sa...@gmail.com> 于2021年7月23日周五 下午3:44写道:
>>>>
>>>>> Hi Caizhi Weng,
>>>>>
>>>>> Thanks for your input.
>>>>> I would explain the requirement in little more detail.
>>>>> Flink pipeline will be running forever (until some issue happens and
>>>>> we would need to restart) so It will continuously monitor if a new file
>>>>> comes to the *input *folder or not.
>>>>> In this case will your suggestion work?
>>>>>
>>>>>
>>>>> Thanks & Regards,
>>>>> Samir Vasani
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jul 23, 2021 at 1:07 PM Caizhi Weng <ts...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> JobListener#onJobExecuted might help, if your job is not a
>>>>>> forever-running streaming job. See
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html
>>>>>>
>>>>>> Samir Vasani <sa...@gmail.com> 于2021年7月23日周五 下午3:22写道:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am a new bee to flink and facing some challenges to solve below
>>>>>>> use case
>>>>>>>
>>>>>>> Use Case description:
>>>>>>>
>>>>>>> I will receive a csv file with a timestamp on every single day in
>>>>>>> some folder say *input*.The file format would be
>>>>>>> *file_name_dd-mm-yy-hh-mm-ss.csv*.
>>>>>>>
>>>>>>> Now my flink pipeline will read this csv file in a row by row
>>>>>>> fashion and it will be written to my Kafka topic.
>>>>>>>
>>>>>>> Once the pipeline reads the entire file then this file needs to be
>>>>>>> moved to another folder say *historic* so that i can keep *input * folder
>>>>>>> empty for the new file.
>>>>>>>
>>>>>>> I googled a lot but did not find anything so can you guide me to
>>>>>>> achieve this.
>>>>>>>
>>>>>>> Let me know if anything else is required.
>>>>>>>
>>>>>>>
>>>>>>> Samir Vasani
>>>>>>>
>>>>>>