You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Sung Gon Yi <sk...@mac.com> on 2019/07/19 08:35:31 UTC

Parallelism issue

Hello.

I wrote the code below.

It behaves unexpectedly:
processing only happens after the SourceFunction has generated all of its data and finished.
If the SourceFunction runs indefinitely, processing never happens at all.

However, it works as expected when parallelismForTimestamp is set to a different value (e.g. 3).

I would like to understand how the code below is processed.

Sincerely,
Sung Gon

—————
package org.skon.flink

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

object ParallelismWithGlobalWindow {

  /**
   * Builds and runs a small event-time pipeline used to investigate how the parallelism of a
   * second timestamp/watermark assigner affects when downstream tumbling windows fire.
   *
   * Pipeline: bounded source (parallelism 1) -> timestamp assigner -> keyBy(_._2) ->
   * GlobalWindow with a 20 s TimeEvictor and a per-element CountTrigger -> second timestamp
   * assigner (parallelism `parallelismForTimestamp`) -> keyBy(_._3) -> 5 s tumbling event-time
   * window keeping the last element -> print -> CollectionSink.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(50L)
    env.setParallelism(1)

    // Parallelism of the window operators (not Flink's setMaxParallelism key-group bound).
    val maxParallelism = 4
    // NOTE(review): every element carries key 1L for keyBy(_._2), so only one of the
    // `maxParallelism` GlobalWindow subtasks ever receives data. When this value equals
    // maxParallelism, the second assigner is presumably connected one-to-one (forward/chained)
    // to those subtasks. Three of its instances then see no elements and, with
    // BoundedOutOfOrdernessTimestampExtractor, emit no watermarks. The keyed tumbling window
    // downstream uses the minimum watermark over its inputs, so it cannot fire until the bounded
    // source finishes. With a different value (e.g. 3) the connection becomes a rebalance, every
    // instance receives data, and watermarks advance. Confirm against the Flink docs on
    // watermarks in parallel streams.
    val parallelismForTimestamp = 4

    val stream = env
      .addSource(new SourceFunction[(Long, Long, Long)] {
        // Cleared by cancel(). It is volatile because the SourceFunction contract has Flink call
        // cancel() from a different thread than run().
        @volatile private var running = true

        // Emits (t, 1L, 2L) for t = 0 .. 250000; t is used below as the event timestamp (ms).
        override def run(ctx: SourceFunction.SourceContext[(Long, Long, Long)]): Unit = {
          var count = 0L
          while (running && count <= 250000L) {
            // Emit under the checkpoint lock so record emission does not interleave with a
            // checkpoint.
            ctx.getCheckpointLock.synchronized {
              ctx.collect((count, 1L, 2L))
            }
            count += 1
          }
        }

        override def cancel(): Unit = {
          running = false
        }
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(Long, Long, Long)](Time.seconds(0L)) {
        override def extractTimestamp(element: (Long, Long, Long)): Long = element._1
      })

    stream
      .keyBy(_._2)
      .window(GlobalWindows.create)
      .evictor(TimeEvictor.of(Time.seconds(20L)))
      .trigger(CountTrigger.of(1L))
      .apply[(Long, Long, Long)](
        (_: Long, _: GlobalWindow, elements: Iterable[(Long, Long, Long)], out: Collector[(Long, Long, Long)]) =>
          // Forward only the most recent element still in the window after eviction.
          elements.lastOption.foreach(last => out.collect(last))
      )
      .setParallelism(maxParallelism)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(Long, Long, Long)](Time.seconds(0L)) {
        override def extractTimestamp(element: (Long, Long, Long)): Long = element._1
      })
      .setParallelism(parallelismForTimestamp)
      .keyBy(_._3)
      .window(TumblingEventTimeWindows.of(Time.seconds(5L)))
      .reduce((_, v2) => v2)
      .setParallelism(maxParallelism)
      .process[(Long, Long, Long)]((value, _, out) => {
        Console.println(value)
        out.collect(value)
      })
      .addSink(new CollectionSink[(Long, Long, Long)])

    env.execute("Parallelism Test with Global Window")
  }

  /**
   * Sink that records the string form of every element it receives.
   *
   * Each sink instance keeps its own private buffer.
   * NOTE(review): in a deployed job that buffer presumably lives on the task running the
   * instance, so it is not visible to the client that submitted the job.
   */
  class CollectionSink[T] extends org.apache.flink.streaming.api.functions.sink.SinkFunction[T] {
    private val values = scala.collection.mutable.ListBuffer.empty[String]

    /** Snapshot of the values recorded so far by this sink instance. */
    def collected: List[String] = values.toList

    // Append to the buffer. The former `values + value.toString` on an immutable List only
    // built a discarded String, so nothing was ever recorded.
    override def invoke(value: T): Unit = {
      values += value.toString
    }
  }
}
—————