Posted to user@flink.apache.org by subash basnet <ya...@gmail.com> on 2016/04/27 16:14:05 UTC

Unable to write stream as csv

Hello all,

I am able to write the Wikipedia edit data to Kafka and to a text file,
following the WikipediaAnalysis example. But when I try to write it as CSV,
the blank files that are initially created never get filled with data.
Below is the code:

DataStream<Tuple2<String, Long>> result = keyedEdits
    .timeWindow(Time.seconds(5))
    .fold(new Tuple2<>("", 0L),
        new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> fold(Tuple2<String, Long> acc,
                    WikipediaEditEvent event) {
                acc.f0 = event.getUser();
                acc.f1 += event.getByteDiff();
                return acc;
            }
        });

result.writeAsText("file:///home/softwares/flink-1.0.0/kmeans/streaming/result",
    FileSystem.WriteMode.OVERWRITE);  // works

result.writeAsCsv("file:///home/softwares/flink-1.0.0/kmeans/streaming/result",
    FileSystem.WriteMode.OVERWRITE);  // doesn't work

Why is the data written to the file as text but not as CSV?

Best Regards,
Subash Basnet

Re: Unable to write stream as csv

Posted by Aljoscha Krettek <al...@apache.org>.
I think there is a problem with the interaction of legacy OutputFormats and
streaming programs. flush() is not called, and the CsvOutputFormat only
writes in flush(), so we don't see any results.
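
The effect described above can be reproduced without Flink. The following is
only a minimal sketch using the JDK's BufferedWriter as a stand-in; it is not
Flink's CsvOutputFormat code, and the class name BufferedCsvDemo and the
sample record are made up for illustration. It shows that a small record
written through a buffered writer does not reach the file until flush() (or
close()) is called, which is the same symptom as an output file that is
created but stays empty.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class BufferedCsvDemo {

    /**
     * Writes one CSV line through a BufferedWriter and returns
     * {file size before flush(), file size after flush()} in bytes.
     */
    public static long[] sizesBeforeAndAfterFlush() {
        try {
            // Hypothetical output file; created empty by createTempFile.
            Path file = Files.createTempFile("result", ".csv");
            try (BufferedWriter out =
                    Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
                // The record is far smaller than the writer's buffer,
                // so it stays in memory for now.
                out.write("someUser,42");
                out.newLine();
                long before = Files.size(file);

                // Only now is the buffered data pushed to the file.
                out.flush();
                long after = Files.size(file);

                return new long[] {before, after};
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] sizes = sizesBeforeAndAfterFlush();
        System.out.println("before flush: " + sizes[0] + " bytes");
        System.out.println("after flush:  " + sizes[1] + " bytes");
    }
}
```

In a streaming job that never finishes, the writer is never closed, so if
nothing calls flush() the "before" state is all that is ever visible on disk.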

On Mon, 2 May 2016 at 11:59 Fabian Hueske <fh...@gmail.com> wrote:

> Have you checked the log files as well?

Re: Unable to write stream as csv

Posted by Fabian Hueske <fh...@gmail.com>.
Have you checked the log files as well?

2016-05-01 14:07 GMT+02:00 subash basnet <ya...@gmail.com>:

> Hello there,
>
> Could anyone help me understand why the result DataStream below gets
> written as text but not as CSV? Since it is in tuple format, I would
> expect it to behave the same for both text and CSV. No error is shown;
> it simply doesn't write to the file when result is written as CSV.
>
> DataStream<Tuple2<String, Long>> result = keyedEdits
>     .timeWindow(Time.seconds(5))
>     .fold(new Tuple2<>("", 0L),
>         new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
>             @Override
>             public Tuple2<String, Long> fold(Tuple2<String, Long> acc,
>                     WikipediaEditEvent event) {
>                 acc.f0 = event.getUser();
>                 acc.f1 += event.getByteDiff();
>                 return acc;
>             }
>         });
>
> result.writeAsText(.....);  // It is working.
> result.writeAsCsv(.....);   // It is not working.
>
> Best Regards,
> Subash Basnet

Re: Unable to write stream as csv

Posted by subash basnet <ya...@gmail.com>.
Hello there,

Could anyone help me understand why the result DataStream below gets
written as text but not as CSV? Since it is in tuple format, I would
expect it to behave the same for both text and CSV. No error is shown;
it simply doesn't write to the file when result is written as CSV.

DataStream<Tuple2<String, Long>> result = keyedEdits
    .timeWindow(Time.seconds(5))
    .fold(new Tuple2<>("", 0L),
        new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> fold(Tuple2<String, Long> acc,
                    WikipediaEditEvent event) {
                acc.f0 = event.getUser();
                acc.f1 += event.getByteDiff();
                return acc;
            }
        });

result.writeAsText(.....);  // It is working.
result.writeAsCsv(.....);   // It is not working.

Best Regards,
Subash Basnet
