Posted to issues@beam.apache.org by "Alexey Romanenko (Jira)" <ji...@apache.org> on 2022/03/18 18:00:00 UTC

[jira] [Commented] (BEAM-14099) Pipeline with large number of PTransforms fails with StackOverflowError

    [ https://issues.apache.org/jira/browse/BEAM-14099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508989#comment-17508989 ] 

Alexey Romanenko commented on BEAM-14099:
-----------------------------------------

Actually, this issue is caused by the way Apache Spark handles the lineage of its pipelines. To provide fault tolerance, the driver keeps the whole DAG of transformations in memory and serializes it recursively, which can require a significant amount of stack space and memory for large pipelines with many transforms that don't materialise or checkpoint their results. So it's not really a Beam Spark Runner problem, but rather a particular characteristic of Spark itself.
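
For illustration, here is a minimal plain-Spark sketch of the effect (the class name, numbers and paths are made up for the example; the number of iterations needed to actually overflow depends on the configured stack size):

{code}
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class DeepLineageDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("deep-lineage");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      sc.setCheckpointDir("/tmp/spark-checkpoints");
      JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a", "b", ""));
      for (int i = 0; i < 5000; i++) {
        // Each filter() appends one more link to the RDD dependency chain;
        // the driver later serializes this whole chain recursively.
        rdd = rdd.filter(s -> !s.isEmpty());
        if (i % 500 == 499) {
          // Checkpointing materializes the RDD and truncates its lineage,
          // keeping the recursion depth during serialization bounded.
          rdd.checkpoint();
          rdd.count(); // an action is needed to actually perform the checkpoint
        }
      }
      System.out.println(rdd.count());
    }
  }
}
{code}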

Some workarounds can be used to overcome this issue:
- increase the {{-Xss}} (JVM thread stack size) option for the Spark driver process, e.g. via {{spark.driver.extraJavaOptions}}
- materialise the results from time to time with an empty side input and a {{Reshuffle}}, like it's done in [JdbcIO.Reparallelize|https://github.com/apache/beam/blob/d7f7f8e587a56fbc7fbfbdd8b010a8f6991234ee/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L2084] (see the sketch after this list)
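
For example, a sketch of the second workaround applied to the loop from the issue description below (the transform names and the interval of 10 are arbitrary, and for simplicity it uses a plain {{Reshuffle.viaRandomKey()}} rather than the empty side input trick from {{JdbcIO.Reparallelize}}):

{code}
// "words" is the PCollection<String> from the snippet in the issue description;
// requires an import of org.apache.beam.sdk.transforms.Reshuffle.
for (int i = 0; i < N; i++) {
  words = words.apply("Filter-" + i, Filter.by((String word) -> !word.isEmpty()));
  if (i % 10 == 9) {
    // Reshuffle forces a materialization point, so the lineage Spark has to
    // serialize stays shallow instead of growing with every filter.
    words = words.apply("Reshuffle-" + i, Reshuffle.viaRandomKey());
  }
}
{code}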

> Pipeline with large number of PTransforms fails with StackOverflowError 
> ------------------------------------------------------------------------
>
>                 Key: BEAM-14099
>                 URL: https://issues.apache.org/jira/browse/BEAM-14099
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark, sdk-java-core
>    Affects Versions: 2.37.0
>            Reporter: Alexey Romanenko
>            Priority: P1
>         Attachments: BEAM-14099_spark.log
>
>
> If a pipeline written with the Java SDK contains a large number of PTransforms, then it fails with a {{java.lang.StackOverflowError}}.
> Code snippet to reproduce (based on WordCount example):
> {code}
> import java.util.Arrays;
>
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.TextIO;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.Count;
> import org.apache.beam.sdk.transforms.Filter;
> import org.apache.beam.sdk.transforms.FlatMapElements;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.TypeDescriptors;
>
> public class WordCountWithNFilters {
>   private static final int N = 100;
>   public static void main(String[] args) {
>     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
>     Pipeline p = Pipeline.create(options);
>     PCollection<String> words = p.apply(TextIO.read().from("file://tmp/input.txt"))
>         .apply(
>             FlatMapElements.into(TypeDescriptors.strings())
>                 .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))));
>     for (int i = 0; i < N; i++) {
>       words = words.apply(Filter.by((String word) -> !word.isEmpty()));
>     }
>     words.apply(Count.perElement())
>         .apply(
>             MapElements.into(TypeDescriptors.strings())
>                 .via(
>                     (KV<String, Long> wordCount) ->
>                         wordCount.getKey() + ": " + wordCount.getValue()))
>         .apply(TextIO.write().to("wordcounts"));
>     p.run().waitUntilFinish();
>   }
> }
> {code}
> Log while running with SparkRunner:
> {code}
> 2022-03-14 19:01:30,465 [pool-3-thread-1] INFO  org.apache.beam.runners.spark.SparkRunner$Evaluator  - Evaluating View.CreatePCollectionView
> [WARNING] 
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.StackOverflowError
>     at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom (SparkPipelineResult.java:73)
>     at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish (SparkPipelineResult.java:104)
>     at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish (SparkPipelineResult.java:92)
>     at org.apache.beam.samples.sql.WordCountWithNFilters.main (WordCountWithNFilters.java:39)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
>     at java.lang.Thread.run (Thread.java:748)
> Caused by: java.lang.StackOverflowError
>     at java.lang.ReflectiveOperationException.<init> (ReflectiveOperationException.java:89)
>     at java.lang.reflect.InvocationTargetException.<init> (InvocationTargetException.java:72)
>     at sun.reflect.GeneratedMethodAccessor39.invoke (Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at java.io.ObjectStreamClass.invokeWriteReplace (ObjectStreamClass.java:1244)
>     at java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1136)
>     at java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:348)
>     at scala.collection.immutable.List$SerializationProxy.writeObject (List.scala:479)
>     at sun.reflect.GeneratedMethodAccessor40.invoke (Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
> ...
> {code}
> It seems that the value of {{N}} needed to trigger the error depends on the environment configuration.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)