You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Juha Iso-Sipilä (JIRA)" <ji...@apache.org> on 2019/06/20 13:16:00 UTC

[jira] [Comment Edited] (SPARK-24961) sort operation causes out of memory

    [ https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16868522#comment-16868522 ] 

Juha Iso-Sipilä edited comment on SPARK-24961 at 6/20/19 1:15 PM:
------------------------------------------------------------------

I am using Spark 2.4.3 (pyspark) in local mode and I observe the same with my dataset. The dataset has two columns, identifier and document, both are strings. The document column can vary between 1kB and 5MB. I wanted to do the following to shuffle the order of the documents in my dataset.
 ```

df = spark.read.parquet('parquet_files')

df.orderBy(fn.hash(*df.columns)).write.parquet('output_dir')
 ```

This fails with error suggesting I should increase maxResultSize.

It feels wrong to me that the user of sorting API should know anything about the technical implementation or its limitations. Setting maxResultSize to 0 helps with my dataset but will it eventually fail with system OOM when I have even more data?


was (Author: juhai):
I am using Spark 2.4.3 (pyspark) and I observe the same with my dataset. The dataset has two columns, identifier and document, both are strings. The document column can vary between 1kB and 5MB. I wanted to do the following to shuffle the order of the documents in my dataset.
```

df = spark.read.parquet('parquet_files')

df.orderBy(fn.hash(*df.columns)).write.parquet('output_dir')
```

This fails with error suggesting I should increase maxResultSize.

It feels wrong to me that the user of sorting API should know anything about the technical implementation or its limitations. Setting maxResultSize to 0 helps with my dataset but will it eventually fail with system OOM when I have even more data?

> sort operation causes out of memory 
> ------------------------------------
>
>                 Key: SPARK-24961
>                 URL: https://issues.apache.org/jira/browse/SPARK-24961
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 2.3.1
>         Environment: Java 1.8u144+
> Windows 10
> Spark 2.3.1 in local mode
> -Xms4g -Xmx4g
> optional: -XX:+UseParallelOldGC 
>            Reporter: Markus Breuer
>            Priority: Major
>
> A sort operation on large rdd - which does not fit in memory - causes out of memory exception. I made the effect reproducable by an sample, the sample creates large object of about 2mb size. When saving result the oom occurs. I tried several StorageLevels, but if memory is included (MEMORY_AND_DISK, MEMORY_AND_DISK_SER, none) application runs in out of memory. Only DISK_ONLY seems to work.
> When replacing sort() with sortWithinPartitions() no StorageLevel is required and application succeeds.
> {code:java}
> package de.bytefusion.examples;
> import breeze.storage.Storage;
> import de.bytefusion.utils.Options;
> import org.apache.hadoop.io.MapFile;
> import org.apache.hadoop.io.SequenceFile;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.SequenceFileOutputFormat;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.RowFactory;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
> import static org.apache.spark.sql.functions.*;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.UUID;
> import java.util.stream.Collectors;
> import java.util.stream.IntStream;
> public class Example3 {
>     public static void main(String... args) {
>         // create spark session
>         SparkSession spark = SparkSession.builder()
>                 .appName("example1")
>                 .master("local[4]")
>                 .config("spark.driver.maxResultSize","1g")
>                 .config("spark.driver.memory","512m")
>                 .config("spark.executor.memory","512m")
>                 .config("spark.local.dir","d:/temp/spark-tmp")
>                 .getOrCreate();
>         JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
>         // base to generate huge data
>         List<Integer> list = new ArrayList<>();
>         for (int val = 1; val < 10000; val++) {
>             int valueOf = Integer.valueOf(val);
>             list.add(valueOf);
>         }
>         // create simple rdd of int
>         JavaRDD<Integer> rdd = sc.parallelize(list,200);
>         // use map to create large object per row
>         JavaRDD<Row> rowRDD =
>                 rdd
>                         .map(value -> RowFactory.create(String.valueOf(value), createLongText(UUID.randomUUID().toString(), 2 * 1024 * 1024)))
>                         // no persist => out of memory exception on write()
>                         // persist MEMORY_AND_DISK => out of memory exception on write()
>                         // persist MEMORY_AND_DISK_SER => out of memory exception on write()
>                         // persist(StorageLevel.DISK_ONLY())
>                 ;
>         StructType type = new StructType();
>         type = type
>                 .add("c1", DataTypes.StringType)
>                 .add( "c2", DataTypes.StringType );
>         Dataset<Row> df = spark.createDataFrame(rowRDD, type);
>         // works
>         df.show();
>         df = df
>                 .sort(col("c1").asc() )
>             ;
>         df.explain();
>         // takes a lot of time but works
>         df.show();
>         // OutOfMemoryError: java heap space
>         df
>             .write()
>             .mode("overwrite")
>             .csv("d:/temp/my.csv");
>         // OutOfMemoryError: java heap space
>         df
>                 .toJavaRDD()
>                 .mapToPair(row -> new Tuple2(new Text(row.getString(0)), new Text( row.getString(1))))
>                 .saveAsHadoopFile("d:\\temp\\foo", Text.class, Text.class, SequenceFileOutputFormat.class );
>     }
>     private static String createLongText( String text, int minLength ) {
>         StringBuffer sb = new StringBuffer();
>         while( sb.length() < minLength ) {
>             sb.append(text);
>         }
>         return sb.toString();
>     }
> }
> {code}
> When using StorageLevel.MEMORY_AND_DISK(_SER) an oom crashes application at partition 70 at heap usage of 3g from 4g available.
> It seems sort does something like collect, an heap dump shows very large array of array - possibly the partition contents. Also spark.driver.maxResultSize is involved, so sort exceeds the default values. Setting it to unlimited causes oom.
> Why do I think this is a bug?
>  # Operation sort() should not involve maxResultSize
>  # MEMORY_AND_DISK should work at all and at least disk should be used. But I see oom when reaching 3g of 4g total heap size.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org