Posted to user@flink.apache.org by geoff halmo <ge...@gmail.com> on 2018/02/02 17:32:35 UTC

Flink not writing last few elements to disk

Hi Flink community:

I am testing Flink but can't get the final 18 or so elements written out to disk.

Setup:
Using the NYC yellow taxi data file 2017-09.csv, I sorted the rows by
pickup_datetime in bash. I am working in event time.

Skeleton program:
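// input_format, input_path, parse and MyProcessAllWindowFunction are defined elsewhere
val ds = senv.readFile(input_format, input_path,
  FileProcessingMode.PROCESS_CONTINUOUSLY, 1000)  // re-scan the path every 1000 ms

ds.flatMap(row => parse(row))             // parse returns zero or more records per line
  .assignAscendingTimestamps(_.datetime)  // input is pre-sorted, so timestamps ascend
  .timeWindowAll(Time.hours(1))           // one-hour tumbling event-time windows
  .process(new MyProcessAllWindowFunction())
  .writeAsCsv(output_path)                // output_path: placeholder for the CSV destination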

Issue:
The last line of output.csv is only half written:
tail -n1 output.csv
1506553200000,2017-09-27T:19:00-4:00[user@computer]

When I use .print() instead of .writeAsCsv, the last line on the console is:
1506826800000,2017-09-30T23:00-400[America/New_York],21353

Re: Flink not writing last few elements to disk

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

`FileProcessingMode.PROCESS_CONTINUOUSLY` keeps monitoring the input path, so the stream never ends.

The simple `writeAsCsv(…)` sink, on the other hand, only flushes its remaining buffered output to the file when the stream ends (see `OutputFormatSinkFunction`).
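To illustrate why the file ends in a half line, here is a toy model in plain Scala (no Flink involved). It assumes the CSV sink writes through a fixed-size buffer such as `java.io.BufferedWriter`, which hands data on only in whole-buffer chunks. The `writeBuffered` helper and the sample rows are made up for the example.

```scala
import java.io.{BufferedWriter, StringWriter}

// Toy model of a buffered CSV sink, not Flink code. A StringWriter stands in for
// the file on disk; writeBuffered is a hypothetical helper.
// Returns what the "disk" holds (before close, after close).
def writeBuffered(lines: Seq[String], bufferSize: Int): (String, String) = {
  val disk = new StringWriter()
  val out = new BufferedWriter(disk, bufferSize)
  lines.foreach(line => out.write(line + "\n"))
  // BufferedWriter passes data on only when its buffer is full, so the "disk"
  // now holds whole buffer-sized chunks, which can end in the middle of a line.
  val snapshot = disk.toString
  // close() flushes the remainder; a sink only does this when the stream ends.
  out.close()
  (snapshot, disk.toString)
}

// Made-up sample rows, 15 characters each including the newline.
val lines = Seq("2017-09-27,100", "2017-09-28,200", "2017-09-29,300")
val (beforeClose, afterClose) = writeBuffered(lines, 16)
println(beforeClose.split("\n").last) // prints 20, the half-written last line
println(afterClose.split("\n").last)  // prints 2017-09-29,300
```

Changing the buffer size only moves the cut point; without a final flush, the tail of the output is missing.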

You can either use `PROCESS_ONCE` mode or use a more advanced data sink:
- `BucketingSink`
- reuse the `writeAsCsv(…)` code by extending `OutputFormatSinkFunction` and implementing `CheckpointedFunction` to flush on snapshots (at-least-once)
- write your own sink by extending `TwoPhaseCommitSinkFunction` (exactly-once)
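A minimal sketch of the at-least-once idea from the second option, written against the Flink 1.4-era Scala API. For brevity it does not reuse `CsvOutputFormat` but manages its own `BufferedWriter`; the class name `FlushingCsvSink`, the per-subtask file naming and the `toLine` formatter are hypothetical. It only helps with checkpointing enabled (e.g. `senv.enableCheckpointing(60000)`) and assumes a local file system path.

```scala
import java.io.{BufferedWriter, FileWriter}

import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// Hypothetical sink: writes one line per record and flushes on every checkpoint,
// so all records received before a checkpoint barrier are handed to the file system.
// After a failure, records since the last checkpoint are replayed and may be written
// twice (at-least-once, not exactly-once).
class FlushingCsvSink[T](basePath: String, toLine: T => String)
    extends RichSinkFunction[T] with CheckpointedFunction {

  @transient private var writer: BufferedWriter = _

  override def open(parameters: Configuration): Unit = {
    // One file per parallel subtask; append so a restart keeps earlier output.
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    writer = new BufferedWriter(new FileWriter(s"$basePath-$subtask.csv", true))
  }

  override def invoke(value: T): Unit = {
    writer.write(toLine(value))
    writer.write("\n")
  }

  // Called when a checkpoint is taken: push buffered lines out to the file.
  override def snapshotState(context: FunctionSnapshotContext): Unit =
    if (writer != null) writer.flush()

  // Nothing to restore; the output file is the only record of progress.
  override def initializeState(context: FunctionInitializationContext): Unit = ()

  override def close(): Unit =
    if (writer != null) writer.close()
}
```

Usage, assuming the window function emits `(Long, String, Int)` tuples (a guess based on the printed output): replace `.writeAsCsv(output_path)` with `.addSink(new FlushingCsvSink[(Long, String, Int)](output_path, t => s"${t._1},${t._2},${t._3}"))`.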

Piotrek
