Posted to user@flink.apache.org by Rad Rad <ra...@gmail.com> on 2018/10/31 16:53:37 UTC

Ask about counting elements per window

Hi All, 

I have a GPS stream, consumed with FlinkKafkaConsumer, that contains GPS
records from different users. I need to count the number of users in each
window of this stream.

Could anyone help me? Part of my code is below.
 

		// Read JSON records from Kafka
		DataStream stream = env.addSource(
				new FlinkKafkaConsumer09<>(
						parameterTool.getRequired("topic"),
						new JSONDeserializationSchema(),
						parameterTool.getProperties()));

		// Convert each JSON record into a GPS tuple
		DataStream<Tuple7<String, String, String, String, Float, Float, Timestamp>> gpsStream =
				stream.flatMap(gpsFlatMapFunc);


Thanks in advance.





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Ask about counting elements per window

Posted by Rad Rad <ra...@gmail.com>.
Thanks very much.




Re: Ask about counting elements per window

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Rad,

You can take a look at group windows [1] in Flink SQL. I think they may help you.

Best, Hequn
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#aggregations
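
As a rough illustration of what a windowed user count computes, here is a minimal plain-Java sketch with no Flink dependency. It assumes each GPS record carries a user ID and an event timestamp in milliseconds, and that "number of users" means distinct users per tumbling window. The GpsPoint class, its field names, and the 60-second window size are hypothetical and not taken from the original code. In Flink the same bucketing would normally be done by a window operator or a SQL group window rather than by hand.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class WindowUserCount {

    /** Hypothetical minimal GPS record: only the fields needed for counting. */
    static final class GpsPoint {
        final String userId;
        final long timestampMillis;

        GpsPoint(String userId, long timestampMillis) {
            this.userId = userId;
            this.timestampMillis = timestampMillis;
        }
    }

    /**
     * Groups points into tumbling windows of the given size and returns,
     * for each window start (in ms), the number of distinct user IDs seen.
     */
    static Map<Long, Integer> countUsersPerWindow(List<GpsPoint> points, long windowSizeMillis) {
        Map<Long, Set<String>> usersByWindow = new TreeMap<>();
        for (GpsPoint p : points) {
            // Align the timestamp down to the start of its window.
            long windowStart = p.timestampMillis - Math.floorMod(p.timestampMillis, windowSizeMillis);
            usersByWindow.computeIfAbsent(windowStart, k -> new HashSet<>()).add(p.userId);
        }

        Map<Long, Integer> counts = new TreeMap<>();
        for (Map.Entry<Long, Set<String>> e : usersByWindow.entrySet()) {
            counts.put(e.getKey(), e.getValue().size());
        }
        return counts;
    }

    public static void main(String[] args) {
        List<GpsPoint> points = new ArrayList<>();
        points.add(new GpsPoint("alice", 1_000L));
        points.add(new GpsPoint("bob", 20_000L));
        points.add(new GpsPoint("alice", 59_000L));   // same user, same window: counted once
        points.add(new GpsPoint("alice", 61_000L));   // falls into the next window
        points.add(new GpsPoint("carol", 130_000L));

        // prints {0=2, 60000=1, 120000=1}
        System.out.println(countUsersPerWindow(points, 60_000L));
    }
}
```

Keeping a set of user IDs per window is what makes the count distinct. If every GPS record should be counted instead, a plain counter per window would be enough.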
