Posted to issues@spark.apache.org by "Markus Breuer (JIRA)" <ji...@apache.org> on 2018/07/28 22:07:00 UTC

[jira] [Created] (SPARK-24961) sort operation causes out of memory

Markus Breuer created SPARK-24961:
-------------------------------------

             Summary: sort operation causes out of memory 
                 Key: SPARK-24961
                 URL: https://issues.apache.org/jira/browse/SPARK-24961
             Project: Spark
          Issue Type: Bug
          Components: Java API
    Affects Versions: 2.3.1
         Environment: Java 1.8u144+
                      Windows 10
                      Spark 2.3.1 in local mode
                      -Xms4g -Xmx4g
                      optional: -XX:+UseParallelOldGC
            Reporter: Markus Breuer


A sort operation on a large RDD that does not fit in memory causes an OutOfMemoryError. The sample below reproduces the effect; it creates one large object of about 2 MB per row. The OOM occurs when the result is saved. I tried several StorageLevels, but whenever memory is involved (MEMORY_AND_DISK, MEMORY_AND_DISK_SER, or no persist at all) the application runs out of memory. Only DISK_ONLY seems to work.
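
To put "does not fit in memory" into numbers, here is a rough back-of-the-envelope sketch in plain Java. The class and method names are made up for illustration. The row count, partition count and text length are taken from the sample; the 36-character UUID string length and the 2 bytes per char of a Java 8 String are assumptions about the runtime, and Spark's own row format may use a different encoding.

```java
import java.util.Locale;

public class DatasetSizeEstimate {

    // Mirrors createLongText from the sample: the text is appended until the
    // buffer holds at least minLength characters.
    static long charsPerRow(int textLength, int minLength) {
        long repeats = (minLength + textLength - 1) / textLength; // ceiling division
        return repeats * textLength;
    }

    // Total number of characters over all rows.
    static long totalChars(int rows, long charsPerRow) {
        return rows * charsPerRow;
    }

    public static void main(String[] args) {
        int rows = 9_999;                 // the loop in the sample runs val = 1 .. 9999
        int partitions = 200;             // sc.parallelize(list, 200)
        int uuidLength = 36;              // assumed length of UUID.randomUUID().toString()
        int minLength = 2 * 1024 * 1024;  // second argument of createLongText

        long chars = charsPerRow(uuidLength, minLength);
        long total = totalChars(rows, chars);
        double gib = 1024.0 * 1024.0 * 1024.0;

        System.out.println("chars per row: " + chars);
        System.out.println(String.format(Locale.ROOT,
                "total at 1 byte per char:  %.1f GiB", total / gib));
        System.out.println(String.format(Locale.ROOT,
                "total at 2 bytes per char: %.1f GiB", 2 * total / gib));
        System.out.println("rows per partition: about " + rows / partitions);
    }
}
```

Under these assumptions the data set is roughly 19.5 GiB at one byte per character and about twice that as Java 8 Strings, compared with the 4g heap, while a single partition of about 50 rows stays in the range of 100 to 200 MiB.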

When sort() is replaced with sortWithinPartitions(), no StorageLevel is required and the application succeeds.
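
The two operations do not produce the same result, which may matter when using sortWithinPartitions() as a workaround. The following is a plain-Java analogy, not Spark code: partitions are modelled as lists, all names are hypothetical, and the split points for the global sort are simply passed in, so the sketch says nothing about how Spark chooses them or where the memory goes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SortScopeSketch {

    // Analogue of sortWithinPartitions(): every partition is sorted on its own
    // and no row leaves its partition.
    static List<List<String>> sortWithinPartitions(List<List<String>> partitions) {
        List<List<String>> result = new ArrayList<>();
        for (List<String> partition : partitions) {
            List<String> sorted = new ArrayList<>(partition);
            Collections.sort(sorted);
            result.add(sorted);
        }
        return result;
    }

    // Analogue of a global sort(): rows are first redistributed by key range
    // (this stands in for the shuffle), then each new partition is sorted.
    // upperBounds holds the inclusive upper key of every partition but the last.
    static List<List<String>> globalSort(List<List<String>> partitions,
                                         List<String> upperBounds) {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i <= upperBounds.size(); i++) {
            result.add(new ArrayList<>());
        }
        for (List<String> partition : partitions) {
            for (String key : partition) {
                int target = 0;
                while (target < upperBounds.size()
                        && key.compareTo(upperBounds.get(target)) > 0) {
                    target++;
                }
                result.get(target).add(key);
            }
        }
        for (List<String> partition : result) {
            Collections.sort(partition);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("7", "1", "4"),
                Arrays.asList("3", "9", "2"));

        // Each partition is ordered, but the partitions overlap in key range.
        System.out.println(sortWithinPartitions(partitions));
        // Reading the partitions one after another yields a total order.
        System.out.println(globalSort(partitions, Arrays.asList("4")));
    }
}
```

In this analogy only the global variant moves rows between partitions, so the per-partition variant is a valid replacement only when a total order across partitions is not needed.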
{code:java}
package de.bytefusion.examples;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

import static org.apache.spark.sql.functions.*;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class Example3 {

    public static void main(String... args) {

        // create spark session
        SparkSession spark = SparkSession.builder()
                .appName("example1")
                .master("local[4]")
                .config("spark.driver.maxResultSize","1g")
                .config("spark.driver.memory","512m")
                .config("spark.executor.memory","512m")
                .config("spark.local.dir","d:/temp/spark-tmp")
                .getOrCreate();

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        // base to generate huge data
        List<Integer> list = new ArrayList<>();
        for (int val = 1; val < 10000; val++) {
            list.add(val);
        }
        // create simple rdd of int
        JavaRDD<Integer> rdd = sc.parallelize(list,200);
        // use map to create large object per row
        JavaRDD<Row> rowRDD =
                rdd
                        .map(value -> RowFactory.create(String.valueOf(value), createLongText(UUID.randomUUID().toString(), 2 * 1024 * 1024)))
                        // no persist => OutOfMemoryError on write()
                        // .persist(StorageLevel.MEMORY_AND_DISK()) => OutOfMemoryError on write()
                        // .persist(StorageLevel.MEMORY_AND_DISK_SER()) => OutOfMemoryError on write()
                        // .persist(StorageLevel.DISK_ONLY()) => works
                ;

        StructType type = new StructType();
        type = type
                .add("c1", DataTypes.StringType)
                .add( "c2", DataTypes.StringType );

        Dataset<Row> df = spark.createDataFrame(rowRDD, type);
        // works
        df.show();
        df = df
                .sort(col("c1").asc() )
            ;
        df.explain();
        // takes a lot of time but works
        df.show();
        // OutOfMemoryError: java heap space
        df
            .write()
            .mode("overwrite")
            .csv("d:/temp/my.csv");
        // OutOfMemoryError: java heap space
        df
                .toJavaRDD()
                .mapToPair(row -> new Tuple2<>(new Text(row.getString(0)), new Text(row.getString(1))))
                .saveAsHadoopFile("d:\\temp\\foo", Text.class, Text.class, SequenceFileOutputFormat.class );
    }

    private static String createLongText( String text, int minLength ) {
        StringBuilder sb = new StringBuilder();
        while( sb.length() < minLength ) {
            sb.append(text);
        }
        return sb.toString();
    }
}

{code}
With StorageLevel.MEMORY_AND_DISK or MEMORY_AND_DISK_SER, the application crashes with an OOM at partition 70, at a heap usage of about 3g of the 4g available.

It seems that sort does something like a collect: a heap dump shows a very large array of arrays, possibly the partition contents. spark.driver.maxResultSize is also involved, since the sort exceeds the default value. Setting it to unlimited leads to the OOM.

Why do I think this is a bug?
 # The sort() operation should not involve maxResultSize.
 # MEMORY_AND_DISK should work in any case, falling back to disk when memory runs short. Instead, I see an OOM when heap usage reaches 3g of the 4g total.

 



