Posted to issues@beam.apache.org by "Pawel Bartoszek (JIRA)" <ji...@apache.org> on 2019/01/18 16:52:00 UTC

[jira] [Updated] (BEAM-6465) Flink: State accumulation during restoring from a savepoint

     [ https://issues.apache.org/jira/browse/BEAM-6465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pawel Bartoszek updated BEAM-6465:
----------------------------------
    Attachment: Screen Shot 2019-01-18 at 12.07.03.png

> Flink: State accumulation during restoring from a savepoint
> -----------------------------------------------------------
>
>                 Key: BEAM-6465
>                 URL: https://issues.apache.org/jira/browse/BEAM-6465
>             Project: Beam
>          Issue Type: Test
>          Components: beam-model
>    Affects Versions: 2.7.0
>            Reporter: Pawel Bartoszek
>            Assignee: Kenneth Knowles
>            Priority: Major
>         Attachments: Screen Shot 2019-01-18 at 12.07.03.png
>
>
> This ticket captures my findings when restoring a BEAM job from a savepoint on Flink runner.
>  
> *The problem*
> When the job is restored from a savepoint taken a few hours earlier, the checkpoint size starts growing ridiculously large, which eventually makes the job fail with an out-of-heap-space error (we use the filesystem state backend).
>  
> *Job structure*
> The job has two paths: the data lake path and the aggregate data path.
> *Data lake path*
> The data lake path is a dumb sink for all records received by the job. The records are flushed to S3.
> Data lake trigger:
> {code:java}
> input.apply(WithTimestamps.of(extractTimestamp).withAllowedTimestampSkew(standardDays(7)))
>         .apply(Window.<RDotRecord>into(FixedWindows.of(standardMinutes(1)))
>                 .triggering(
>                         AfterWatermark.pastEndOfWindow()
>                                 .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringsPeriod))
>                 )
>                 .withAllowedLateness(standardMinutes(30), FIRE_ALWAYS)
>                 .discardingFiredPanes());{code}
>  
> *Aggregate path*
> The aggregate path applies transformations such as group by key and count.
>  
> Aggregate trigger:
> {code:java}
> input.apply(WithTimestamps.of(extractTimestamp).withAllowedTimestampSkew(standardDays(7)))
>         .apply(Window.<RDotRecord>into(FixedWindows.of(WINDOW_SIZE))
>                 .triggering(
>                         AfterWatermark.pastEndOfWindow()
>                                 .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringsPeriod))
>                 )
>                 .withAllowedLateness(standardMinutes(30), FIRE_ALWAYS)
>                 .accumulatingFiredPanes());{code}
>  
> *My investigation*
> Our team has written a tool to collect input watermarks from the Flink API. It turned out that some operator subtasks (each running in a particular Flink slot) run slower than the others, so their watermarks fall behind those of the other slots. Look at the graph below:
>  
>  
>  
>  
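The effect of one lagging subtask can be illustrated with a simplified model. The sketch below is plain Java, not Flink or Beam code: the class and method names are hypothetical and the numbers are made up. It assumes that a downstream operator's watermark is the minimum of the watermarks of its parallel inputs, and that a fixed window stays in state until that watermark reaches the window's end.

```java
public class WatermarkLagSketch {

    /** Combined watermark of a downstream operator: the minimum over its parallel inputs. */
    public static long combinedWatermark(long[] subtaskWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : subtaskWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    /**
     * Number of aligned fixed windows that may already hold data (window start is at or
     * before newestEventTime) but cannot fire on time yet (window end is after the watermark).
     * This is a simplification: real runners track state per key and per window.
     */
    public static long openWindows(long windowSizeMillis, long newestEventTime, long watermark) {
        long firstOpen = Math.floorDiv(watermark, windowSizeMillis);
        long lastOpen = Math.floorDiv(newestEventTime, windowSizeMillis);
        return Math.max(0, lastOpen - firstOpen + 1);
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Hypothetical situation after a restore: seven subtasks have caught up to
        // minute 180, one subtask is still at minute 60.
        long[] lagging = {180 * minute, 180 * minute, 180 * minute, 180 * minute,
                          180 * minute, 180 * minute, 180 * minute, 60 * minute};
        long[] healthy = {180 * minute, 180 * minute, 180 * minute, 180 * minute,
                          180 * minute, 180 * minute, 180 * minute, 180 * minute};

        long newest = 180 * minute;
        System.out.println("open windows, one subtask lagging: "
                + openWindows(minute, newest, combinedWatermark(lagging)));
        System.out.println("open windows, all subtasks caught up: "
                + openWindows(minute, newest, combinedWatermark(healthy)));
    }
}
```

With these made-up numbers and one-minute windows, the lagging case keeps 121 windows open while the caught-up case keeps 1. In this model the amount of buffered state grows with the gap between the slowest subtask and the newest data.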
> *My findings*
> If state accumulates because the watermark slows down on some operator slots (Flink specific), then introducing *early firings* should help ... and indeed it helped. I can see that low watermark on JDBC task (where I write to a database) 
>  
>  
>  
>  
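The report does not show the modified trigger, so the following is only a sketch of what adding early firings to the windowing above might look like. The class name, the method name and the earlyFiringsPeriod parameter are hypothetical. The discarding mode is taken from the data lake path; which path was actually changed is an assumption.

```java
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public final class EarlyFiringWindowSketch {

    /**
     * Same windowing as the data lake path, plus a processing-time early firing.
     * earlyFiringsPeriod is a hypothetical parameter, not taken from the report.
     */
    public static <T> Window<T> withEarlyFirings(
            Duration windowSize, Duration earlyFiringsPeriod, Duration lateFiringsPeriod) {
        return Window.<T>into(FixedWindows.of(windowSize))
                .triggering(
                        AfterWatermark.pastEndOfWindow()
                                // Fire while the watermark is still behind the end of the window.
                                .withEarlyFirings(
                                        AfterProcessingTime.pastFirstElementInPane()
                                                .plusDelayOf(earlyFiringsPeriod))
                                .withLateFirings(
                                        AfterProcessingTime.pastFirstElementInPane()
                                                .plusDelayOf(lateFiringsPeriod)))
                .withAllowedLateness(Duration.standardMinutes(30), Window.ClosingBehavior.FIRE_ALWAYS)
                // Discarding mode: each pane carries only the elements since the previous firing.
                .discardingFiredPanes();
    }

    private EarlyFiringWindowSketch() {}
}
```

It would be applied as input.apply(EarlyFiringWindowSketch.<RDotRecord>withEarlyFirings(...)). In discarding mode, an early pane hands buffered elements downstream on a processing-time schedule, so they no longer have to wait for a lagging watermark to pass the end of the window.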
> Setup:
>  * Job reads records from 32 Kinesis shards.
>  * Job parallelism: 8
>  * Running on Beam 2.7, Flink 1.5
>  * *Hardware:*
>  ** Master: 1 x m5.xlarge
>  ** Core instances: 5 x r4.2xlarge
>  * *YARN session configuration:*
> {code:java}
> /usr/bin/flink run --class streaming.Main -m yarn-cluster --yarnstreaming --yarnjobManagerMemory 6272 --yarntaskManagerMemory 26000 -yD classloader.resolve-order=parent-first -yD parallelism.default=8 -yD containerized.heap-cutoff-ratio=0.15 -yD state.backend=filesystem -yD yarn.maximum-failed-containers=-1 -yD jobmanager.web.checkpoints.history=1000 -yD akka.ask.timeout=60s -XX:GCLogFileSize=20M -XX:NumberOfGCLogFiles=2 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -XX:+PrintGCCause -XX:+PrintGCDateStamps -XX:+UseG1GC /home/hadoop/job.jar --runner=FlinkRunner --awsRegion=eu-west-1 --appName=XXX --input=kinesis://XXX --outputFileSystemType=S3 --outputFileSystemRoot=XXX --outputDirectory=structured-streaming --externalizedCheckpointsEnabled=true --checkpointingInterval=300000 --checkpointTimeoutMillis=360000 --failOnCheckpointingErrors=false --minPauseBetweenCheckpoints=60000 --parallelism=8{code}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)