Posted to user@beam.apache.org by "Neth,David" <Da...@Cerner.com> on 2020/04/23 21:17:48 UTC

Re: Watermark not advancing on splittable DoFns in Flink

Hi, I’m having trouble setting up a pipeline that continuously reads from S3, does a small transformation and then combines per key:

pipeline
        // Poll the input prefix every 10 seconds for new files, with no end condition
        .apply(FileIO.match()
                .filepattern(options.getDirectory() + "/input/*")
                .continuously(Duration.standardSeconds(10), Watch.Growth.never()))
        .apply("Read", FileIO.readMatches())
        .apply(TextIO.readFiles())
        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(30))))
        .apply(ParDo.of(new SplitFn()))
        .apply(Combine.perKey(new CombinePerKeyFn()));

That works in the direct runner, but in Flink the watermark isn’t advancing.
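Why a stuck watermark shows up as "no output at all": with FixedWindows and the default trigger, the result of Combine.perKey for a window is emitted only once the watermark passes the end of that window. The sketch below is a toy, plain-Java model of that rule, not Beam or Flink code; the class WatermarkToyModel and its methods are hypothetical names, and it ignores allowed lateness and non-default triggers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (NOT Beam): per-key sums in 30-second fixed windows, emitted
// only when the watermark reaches the end of a window (default-trigger behavior).
public class WatermarkToyModel {
    static final long WINDOW_MILLIS = 30_000;

    // window start -> (key -> running sum), for windows not yet emitted
    private final TreeMap<Long, Map<String, Integer>> pending = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();

    // Assign the element to its fixed window and fold it into that key's sum.
    void add(String key, int value, long timestampMillis) {
        long windowStart = timestampMillis - Math.floorMod(timestampMillis, WINDOW_MILLIS);
        pending.computeIfAbsent(windowStart, w -> new TreeMap<>())
               .merge(key, value, Integer::sum);
    }

    // Emit every window whose end is at or before the new watermark.
    void advanceWatermark(long watermarkMillis) {
        while (!pending.isEmpty()
                && pending.firstKey() + WINDOW_MILLIS <= watermarkMillis) {
            Map.Entry<Long, Map<String, Integer>> window = pending.pollFirstEntry();
            long start = window.getKey();
            for (Map.Entry<String, Integer> e : window.getValue().entrySet()) {
                emitted.add("[" + start + ", " + (start + WINDOW_MILLIS) + ") "
                        + e.getKey() + "=" + e.getValue());
            }
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        WatermarkToyModel model = new WatermarkToyModel();
        model.add("test1", 1, 1_000);
        model.add("test1", 1, 12_000);
        model.add("test2", 1, 31_000);

        // Watermark stuck before the first window ends: nothing is emitted.
        model.advanceWatermark(5_000);
        System.out.println(model.emitted()); // prints []

        // Watermark reaches the end of the first window: only that window fires.
        model.advanceWatermark(30_000);
        System.out.println(model.emitted()); // prints [[0, 30000) test1=2]
    }
}
```

In this model, elements keep accumulating in `pending` for as long as the watermark stays put, which matches the symptom of a pipeline that reads data but never produces combined results.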

I asked someone else to look at it, and he swapped out the file read for this code so he didn’t have to create any input files:


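pipeline
        // Unbounded source: emits one Long every 2 seconds, no input files needed
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(2)))
        // Map each number to one of 10 random keys, "test0" through "test9"
        .apply(MapElements.into(TypeDescriptors.strings())
                .via(i -> "test" + ThreadLocalRandom.current().nextInt(10)));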

Surprisingly, the pipeline then started working. That version doesn’t use a splittable DoFn the way the file read does, so we tried a different splittable source, and it didn’t work in Flink either. Every attempt has worked on the direct runner.

It seems like there’s something wrong in the splittable DoFn logic in the Flink runner that keeps the watermark from advancing. I’m fairly new to Beam and Flink and am a little lost at the moment. Should I file a JIRA for this, since it looks like a defect? I plan to keep investigating, so any suggestions on where to start looking would be greatly appreciated.

I set up an example that has both splittable and non-splittable sources (selected via the pipeline options) to make it easier to troubleshoot: https://github.com/dneth/file-streaming-example/blob/master/src/main/java/Example.java

Thanks

