Posted to issues@beam.apache.org by "Sebastian Graca (Jira)" <ji...@apache.org> on 2020/04/15 06:46:00 UTC

[jira] [Updated] (BEAM-9759) Pipeline creation with large number of shards/streams takes long time

     [ https://issues.apache.org/jira/browse/BEAM-9759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sebastian Graca updated BEAM-9759:
----------------------------------
    Issue Type: Improvement  (was: Bug)

> Pipeline creation with large number of shards/streams takes long time
> ---------------------------------------------------------------------
>
>                 Key: BEAM-9759
>                 URL: https://issues.apache.org/jira/browse/BEAM-9759
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kinesis, runner-dataflow
>    Affects Versions: 2.19.0
>            Reporter: Sebastian Graca
>            Priority: Major
>
> We are processing multiple Kinesis streams using pipelines running on {{DataflowRunner}}. Starting such a pipeline from its definition (i.e. executing the {{org.apache.beam.sdk.Pipeline.run()}} method) takes a considerable amount of time. In our case:
>  * a pipeline that consumes data from 196 streams (237 shards in total) starts in 7 minutes
>  * a pipeline that consumes data from 111 streams (261 shards in total) starts in 4 minutes
> I've been investigating this and found that when {{Pipeline.run}} is invoked, the whole pipeline graph is traversed and serialized so that it can be passed to the Dataflow backend. Here is the part of the stack trace that shows this traversal:
> {code:java}
> at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1252)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getRecords$2(SimplifiedKinesisClient.java:137)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:210)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:134)
> at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:119)
> at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.validateShards(StartingPointShardsFinder.java:195)
> at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.findShardsAtStartingPoint(StartingPointShardsFinder.java:115)
> at org.apache.beam.sdk.io.kinesis.DynamicCheckpointGenerator.generate(DynamicCheckpointGenerator.java:59)
> at org.apache.beam.sdk.io.kinesis.KinesisSource.split(KinesisSource.java:88)
> at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:87)
> at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:51)
> at org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:1630)
> at org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:1627)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:494)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
> at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:460)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:433)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:192)
> at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:795)
> at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:186)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
> {code}
> As you can see, the {{org.apache.beam.sdk.io.kinesis.KinesisSource.split}} method is called during serialization. This method finds all shards of the stream and also validates each shard by reading from it. Because this is done sequentially, it takes considerable time that depends on both the number of streams (which has the greatest impact) and the number of shards. Even with a single stream that has a large number of shards, the pipeline startup time is noticeable.
> I wonder whether it is possible to optimise this somehow.
> One way could be to parallelise the whole process at both the stream and the shard level. As this logic is split between Beam core and KinesisIO, this could be complex.
> Another solution I can think of is having the information about valid stream shards ready before {{Pipeline.run}} is called. If there were a way to create a {{KinesisIO.Read}} operation that cached shard information and let client code control how this operation is parallelised, the startup time could be greatly reduced.
> I made a PoC to check how much parallelising this process can improve startup time: parallelising at the stream level alone reduced the startup time from 7 minutes to 2.5 minutes. Unfortunately this was a really hacky solution and I don't think it should be implemented as-is: I hacked the AWS client used by KinesisIO to cache all responses from the server, and called the {{split}} method in parallel on all sources before executing {{Pipeline.run}}. However, this proves that there is huge room for improvement for pipelines that deal with multiple streams and/or shards.
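
The sequential discovery described in the issue can be illustrated with a minimal Java sketch. It does not use the real KinesisIO classes: the {{ShardLookup}} interface, its {{listShards}}/{{validateShard}} methods and the {{CountingLookup}} fake are hypothetical stand-ins for the Kinesis calls made by {{StartingPointShardsFinder}} (in the stack trace above, validation goes through {{getRecords}}). The sketch only shows that every stream and every shard costs at least one blocking round trip, and that these round trips run strictly one after another.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class SequentialShardDiscovery {

  /** Hypothetical stand-in for the Kinesis calls made while a source is split. */
  interface ShardLookup {
    List<String> listShards(String stream); // at least one round trip per stream

    boolean validateShard(String stream, String shard); // one round trip per shard
  }

  /** Resolves valid shards stream by stream and shard by shard, with no overlap. */
  static Map<String, List<String>> discover(List<String> streams, ShardLookup lookup) {
    Map<String, List<String>> valid = new LinkedHashMap<>();
    for (String stream : streams) {
      List<String> shards = new ArrayList<>();
      for (String shard : lookup.listShards(stream)) {
        if (lookup.validateShard(stream, shard)) {
          shards.add(shard);
        }
      }
      valid.put(stream, shards);
    }
    return valid;
  }

  /** Fake lookup that counts round trips instead of calling AWS. */
  static final class CountingLookup implements ShardLookup {
    final Map<String, Integer> shardCounts;
    final AtomicInteger roundTrips = new AtomicInteger();

    CountingLookup(Map<String, Integer> shardCounts) {
      this.shardCounts = shardCounts;
    }

    @Override
    public List<String> listShards(String stream) {
      roundTrips.incrementAndGet();
      List<String> shards = new ArrayList<>();
      for (int i = 0; i < shardCounts.get(stream); i++) {
        shards.add(stream + "-shard-" + i);
      }
      return shards;
    }

    @Override
    public boolean validateShard(String stream, String shard) {
      roundTrips.incrementAndGet();
      return true; // a real check would read from the shard
    }
  }

  public static void main(String[] args) {
    Map<String, Integer> counts = new LinkedHashMap<>();
    counts.put("a", 2);
    counts.put("b", 1);
    counts.put("c", 3);
    CountingLookup lookup = new CountingLookup(counts);
    discover(new ArrayList<>(counts.keySet()), lookup);
    // 3 shard listings + 6 shard validations, executed strictly one after another
    System.out.println(lookup.roundTrips.get()); // prints 9
  }
}
```

Under the simplifying assumption of a roughly fixed latency per call, startup time in this model grows with the total number of round trips, so both the stream count and the shard count contribute.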
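
The first idea in the issue, parallelising at both the stream and the shard level, could look roughly like the following minimal sketch. It uses a hypothetical {{ShardLookup}} abstraction (not a KinesisIO API) and assumes the lookups are thread-safe and that Kinesis request rate limits tolerate the extra concurrency; both assumptions would need checking against the AWS client actually in use. It also leaves untouched how Beam core and the Dataflow translator invoke {{split}}, which is where the real complexity lies.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelShardDiscovery {

  /** Hypothetical stand-in for the Kinesis calls made while a source is split. */
  interface ShardLookup {
    List<String> listShards(String stream);

    boolean validateShard(String stream, String shard);
  }

  /** Fake lookup: every stream has two shards, and shards ending in "-1" count as invalid. */
  static final class FakeLookup implements ShardLookup {
    @Override
    public List<String> listShards(String stream) {
      return List.of(stream + "-0", stream + "-1");
    }

    @Override
    public boolean validateShard(String stream, String shard) {
      return !shard.endsWith("-1");
    }
  }

  /** Lists the shards of all streams concurrently, then validates each stream's shards concurrently. */
  static Map<String, List<String>> discover(
      List<String> streams, ShardLookup lookup, ExecutorService pool) {
    Map<String, CompletableFuture<List<String>>> perStream = new LinkedHashMap<>();
    for (String stream : streams) {
      perStream.put(
          stream,
          CompletableFuture.supplyAsync(() -> lookup.listShards(stream), pool)
              .thenCompose(shards -> validateAll(stream, shards, lookup, pool)));
    }
    // Block only after all stream-level tasks are submitted; shard checks are chained without blocking.
    Map<String, List<String>> valid = new LinkedHashMap<>();
    perStream.forEach((stream, future) -> valid.put(stream, future.join()));
    return valid;
  }

  /** Validates all shards of one stream in parallel and keeps the original shard order. */
  private static CompletableFuture<List<String>> validateAll(
      String stream, List<String> shards, ShardLookup lookup, ExecutorService pool) {
    List<CompletableFuture<Boolean>> checks = new ArrayList<>();
    for (String shard : shards) {
      checks.add(CompletableFuture.supplyAsync(() -> lookup.validateShard(stream, shard), pool));
    }
    return CompletableFuture.allOf(checks.toArray(new CompletableFuture<?>[0]))
        .thenApply(
            ignored -> {
              List<String> valid = new ArrayList<>();
              for (int i = 0; i < shards.size(); i++) {
                if (checks.get(i).join()) { // already complete, so join() does not block
                  valid.add(shards.get(i));
                }
              }
              return valid;
            });
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(8);
    try {
      System.out.println(discover(List.of("a", "b"), new FakeLookup(), pool));
      // prints {a=[a-0], b=[b-0]}
    } finally {
      pool.shutdown();
    }
  }
}
```

The pool size bounds the number of concurrent Kinesis calls, so in this sketch it also acts as a crude throttle.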
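
The second idea, having shard information ready before {{Pipeline.run}} with parallelism controlled by client code, amounts to a cache that is warmed up eagerly. The following is a minimal sketch under that assumption: {{ShardInfoCache}}, {{warmUp}} and {{shardsFor}} are hypothetical names, nothing in this issue indicates that {{KinesisIO.Read}} offers such a hook, and the resolver function stands in for listing and validating the shards of one stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical cache of valid shards per stream, warmed up by client code before Pipeline.run(). */
public class ShardInfoCache {
  // Hypothetical stand-in for "list and validate the shards of one stream".
  private final Function<String, List<String>> resolver;
  private final Map<String, CompletableFuture<List<String>>> cache = new ConcurrentHashMap<>();

  public ShardInfoCache(Function<String, List<String>> resolver) {
    this.resolver = resolver;
  }

  /** Resolves all streams on an executor chosen by the caller and waits until each one is cached. */
  public void warmUp(List<String> streams, Executor executor) {
    List<CompletableFuture<List<String>>> pending = new ArrayList<>();
    for (String stream : streams) {
      // computeIfAbsent only submits the task, so the map is not locked during the remote calls.
      pending.add(
          cache.computeIfAbsent(
              stream, s -> CompletableFuture.supplyAsync(() -> resolver.apply(s), executor)));
    }
    pending.forEach(CompletableFuture::join);
  }

  /** Returns cached shards; a stream that was never warmed up is resolved synchronously here. */
  public List<String> shardsFor(String stream) {
    return cache
        .computeIfAbsent(stream, s -> CompletableFuture.completedFuture(resolver.apply(s)))
        .join();
  }

  public static void main(String[] args) {
    AtomicInteger calls = new AtomicInteger();
    ShardInfoCache shardCache =
        new ShardInfoCache(
            stream -> {
              calls.incrementAndGet(); // stands in for the Kinesis round trips of one stream
              return List.of(stream + "-shard-0");
            });
    ExecutorService pool = Executors.newFixedThreadPool(16); // parallelism chosen by client code
    try {
      shardCache.warmUp(List.of("orders", "clicks", "payments"), pool);
    } finally {
      pool.shutdown();
    }
    // Later lookups (e.g. while the pipeline is being translated) are served from the cache.
    System.out.println(shardCache.shardsFor("clicks")); // prints [clicks-shard-0]
    System.out.println(calls.get()); // prints 3
  }
}
```

Client code would decide how many threads to spend on the warm-up, which is the kind of control the issue asks for; wiring such a cache into {{KinesisIO.Read}} would require changes in KinesisIO itself and is not shown here.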



--
This message was sent by Atlassian Jira
(v8.3.4#803005)