You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Akshay Shingote (JIRA)" <ji...@apache.org> on 2016/06/22 14:10:57 UTC
[jira] [Created] (FLINK-4101) Calculating average in Flink
DataStream on window time
Akshay Shingote created FLINK-4101:
--------------------------------------
Summary: Calculating average in Flink DataStream on window time
Key: FLINK-4101
URL: https://issues.apache.org/jira/browse/FLINK-4101
Project: Flink
Issue Type: Task
Components: DataStream API
Affects Versions: 1.0.2
Reporter: Akshay Shingote
I am using Flink DataStream API where there where racks are available & I want to calculate "average"of temperature group by rack IDs. My window duration is of 40 seconds & my window is sliding every 10 seconds...Following is my code where I am calculating sum of temperatures every 10 seconds for every rackID,but now I want to calculate average temperatures::
static Properties properties=new Properties();
public static Properties getProperties()
{
properties.setProperty("bootstrap.servers", "54.164.200.104:9092");
properties.setProperty("zookeeper.connect", "54.164.200.104:2181");
//properties.setProperty("deserializer.class", "kafka.serializer.StringEncoder");
//properties.setProperty("group.id", "akshay");
properties.setProperty("auto.offset.reset", "earliest");
return properties;
}
@SuppressWarnings("rawtypes")
public static void main(String[] args) throws Exception
{
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties props=Program.getProperties();
DataStream<TemperatureEvent> dstream=env.addSource(new FlinkKafkaConsumer09<TemperatureEvent>("TemperatureEvent", new TemperatureEventSchema(), props)).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
DataStream<TemperatureEvent> ds1=dstream.keyBy("rackId").timeWindow(Time.seconds(40), Time.seconds(10)).sum("temperature");
env.execute("Temperature Consumer");
}
How can I calcluate average temperature for the above example ??
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)