You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "wydhcws@gmail.com" <wy...@gmail.com> on 2019/12/02 04:13:18 UTC

小白求助,FlinkSQL处理速度上不来

各位大佬,我用flink  sql写了一些指标计算程序,消费kafka写到influxdb,发现夜晚kafka日志生产速度在10几k/min 时,程序没有问题,但是到白天涨到100k/min后 就渐渐卡住消费不动了,用的是flink 1.9 ,现在觉得是Flink sql执行这一层有点慢,窗口是滚动5分钟,目前是用的两个solt,调大并行度试了一下也没效果,这个有什么解决方案吗?
代码如下:
    val windowWidth = 5

    //stream config
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(1000 * 60*5)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    env.getCheckpointConfig.setCheckpointTimeout(60000*10)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
    //   env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //table config
    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    tEnv.getConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(10))

    tEnv.registerFunction("TimeFormatJava", new TimeFormatJava())
    tEnv.registerFunction("TimeFormatUDF", TimeFormatUDF)

  
    //Kafka Source
    val kafkaProperties: Properties = new Properties
    kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
    kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId)
    kafkaProperties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"5000")
    kafkaProperties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,"500")


    var topics: util.List[String] = new util.ArrayList[String]

    for (topic <- kafkaTopics.split(SPLIT)) {
      topics.add(topic)
    }

    val kafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topics, new SimpleStringSchema, kafkaProperties)

    val driverSearchDstream: DataStream[DriverSearch] = env.addSource(kafkaConsumer.setStartFromLatest()).map(msg => {
      val info: String = msg.substring(msg.indexOf("{"), msg.length)
      val createTime = msg.substring(0, 19)
      val timeStamp = getLongTime(createTime)

      val json = JSON.parseObject(info)
      DriverSearch(
        json.getString("driverId") + "_" + timeStamp,
        json.getString("driverId"),
        json.getIntValue("searchType"),
        timeStamp
      )
    }).setParallelism(2)

    val driverSearchDstreamWithEventTime: DataStream[DriverSearch] = driverSearchDstream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[DriverSearch](org.apache.flink.streaming.api.windowing.time.Time.seconds(10L)) {
        override def extractTimestamp(element: DriverSearch): Long = element.timestamp
      }
    )

    driverSearchDstream.map(info=>println(info+"time:"+System.currentTimeMillis()))

    val table: Table = tEnv.fromDataStream(driverSearchDstreamWithEventTime, 'rowKey, 'driverId, 'searchType, 'timestamp.rowtime as 'w)

    val sql1: String =
      s"""
                        select TimeFormatJava(TUMBLE_END(w, INTERVAL '$windowWidth' MINUTE),8) as time_end,
                        searchType,
                        count(distinct driverId) as typeUv,
                        count(distinct rowKey) as typePv
                        from $table
                        group by TUMBLE(w, INTERVAL '$windowWidth' MINUTE),searchType
                      """.stripMargin

    val resultTable1: Table = tEnv.sqlQuery(sql1)

    val typeMap= immutable.Map(1->"1-goWorkSearch",2->"2-offWorkSearch",3->"3-nearbySearch",4->"4-temporarySearch",5->"5-commonSearch",6->"6-multiplySearch")

    val influxStream: DataStream[InfluxDBPoint] = tEnv.toAppendStream[Row](resultTable1).map {
      row => {
        val typeName: String= typeMap(row.getField(1).asInstanceOf[Int])
        val point = new InfluxDBPoint("Carpool_Search_Pv_Uv", row.getField(0).asInstanceOf[Long]) //udf +8hour
        val fields = new util.HashMap[String,Object]()
        val tags = new util.HashMap[String,String]()
        fields.put("typeUv", row.getField(2))
        fields.put("typePv",row.getField(3))
        point.setFields(fields)
        tags.put("typeName",typeName)
        point.setTags(tags)
        point
      }
    }

    influxStream.map{
      point=>{
        println( println("influxPoint:"+point.getFields+"=="
          +point.getTags+"=="+point.getMeasurement
          +"=="+point.getTimestamp+"time:"+System.currentTimeMillis())
        )
      }
    }


    val influxDBConfig = InfluxDBConfig.builder("http://host:8086", "admin", "admin", "aimetric").build


    influxStream.addSink(new InfluxDBSink(influxDBConfig))

    env.execute()

  }


  def getLongTime(str:String) ={
    val format = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
    val time: Long = format.parse(str).getTime
    time
  






wydhcws@gmail.com