Posted to user@flink.apache.org by Telco Phone <te...@yahoo.com> on 2017/10/30 15:56:12 UTC

count and window question with kafka

I have a process that takes 250,000 records from Kafka and produces a file (using a CustomFileSystemSink).
Currently I just have the following:

DataStream<SchemaRecord> stream = env
    .addSource(new FlinkKafkaConsumer010<SchemaRecord>("topic", schema, properties))
    .setParallelism(40)
    .flatMap(new SchemaRecordSplit())
    .setParallelism(40)
    .name("Splitter")
    .keyBy("partition", "randomkey", "schemaId");

stream.addSink(new CustomFileSystemSink()).setParallelism(40);

In my CustomFileSystemSink I have a for loop that closes the file at 250K rows.

What I would like to do is close the file every 5 minutes OR after 250K rows, whichever comes first.

From my reading of the window types, is it possible to read from Kafka and have the sink close the file every 5 minutes OR every 250K rows?
Hope this makes sense.




Re: count and window question with kafka

Posted by Tony Wei <to...@gmail.com>.
Hi,

I think ProcessFunction[1] is what you want. You can add it after keyBy and
emit the result to the sink after a timeout or once the buffer is full.
The reference has a good example that shows how to use it.
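
To make the idea concrete, here is a minimal sketch of the "count OR timeout" logic in plain Java, without any Flink dependencies. The class CountOrTimeBuffer and its method names are hypothetical and only for illustration. In a real job, add() would correspond roughly to processElement() (where you would also register a processing-time timer through the context's timer service), onTimer() to the ProcessFunction callback of the same name, and the buffer and deadline would presumably live in keyed state rather than in plain fields.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical helper, not part of Flink: buffers records and hands back a
 * batch when either maxCount records have arrived or maxAgeMillis has passed
 * since the first record of the current batch.
 */
class CountOrTimeBuffer<T> {
    private final int maxCount;
    private final long maxAgeMillis;
    private final List<T> buffer = new ArrayList<>();
    private long deadline = -1L; // -1 means no batch is currently open

    CountOrTimeBuffer(int maxCount, long maxAgeMillis) {
        this.maxCount = maxCount;
        this.maxAgeMillis = maxAgeMillis;
    }

    /** Plays the role of processElement: returns a full batch, or an empty list. */
    List<T> add(T record, long nowMillis) {
        if (buffer.isEmpty()) {
            // First record of a new batch: this is where a timer would be registered.
            deadline = nowMillis + maxAgeMillis;
        }
        buffer.add(record);
        return buffer.size() >= maxCount ? drain() : Collections.<T>emptyList();
    }

    /** Plays the role of onTimer: flushes only if the open batch's deadline has passed. */
    List<T> onTimer(long timestampMillis) {
        // A timer left over from a batch that was already flushed by count is
        // ignored, because the deadline was reset when that batch was drained.
        if (deadline >= 0 && timestampMillis >= deadline && !buffer.isEmpty()) {
            return drain();
        }
        return Collections.<T>emptyList();
    }

    private List<T> drain() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        deadline = -1L;
        return batch;
    }
}
```

For the case in this thread you would construct it with 250000 and 5 * 60 * 1000L, and have the sink close the file whenever a non-empty batch comes back.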

Best Regards,
Tony Wei

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html

