Posted to user-zh@flink.apache.org by 张作峰 <ma...@zhangzuofeng.cn> on 2019/03/05 05:16:07 UTC

How to count, every five minutes, the total number of a certain message for the current day

Hi all!
Could any of the experts here advise how to use the stream API to count, every five minutes, the total number of a certain message for the current day?
Thanks!

Re: How to count, every five minutes, the total number of a certain message for the current day

Posted by 戴嘉诚 <a7...@gmail.com>.
For the current day, a plain tumbling window should do it, but pay attention to how much data you get in one day, or you may run out of memory.
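For reference, a minimal sketch of that idea (only a sketch: it assumes a DataStream<Event> named events with a String getItemId() accessor and timestamps/watermarks already assigned, and omits imports like the other snippets in this thread). Using an AggregateFunction keeps a single Long per key and day in state, which also helps with the memory concern:

events
    .keyBy(Event::getItemId)
    .window(TumblingEventTimeWindows.of(Time.days(1)))
    .aggregate(new AggregateFunction<Event, Long, Long>() {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Event value, Long accumulator) { return accumulator + 1; } // count each message
        @Override public Long getResult(Long accumulator) { return accumulator; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    })
    .print(); // one total per item id, emitted when the daily window closes (the key itself is not part of the output here)

Note that a plain daily tumbling window fires only once, at the end of the day; to get output every five minutes you still need a timer or a custom trigger, as discussed elsewhere in this thread.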

张作峰 <ma...@zhangzuofeng.cn> wrote on Tue, Mar 5, 2019 at 15:06:

> Setting an event time window of one day: do you mean a sliding window? What exactly do you mean? What I need to count is the current day's data.
>
> ------------------ Original Message ------------------
> From: "Paul Lam"<pa...@gmail.com>;
> Sent: Tuesday, March 5, 2019, 2:46 PM
> To: "user-zh"<us...@flink.apache.org>;
> Subject: Re: How to count, every five minutes, the total number of a certain message for the current day
>
>
>
> Hi,
>
> You could try setting an event time window of one day, and then registering a processing time timer that fires every 5 minutes to emit the latest result for the current day.
>
> Best,
> Paul Lam
>
> > On Mar 5, 2019, at 13:16, 张作峰 <ma...@zhangzuofeng.cn> wrote:
> >
> > Hi all!
> > Could any of the experts here advise how to use the stream API to count, every five minutes, the total number of a certain message for the current day?
> > Thanks!

Reply: How to count, every five minutes, the total number of a certain message for the current day

Posted by 张作峰 <ma...@zhangzuofeng.cn>.
Setting an event time window of one day: do you mean a sliding window? What exactly do you mean? What I need to count is the current day's data.
 
------------------
张作峰
Skyworth, All-in-One Software Development Dept.

12/F, Tower A, Skyworth Building, Gaoxin South 1st Road, Nanshan District, Shenzhen
Mobile: 18320872958  Landline: 0755-26974350 (ext. 4350)
Email: mail@zhangzuofeng.cn
Homepage: http://www.zhangzuofeng.cn
wiki: http://wiki.qiannuo.me




 




------------------ Original Message ------------------
From: "Paul Lam"<pa...@gmail.com>;
Sent: Tuesday, March 5, 2019, 2:46 PM
To: "user-zh"<us...@flink.apache.org>;
Subject: Re: How to count, every five minutes, the total number of a certain message for the current day



Hi,

You could try setting an event time window of one day, and then registering a processing time timer that fires every 5 minutes to emit the latest result for the current day.

Best,
Paul Lam

> On Mar 5, 2019, at 13:16, 张作峰 <ma...@zhangzuofeng.cn> wrote:
>
> Hi all!
> Could any of the experts here advise how to use the stream API to count, every five minutes, the total number of a certain message for the current day?
> Thanks!

Re: How to count, every five minutes, the total number of a certain message for the current day

Posted by Paul Lam <pa...@gmail.com>.
Hi,

You could try setting an event time window of one day, and then registering a processing time timer that fires every 5 minutes to emit the latest result for the current day.

Best,
Paul Lam

> On Mar 5, 2019, at 13:16, 张作峰 <ma...@zhangzuofeng.cn> wrote:
>
> Hi all!
> Could any of the experts here advise how to use the stream API to count, every five minutes, the total number of a certain message for the current day?
> Thanks!
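A minimal sketch of this suggestion (again only a sketch: it assumes a DataStream<Event> named events with a String getItemId() accessor and event-time timestamps/watermarks already assigned, and omits imports). Instead of a hand-written timer it uses Flink's ContinuousProcessingTimeTrigger, which fires the one-day event-time window every 5 minutes of processing time, so the running total for the current day is emitted repeatedly:

events
    .keyBy(Event::getItemId)
    .window(TumblingEventTimeWindows.of(Time.days(1)))
    // fire (without purging) every 5 minutes of processing time
    .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))
    .aggregate(new AggregateFunction<Event, Long, Long>() {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Event value, Long accumulator) { return accumulator + 1; }
        @Override public Long getResult(Long accumulator) { return accumulator; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }, new ProcessWindowFunction<Long, Tuple2<String, Long>, String, TimeWindow>() {
        @Override
        public void process(String itemId, Context ctx, Iterable<Long> counts, Collector<Tuple2<String, Long>> out) {
            // the iterable holds exactly one element: the pre-aggregated running count for this item id and day
            out.collect(Tuple2.of(itemId, counts.iterator().next()));
        }
    })
    .print();

Each firing re-emits the latest total per item id for the current day, so a downstream consumer should treat the output as an upsert keyed by item id and day.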


Re: How to count, every five minutes, the total number of a certain message for the current day

Posted by 张作峰 <ma...@zhangzuofeng.cn>.
streamOperator
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<EventItem>() {
            @Override
            public long extractAscendingTimestamp(EventItem eventItem) {
                return eventItem.getWindowEnd();
            }
        })
        .map(eventItem -> Tuple2.of(eventItem.getItemId(), 1L))
        .keyBy(0) // key by the item id (field 0 of the Tuple2), not by the count
        .timeWindow(Time.minutes(5))
        .aggregate(new AccumulatorAggregateFunction<>(), (WindowFunction<Long, EventItem, Tuple, TimeWindow>) (key, timeWindow, iterable, collector) -> {
            String newId = ((Tuple1<String>) key).f0;
            Long count = iterable.iterator().next();
            collector.collect(EventItem.of(newId, timeWindow.getEnd(), count));
        })
        .keyBy("itemId") // the stream carries EventItem POJOs here, so key by field name (assuming EventItem exposes an itemId field); a positional keyBy only works on tuple types
        .process(new KeyedProcessFunction<Tuple, EventItem, Tuple2<String, Long>>() {
            private MapState<String, Long> itemState;
            private ValueState<Long> dayState;

            @Override
            public void open(Configuration parameters) throws Exception {
                MapStateDescriptor<String, Long> mapStateDescriptor = new MapStateDescriptor<>("ei_pv", TypeInformation.of(String.class), TypeInformation.of(Long.class));
                itemState = getRuntimeContext().getMapState(mapStateDescriptor);
                // Keyed state cannot be written in open() (there is no current key yet), so dayState is only declared here and initialized lazily in onTimer().
                dayState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("day_state", TypeInformation.of(Long.class)));
            }

            private int currentDay(long epochMillis) {
                // The timestamp is in epoch milliseconds, so convert to days before deriving the day of year (UTC).
                return LocalDate.ofEpochDay(epochMillis / 86_400_000L).getDayOfYear();
            }

            @Override
            public void processElement(EventItem input, Context context, Collector<Tuple2<String, Long>> collector) throws Exception {
                String ei = input.getItemId();
                Long cnt = itemState.get(ei);
                long viewCount = input.getViewCount();
                cnt = cnt != null ? cnt + viewCount : viewCount;
                itemState.put(ei, cnt);
                context.timerService().registerEventTimeTimer(input.getWindowEnd() + 5000);
            }
            @Override
            public void onTimer(long time, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                int currentDay = currentDay(time);
                Long storedDay = dayState.value();
                // When the day rolls over, reset the per-item totals; a null storedDay just means this is the first timer for this key.
                if (storedDay != null && storedDay != currentDay) {
                    itemState.clear();
                }
                dayState.update((long) currentDay);
                for (Map.Entry<String, Long> entry : itemState.entries()) {
                    out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
                }
            }
        })
        .addSink(textLongSink);

Is there anything wrong with doing it this way?




------------------ Original ------------------
From:  "刘 文"<th...@yahoo.com.INVALID>;
Date:  Tue, Mar 5, 2019 01:32 PM
To:  "user-zh"<us...@flink.apache.org>;

Subject:  Re: How to count, every five minutes, the total number of a certain message for the current day





Re: How to count, every five minutes, the total number of a certain message for the current day

Posted by 刘 文 <th...@yahoo.com.INVALID>.
I have some thoughts on how to approach this:

). Flink streaming processes data incrementally by default, analyzing the data that falls within a given time interval or count.
). You can write a custom ProcessAllWindowFunction; in effect you implement your own processing logic over all the data of one window, and its input has already passed through the operators that precede the window.
). In your case, you need to store the result of each computation and update it in a central store (such as Redis) so that the next computation can use it (a sketch of such a sink is appended after the example below).
). Below is a ProcessAllWindowFunction example for reference (what it implements: a WordCount program whose incremental output is sorted by word in ascending order).







package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.sort

import java.time.ZoneId

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
import org.apache.flink.util.Collector

import scala.collection.mutable

/**
  * nc -lk 1234  to provide the input data
  */
object SocketWindowWordCountLocalSinkHDFSAndWindowAllAndSorted {


  def getConfiguration(isDebug:Boolean = false):Configuration={

    val configuration : Configuration = new Configuration()

    if(isDebug){
      val timeout = "100000 s"
      val timeoutHeartbeatPause = "1000000 s"
      configuration.setString("akka.ask.timeout",timeout)
      configuration.setString("akka.lookup.timeout",timeout)
      configuration.setString("akka.tcp.timeout",timeout)
      configuration.setString("akka.transport.heartbeat.interval",timeout)
      configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
      configuration.setString("akka.watch.heartbeat.pause",timeout)
      configuration.setInteger("heartbeat.interval",10000000)
      configuration.setInteger("heartbeat.timeout",50000000)
    }


    configuration
  }

  def main(args: Array[String]): Unit = {


    val port = 1234
    // get the execution environment
   // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    val configuration : Configuration = getConfiguration(true)

    val env:StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1,configuration)






    // get input data by connecting to the socket
    val dataStream = env.socketTextStream("localhost", port, '\n')



    import org.apache.flink.streaming.api.scala._
    val dataStreamDeal = dataStream.flatMap( w => w.split("\\s") ).map( w => WordWithCount(w,1))
      .keyBy("word")
      //将当前window中所有的行记录,发送过来ProcessAllWindowFunction函数中去处理(可以排序,可以对相同key进行处理)
      //缺点,window中数据量大时,就容易内存溢出
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))

      .process(new ProcessAllWindowFunction[WordWithCount,WordWithCount,TimeWindow] {
        override def process(context: Context, elements: Iterable[WordWithCount], out: Collector[WordWithCount]): Unit = {
          // Sum up the counts per word; a HashSet keyed on the whole case class would miss duplicates once the counts differ
          val counts = new mutable.HashMap[String, Long]()

          for (wordCount <- elements) {
            counts(wordCount.word) = counts.getOrElse(wordCount.word, 0L) + wordCount.count
          }

          // Emit the words of this window in ascending order
          for ((word, count) <- counts.toList.sortBy(_._1)) out.collect(WordWithCount(word, count))
        }

      })




      //.countWindow(3)
      //.countWindow(3,1)
      //.countWindowAll(3)




    //textResult.print().setParallelism(1)

    val bucketingSink = new BucketingSink[WordWithCount]("file:/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/sink-data")


    bucketingSink.setBucketer(new DateTimeBucketer[WordWithCount]("yyyy-MM-dd--HHmm", ZoneId.of("Asia/Shanghai")))
    //bucketingSink.setWriter(new SequenceFileWriter[IntWritable, Text]())
    //bucketingSink.setWriter(new SequenceFileWriter[WordWithCount]())
    //bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
    //bucketingSink.setBatchSize(100 ) // this is 400 MB,
    bucketingSink.setBatchSize(1024 * 1024 * 400 ) // this is 400 MB,
    //bucketingSink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
    bucketingSink.setBatchRolloverInterval( 2 * 1000); // this is 2 seconds
    //setInactiveBucketCheckInterval
    //setInactiveBucketThreshold
    //How often to write accumulated data to the sink, rather than writing on every single record, which would waste resources

    bucketingSink.setInactiveBucketThreshold(2 * 1000)
    bucketingSink.setAsyncTimeout(1 * 1000)


    dataStreamDeal.setParallelism(1)
      .addSink(bucketingSink)




    if(args == null || args.size ==0){
      env.execute("默认作业")

      //execution plan
      //println(env.getExecutionPlan)
      //StreamGraph
     //println(env.getStreamGraph.getStreamingPlanAsJSON)



      //JsonPlanGenerator.generatePlan(jobGraph)

    }else{
      env.execute(args(0))
    }

    println("结束")

  }


  // Data type for words with count
  case class WordWithCount(word: String, count: Long)

/*  abstract private   class OrderWindowFunction extends ProcessWindowFunction<WordWithCount,WordWithCount,WordWithCount,TimeWindow> {

  }*/
}
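As mentioned in the third point above, the running result can also be pushed to a central store so that other jobs or queries can read it. A minimal Java sketch of such a sink, assuming the Jedis client and a Redis instance on localhost:6379; the class name DailyCountRedisSink and the hash key "daily_counts" are made up for illustration:

public class DailyCountRedisSink extends RichSinkFunction<Tuple2<String, Long>> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) throws Exception {
        jedis = new Jedis("localhost", 6379);
    }

    @Override
    public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
        // Upsert the latest daily total per item id, so each new emission overwrites the previous one
        jedis.hset("daily_counts", value.f0, String.valueOf(value.f1));
    }

    @Override
    public void close() throws Exception {
        if (jedis != null) {
            jedis.close();
        }
    }
}

It would be attached with .addSink(new DailyCountRedisSink()) on a DataStream<Tuple2<String, Long>> of (item id, running count) pairs.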






---------------------------------------------------------------------------------------------------------------------------------------

> On Mar 5, 2019, at 1:16 PM, 张作峰 <ma...@zhangzuofeng.cn> wrote:
>
> Hi all!
> Could any of the experts here advise how to use the stream API to count, every five minutes, the total number of a certain message for the current day?
> Thanks!


---------------------------------------------------------------------------------------------------------------------------------------