Posted to issues@spark.apache.org by "Jungtaek Lim (JIRA)" <ji...@apache.org> on 2018/07/04 21:52:00 UTC

[jira] [Commented] (SPARK-23682) Memory issue with Spark structured streaming

    [ https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533078#comment-16533078 ] 

Jungtaek Lim commented on SPARK-23682:
--------------------------------------

[~bondyk] [~ccifuentes] [~akorzhuev]

This may not be due to a memory leak, but rather to excessive caching of state versions in the HDFS state store provider.

Could one of you apply the patch for https://issues.apache.org/jira/browse/SPARK-24441 ([https://github.com/apache/spark/pull/21469]) to see the overall memory used for caching?

If you find that caching of excess state versions is the issue, you can apply the patch for https://issues.apache.org/jira/browse/SPARK-24717 ([https://github.com/apache/spark/pull/21700]) to see reduced memory usage.
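
If the second patch helps, here is a minimal sketch of where the new setting would go, assuming the configuration key added by SPARK-24717 ({{spark.sql.streaming.maxBatchesToRetainInMemory}}); the value 2 is only illustrative:

{code:java}
import org.apache.spark.sql.SparkSession;

// Sketch only: cap how many loaded state versions the HDFS-backed state store
// provider retains in executor memory (key assumed from SPARK-24717).
SparkSession session = SparkSession.builder()
    .appName("test-stream")
    .config("spark.sql.streaming.maxBatchesToRetainInMemory", "2")
    .getOrCreate();
{code}

The same key could equally be passed via --conf on spark-submit.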

> Memory issue with Spark structured streaming
> --------------------------------------------
>
>                 Key: SPARK-23682
>                 URL: https://issues.apache.org/jira/browse/SPARK-23682
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.2.0
>         Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> |spark.blacklist.decommissioning.timeout|1h|
> |spark.cleaner.periodicGC.interval|10min|
> |spark.default.parallelism|18|
> |spark.dynamicAllocation.enabled|false|
> |spark.eventLog.enabled|true|
> |spark.executor.cores|3|
> |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'|
> |spark.executor.id|driver|
> |spark.executor.instances|3|
> |spark.executor.memory|22G|
> |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2|
> |spark.hadoop.parquet.enable.summary-metadata|false|
> |spark.hadoop.yarn.timeline-service.enabled|false|
> |spark.jars| |
> |spark.master|yarn|
> |spark.memory.fraction|0.9|
> |spark.memory.storageFraction|0.3|
> |spark.memory.useLegacyMode|false|
> |spark.rdd.compress|true|
> |spark.resourceManager.cleanupExpiredHost|true|
> |spark.scheduler.mode|FIFO|
> |spark.serializer|org.apache.spark.serializer.KryoSerializer|
> |spark.shuffle.service.enabled|true|
> |spark.speculation|false|
> |spark.sql.parquet.filterPushdown|true|
> |spark.sql.parquet.mergeSchema|false|
> |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse|
> |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true|
> |spark.submit.deployMode|client|
> |spark.yarn.am.cores|1|
> |spark.yarn.am.memory|2G|
> |spark.yarn.am.memoryOverhead|1G|
> |spark.yarn.executor.memoryOverhead|3G|
>            Reporter: Yuriy Bondaruk
>            Priority: Major
>              Labels: Memory, memory, memory-leak
>         Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Spark executors GC time.png, image-2018-03-22-14-46-31-960.png, screen_shot_2018-03-20_at_15.23.29.png
>
>
> It seems like there is an issue with memory in Structured Streaming. A stream with aggregation (dropDuplicates()) and data partitioning constantly increases memory usage, and eventually the executors fail with exit code 137:
> {quote}ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1520214726510_0001_01_000003 on host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
> Container exited with a non-zero exit code 137
> Killed by external signal{quote}
> Stream creation looks something like this:
> {quote}session
>     .readStream()
>     .schema(inputSchema)
>     .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
>     .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
>     .csv("s3://test-bucket/input")
>     .as(Encoders.bean(TestRecord.class))
>     .flatMap(mf, Encoders.bean(TestRecord.class))
>     .dropDuplicates("testId", "testName")
>     .withColumn("year", functions.date_format(functions.col("testTimestamp").cast(DataTypes.DateType), "yyyy"))
>     .writeStream()
>     .option("path", "s3://test-bucket/output")
>     .option("checkpointLocation", "s3://test-bucket/checkpoint")
>     .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
>     .partitionBy("year")
>     .format("parquet")
>     .outputMode(OutputMode.Append())
>     .queryName("test-stream")
>     .start();{quote}
> Analyzing the heap dump, I found that most of the memory is used by {{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}}, which is referenced from [StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196].
> At first glance this looks normal, since that is how Spark keeps aggregation keys in memory. However, I did my testing by renaming the files in the source folder so that they would be picked up by Spark again. Since the input records are the same, all further rows should be rejected as duplicates and memory consumption shouldn't increase, but that is not what happens. Moreover, GC time took more than 30% of the total processing time.
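> For reference, here is an abridged, watermarked variant of the stream above; per the Structured Streaming guide, defining a watermark and including the event-time column in dropDuplicates() allows old deduplication state to be dropped. This is only a sketch: it assumes testTimestamp is an event-time (timestamp) column, and the 24-hour threshold is illustrative.
> {code:java}
> // Sketch only; mirrors the snippet above.
> session
>     .readStream()
>     .schema(inputSchema)
>     .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
>     .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
>     .csv("s3://test-bucket/input")
>     .as(Encoders.bean(TestRecord.class))
>     .flatMap(mf, Encoders.bean(TestRecord.class))
>     // Bound how late a duplicate may arrive so that old deduplication state can be dropped.
>     .withWatermark("testTimestamp", "24 hours")
>     // Include the event-time column in the deduplication key, as the guide recommends.
>     .dropDuplicates(new String[] {"testId", "testName", "testTimestamp"})
>     .writeStream()
>     .option("path", "s3://test-bucket/output")
>     .option("checkpointLocation", "s3://test-bucket/checkpoint")
>     .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
>     .format("parquet")
>     .outputMode(OutputMode.Append())
>     .start();
> {code}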



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org