You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "michael procopio (JIRA)" <ji...@apache.org> on 2017/06/19 14:00:01 UTC

[jira] [Commented] (SPARK-21140) Reduce collect high memory requirements

    [ https://issues.apache.org/jira/browse/SPARK-21140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16054043#comment-16054043 ] 

michael procopio commented on SPARK-21140:
------------------------------------------

I disagree: executor memory does depend on the size of the partition being collected.  A 6-to-1 ratio to collect data seems onerous to me.  It seems that multiple copies must be created in the executor.

I found that collecting an RDD containing a single 512 MB partition took 3 GB.

Here's the code I was using:

package com.test

import org.apache.spark._
import org.apache.spark.SparkContext._

/**
 * Reproduction program for SPARK-21140.
 *
 * Builds a one-element RDD, expands that element with `flatMap` into a stream of
 * fixed-size byte-array records totalling roughly `partitionSize` bytes, collects
 * the records to the driver and prints how many records/bytes arrived.
 *
 * Recognised command-line options (each takes one value):
 *   -master <url>            Spark master (default "local")
 *   -recordSize <int>        length in bytes of every generated record (default 128)
 *   -partitionSize <long>    total number of bytes to generate (default 1 GiB)
 *   -executorMemory <size>   value for spark.executor.memory (default "6g")
 */
object SparkRdd2 {
  import scala.annotation.tailrec

  /**
   * Typed holder for the command-line settings.
   *
   * Keeping each setting in a field of its real type (instead of a
   * `Map[String, Any]` plus `asInstanceOf`) means a default can never have the
   * wrong boxed type; the previous `Int` default for `partitionSize` failed with
   * a `ClassCastException` when it was cast to `Long`.
   */
  private final case class Options(
      master: String = "local",
      recordSize: Int = 128,
      partitionSize: Long = 1024L * 1024 * 1024,
      executorMemory: String = "6g")

  /**
   * Folds the argument list into an [[Options]] value.
   *
   * @param options  settings accumulated so far
   * @param listArgs arguments that have not been consumed yet
   * @return the settings once every argument has been consumed
   * @throws NumberFormatException if a numeric option value cannot be parsed
   *
   * An unrecognised option (or a known option missing its value) prints a
   * message and terminates the JVM with exit status 1.
   */
  @tailrec
  private def parseOptions(options: Options, listArgs: List[String]): Options =
    listArgs match {
      case Nil =>
        options
      case "-master" :: value :: tail =>
        parseOptions(options.copy(master = value), tail)
      case "-recordSize" :: value :: tail =>
        parseOptions(options.copy(recordSize = value.toInt), tail)
      case "-partitionSize" :: value :: tail =>
        parseOptions(options.copy(partitionSize = value.toLong), tail)
      case "-executorMemory" :: value :: tail =>
        parseOptions(options.copy(executorMemory = value), tail)
      case option :: _ =>
        println(s"unknown option $option")
        sys.exit(1)
    }

  /**
   * Lazily produces zero-filled records until at least `partitionSize` bytes
   * have been handed out.
   *
   * @param recordSize    length of every record in bytes; must be positive,
   *                      otherwise the byte counter never reaches `partitionSize`
   * @param partitionSize total number of bytes to produce (the last record may
   *                      overshoot when this is not a multiple of `recordSize`)
   * @return an iterator that allocates one fresh array per `next()` call
   */
  private def createdSizedPartition(recordSize: Int, partitionSize: Long): Iterator[Array[Byte]] = {
    var sizeReturned: Long = 0
    new Iterator[Array[Byte]] {
      override def hasNext: Boolean = sizeReturned < partitionSize

      override def next(): Array[Byte] = {
        val record = Array.fill(recordSize)(0.toByte)
        sizeReturned = sizeReturned + recordSize
        record
      }
    }
  }

  /**
   * Entry point: parses the options, configures Spark, generates and collects
   * the records, then prints summary counts.
   *
   * Any `Exception` is reported on standard output and rethrown.
   *
   * @param args command-line options as described on [[SparkRdd2]]
   */
  def main(args: Array[String]): Unit = {
    try {
      //
      // Process any arguments.
      //
      val options = parseOptions(Options(), args.toList)
      // Guard against a generator that would never terminate.
      require(options.recordSize > 0, s"recordSize must be positive, got ${options.recordSize}")
      val master = options.master
      val recordSize = options.recordSize
      val partitionSize = options.partitionSize
      val executorMemory = options.executorMemory
      println(f"Creating single partition of $partitionSize%d with records of length $recordSize%d")
      println(f"Setting spark.executor.memory to $executorMemory")
      //
      // Create SparkConf.
      //
      val sparkConf = new SparkConf()
      sparkConf.setAppName("MyEnvVar").setMaster(master).setExecutorEnv("myenvvar", "good")
      sparkConf.set("spark.executor.cores", "1")
      sparkConf.set("spark.executor.instances", "1")
      sparkConf.set("spark.executor.memory", executorMemory)
      sparkConf.set("spark.eventLog.enabled", "true")
      sparkConf.set("spark.eventLog.dir", "hdfs://hadoop01glnxa64:54310/user/mprocopi/spark-events")
      sparkConf.set("spark.driver.maxResultSize", "0")
      /*
       sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       sparkConf.set("spark.kryoserializer.buffer.max","768m")
       sparkConf.set("spark.kryoserializer.buffer","64k")
       */
      //
      // Create SparkContext
      //
      val sc = new SparkContext(sparkConf)
      try {
        //
        // Off we go.
        //
        // A single (recordSize, partitionSize) element seeds the RDD; flatMap
        // expands it into the actual records on the executor side.
        val startRdd = sc.parallelize(Array((recordSize, partitionSize)))
        val sizedRdd = startRdd.flatMap(rddInfo => createdSizedPartition(rddInfo._1, rddInfo._2))
        val results = sizedRdd.collect
        // Folds are used instead of map(...).sum/max so that no additional
        // array is allocated on the driver and an empty result yields 0.
        val countLines: Int = results.length
        val countBytes: Long = results.foldLeft(0L)((total, line) => total + line.length)
        val maxRecord: Int = results.foldLeft(0)((largest, line) => math.max(largest, line.length))
        println(f"Collected $countLines%d lines")
        println(f"          $countBytes%d bytes")
        println(f"Max record $maxRecord%d bytes")
      } finally {
        // Release the context whether or not the job succeeded.
        sc.stop()
      }
    } catch {
      case e: Exception =>
        // Interpolate into ONE string: println with two arguments would print a tuple.
        println(s"Error in executing application: ${e.getMessage}")
        throw e
    }
  }
}

After building, it can be invoked as:

spark-submit --class com.test.SparkRdd2 --driver-memory 10g ./target/scala-2.11/envtest_2.11-0.0.1.jar -recordSize 256 -partitionSize 536870912

The command-line options allow you to vary the master, record size, partition size, and executor memory.

> Reduce collect high memory requirements
> ---------------------------------------
>
>                 Key: SPARK-21140
>                 URL: https://issues.apache.org/jira/browse/SPARK-21140
>             Project: Spark
>          Issue Type: Improvement
>          Components: Input/Output
>    Affects Versions: 2.1.1
>         Environment: Linux Debian 8 using hadoop 2.7.2.
>            Reporter: michael procopio
>
> I wrote a very simple Scala application which used flatMap to create an RDD containing a 512 MB partition of 256-byte arrays.  Experimentally, I determined that spark.executor.memory had to be set to 3 GB in order to collect the data.  This seems extremely high.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org