Posted to user@flink.apache.org by Radu Prodan <ra...@gmail.com> on 2016/02/04 11:36:17 UTC

Flink writeAsCsv

Hi all,

I am new to Flink. I wrote a simple program and I want it to write its output
to a CSV file.

timeWindowAll(Time.of(3, TimeUnit.MINUTES))
    .apply(newFunction1())
    .writeAsCsv("file:///user/someuser/Documents/somefile.csv");


When I change the sink to .print(), it works and outputs some results.

I want it to output the result of every window. However, it outputs nothing
and the file is not created. Am I missing anything?


-best

Radu

Re: Flink writeAsCsv

Posted by Fabian Hueske <fh...@gmail.com>.
You can get the end time of a window from the TimeWindow object, which is
passed to the AllWindowFunction. This is basically a window ID / index.
I would go for a custom output sink which writes records to files based on
their timestamp.
IMO, this would be cleaner and easier than implementing the file output in
the window function.
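
A minimal plain-Java sketch of this idea (it runs without Flink; WindowRouting and its methods are hypothetical names). It assumes tumbling 3-minute windows aligned to the epoch and non-negative timestamps, so a window starts at timestamp - timestamp % size and its end time identifies it uniquely; dividing the end by the window size gives a zero-based index. It also assumes each emitted record carries its window end time. In a real job the end time would come from TimeWindow#getEnd() inside the AllWindowFunction, and the body of write() would go into a custom sink's invoke method (an assumption, not shown).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper, not a Flink API: routes each record to a file named
// after the end time of the window that produced it.
final class WindowRouting {
    // 3-minute windows, matching Time.of(3, TimeUnit.MINUTES) in the question.
    static final long WINDOW_SIZE_MS = 3 * 60 * 1000L;

    // Assumes epoch-aligned tumbling windows and non-negative timestamps:
    // start = ts - ts % size, end = start + size.
    static long windowEnd(long timestampMs) {
        long start = timestampMs - (timestampMs % WINDOW_SIZE_MS);
        return start + WINDOW_SIZE_MS;
    }

    // The end time identifies the window; end / size - 1 is a zero-based index.
    static long windowIndex(long windowEndMs) {
        return windowEndMs / WINDOW_SIZE_MS - 1;
    }

    // One file per window, named after its end time.
    static Path fileFor(Path dir, long windowEndMs) {
        return dir.resolve("window-" + windowEndMs + ".csv");
    }

    // Per-record sink logic: append the CSV line to its window's file,
    // creating the file the first time that window is seen.
    static void write(Path dir, long windowEndMs, String csvLine) {
        byte[] bytes = (csvLine + "\n").getBytes(StandardCharsets.UTF_8);
        try {
            Files.write(fileFor(dir, windowEndMs), bytes,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Naming files by end time rather than by a counter means the mapping from window to file is deterministic, so no counter has to be kept or restored.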



2016-02-04 13:49 GMT+01:00 Radu Tudoran <ra...@huawei.com>:

> Hi Radu,
>
> It is indeed interesting to know how each window could be registered
> separately. I am not sure if any of the existing mechanisms in Flink
> support this.
>
> I think you need to create your own output sink. It is a bit tricky to
> pass the window sequence number (actually I do not think such an index is
> kept, but you can create one yourself). Maybe an easier option is to
> manage the writing of the data yourself in the window function or in a
> custom evictor. In the window function and in the evictor you have access
> to all the data, and you can create a specific file for each triggered
> window.

RE: Flink writeAsCsv

Posted by Radu Tudoran <ra...@huawei.com>.
Hi Radu,

It is indeed interesting to know how each window could be registered separately. I am not sure if any of the existing mechanisms in Flink support this.
I think you need to create your own output sink. It is a bit tricky to pass the window sequence number (actually I do not think such an index is kept, but you can create one yourself). Maybe an easier option is to manage the writing of the data yourself in the window function or in a custom evictor. In the window function and in the evictor you have access to all the data, and you can create a specific file for each triggered window.
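
A minimal sketch of the window-function approach, in plain Java so it runs without Flink; PerWindowWriter and writeWindow are hypothetical names. In a real job, writeWindow would be called from the AllWindowFunction's apply method with that window's results already formatted as CSV rows (an assumption). Since timeWindowAll runs its window operator with parallelism 1, one counter is enough to number the windows; it is an ordinary field, not checkpointed state, so it starts again at 0 if the job restarts.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not a Flink API: writes the results of one fired
// window to their own file, numbering windows with a self-maintained counter.
final class PerWindowWriter {
    private final Path outputDir;
    private long nextSeq = 0; // window sequence number kept by the function itself

    PerWindowWriter(Path outputDir) {
        this.outputDir = outputDir;
    }

    // Called once per fired window with all of that window's CSV rows.
    // Files.write opens, writes and closes the file in one call, so each
    // window ends up in its own complete, closed file.
    Path writeWindow(Iterable<String> csvRows) {
        Path file = outputDir.resolve("window-" + nextSeq + ".csv");
        nextSeq++;
        try {
            Files.write(file, csvRows);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return file;
    }
}
```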



From: Radu Prodan [mailto:raduprodan6@gmail.com]
Sent: Thursday, February 04, 2016 11:58 AM
To: user@flink.apache.org
Subject: Re: Flink writeAsCsv

Hi Marton,

Thanks to your comment I managed to get it working. At least it outputs the results. However, what I need is to output each window result separately. Now, it outputs the results of parallel working windows (I think) and appends the new results to them. For example, if I have a parallelism of 10, then I will have at most 10 files and each file will grow in size as windows continue.
What I want is to have a separate file for each window. For example, after the n-th window is computed, output it to some file and close the file.

-best
Radu



Re: Flink writeAsCsv

Posted by Radu Prodan <ra...@gmail.com>.
Hi Marton,

Thanks to your comment I managed to get it working. At least it outputs the
results. However, what I need is to output each window result separately.
Now, it outputs the results of parallel working windows (I think) and
appends the new results to them. For example, if I have a parallelism of 10,
then I will have at most 10 files and each file will grow in size as
windows continue.
What I want is to have a separate file for each window. For example, after
the n-th window is computed, output it to some file and close the file.

-best
Radu

On Thu, Feb 4, 2016 at 11:42 AM Márton Balassi <ba...@gmail.com>
wrote:

> Hey Radu,
>
> As you are using the streaming API, I assume that you call env.execute() in
> both cases. Is that the case?
>
> Do you see any errors appearing? My first guess would be that your data
> type is not a tuple type; in that case writeAsCsv does not work by default.
>
> Best,
>
> Marton

Re: Flink writeAsCsv

Posted by Márton Balassi <ba...@gmail.com>.
Hey Radu,

As you are using the streaming API, I assume that you call env.execute() in
both cases. Is that the case?

Do you see any errors appearing? My first guess would be that your data type
is not a tuple type; in that case writeAsCsv does not work by default.

Best,

Marton
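
A minimal sketch of one workaround, assuming the window function emits a custom (non-tuple) class: format each result as a CSV line yourself and write the resulting strings with writeAsText; mapping each result to a Flink tuple type such as Tuple3 before calling writeAsCsv is the other option. The code below is plain Java and covers only the formatting step; WindowResult, its fields and toCsvLine are hypothetical. With either sink, nothing is written until env.execute() is called.

```java
// Hypothetical result type emitted by the window function; it is not a
// Flink tuple, so writeAsCsv would not accept a stream of it.
final class WindowResult {
    final long windowEnd;
    final String key;
    final long count;

    WindowResult(long windowEnd, String key, long count) {
        this.windowEnd = windowEnd;
        this.key = key;
        this.count = count;
    }

    // Quotes a field if it contains a comma, quote or line break,
    // doubling any embedded quotes (RFC 4180 style).
    static String escape(String field) {
        if (field.contains(",") || field.contains("\"")
                || field.contains("\n") || field.contains("\r")) {
            return "\"" + field.replace("\"", "\"\"") + "\"";
        }
        return field;
    }

    // One CSV line per result, ready for a plain text sink.
    String toCsvLine() {
        return windowEnd + "," + escape(key) + "," + count;
    }
}
```

In the job, a map step would turn each WindowResult into its line before the text sink (wiring not shown).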

On Thu, Feb 4, 2016 at 11:36 AM, Radu Prodan <ra...@gmail.com> wrote:

> Hi all,
>
> I am new to Flink. I wrote a simple program and I want it to write its
> output to a CSV file.
>
> timeWindowAll(Time.of(3, TimeUnit.MINUTES))
>     .apply(newFunction1())
>     .writeAsCsv("file:///user/someuser/Documents/somefile.csv");
>
> When I change the sink to .print(), it works and outputs some results.
>
> I want it to output the result of every window. However, it outputs
> nothing and the file is not created. Am I missing anything?
>
>
> -best
>
> Radu
>