Posted to issues@spark.apache.org by "michael procopio (JIRA)" <ji...@apache.org> on 2017/06/19 14:00:01 UTC
[jira] [Commented] (SPARK-21140) Reduce collect high memory requirements
[ https://issues.apache.org/jira/browse/SPARK-21140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16054043#comment-16054043 ]
michael procopio commented on SPARK-21140:
------------------------------------------
I disagree; executor memory does depend on the size of the partition being collected. A 6-to-1 ratio of executor memory to collected data seems onerous to me. It looks as if multiple copies of the data must be created in the executor.
I found that collecting an RDD containing a single 512 MB partition took 3 GB of executor memory.
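To make the ratio explicit, here is a small back-of-the-envelope sketch (not from the original report; it only restates the figures above):

object CollectRatio {
  def main(args: Array[String]): Unit = {
    val partitionBytes = 512L * 1024 * 1024            // size of the collected partition (512 MB)
    val recordBytes    = 256                           // payload per record
    val records        = partitionBytes / recordBytes  // ~2 million small byte arrays
    val executorBytes  = 3L * 1024 * 1024 * 1024       // executor memory needed before collect succeeded
    println(s"records created:        $records")
    println(s"executor memory / data: ${executorBytes.toDouble / partitionBytes}") // ~6x
  }
}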
Here's the code I was using:
package com.test

import org.apache.spark._
import org.apache.spark.SparkContext._

object SparkRdd2 {

  def main(args: Array[String]) {
    try {
      //
      // Process any arguments.
      //
      def parseOptions(map: Map[String, Any], listArgs: List[String]): Map[String, Any] = {
        listArgs match {
          case Nil =>
            map
          case "-master" :: value :: tail =>
            parseOptions(map + ("master" -> value), tail)
          case "-recordSize" :: value :: tail =>
            parseOptions(map + ("recordSize" -> value.toInt), tail)
          case "-partitionSize" :: value :: tail =>
            parseOptions(map + ("partitionSize" -> value.toLong), tail)
          case "-executorMemory" :: value :: tail =>
            parseOptions(map + ("executorMemory" -> value), tail)
          case option :: tail =>
            println("unknown option " + option)
            sys.exit(1)
        }
      }

      val listArgs  = args.toList
      val optionmap = parseOptions(Map[String, Any](), listArgs)

      val master         = optionmap.getOrElse("master", "local").asInstanceOf[String]
      val recordSize     = optionmap.getOrElse("recordSize", 128).asInstanceOf[Int]
      val partitionSize  = optionmap.getOrElse("partitionSize", 1024L * 1024 * 1024).asInstanceOf[Long]
      val executorMemory = optionmap.getOrElse("executorMemory", "6g").asInstanceOf[String]

      println(f"Creating single partition of $partitionSize%d with records of length $recordSize%d")
      println(f"Setting spark.executor.memory to $executorMemory")

      //
      // Create SparkConf.
      //
      val sparkConf = new SparkConf()
      sparkConf.setAppName("MyEnvVar").setMaster(master).setExecutorEnv("myenvvar", "good")
      sparkConf.set("spark.executor.cores", "1")
      sparkConf.set("spark.executor.instances", "1")
      sparkConf.set("spark.executor.memory", executorMemory)
      sparkConf.set("spark.eventLog.enabled", "true")
      sparkConf.set("spark.eventLog.dir", "hdfs://hadoop01glnxa64:54310/user/mprocopi/spark-events")
      sparkConf.set("spark.driver.maxResultSize", "0")
      /*
      sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      sparkConf.set("spark.kryoserializer.buffer.max", "768m")
      sparkConf.set("spark.kryoserializer.buffer", "64k")
      */

      //
      // Create SparkContext.
      //
      val sc = new SparkContext(sparkConf)

      //
      // Lazily generate records until roughly partitionSize bytes have been produced.
      //
      def createdSizedPartition(recordSize: Int, partitionSize: Long): Iterator[Array[Byte]] = {
        var sizeReturned: Long = 0
        new Iterator[Array[Byte]] {
          override def hasNext: Boolean =
            sizeReturned < partitionSize
          override def next(): Array[Byte] = {
            val record = Array.fill(recordSize)(0.toByte)
            sizeReturned = sizeReturned + recordSize
            record
          }
        }
      }

      //
      // Off we go: build a single partition of the requested size and collect it.
      //
      val startRdd = sc.parallelize(Array((recordSize, partitionSize)))
      val sizedRdd = startRdd.flatMap(rddInfo => createdSizedPartition(rddInfo._1, rddInfo._2))
      val results  = sizedRdd.collect

      var countLines: Int  = 0
      var countBytes: Long = 0
      var maxRecord: Int   = 0
      for (line <- results) {
        countLines = countLines + 1
        countBytes = countBytes + line.length
        if (line.length > maxRecord) {
          maxRecord = line.length
        }
      }
      println(f"Collected $countLines%d lines")
      println(f"          $countBytes%d bytes")
      println(f"Max record $maxRecord%d bytes")
    } catch {
      case e: Exception =>
        println("Error in executing application: " + e.getMessage)
        throw e
    }
  }
}
After building, it can be invoked as:
spark-submit --class com.test.SparkRdd2 --driver-memory 10g ./target/scala-2.11/envtest_2.11-0.0.1.jar -recordSize 256 -partitionSize 536870912
The command-line options allow you to vary the record size, partition size, executor memory, and master.
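A hypothetical variation (not part of the program above) would be to spread the same total volume across several smaller partitions, to check whether the executor peak tracks the per-partition size rather than the total collected size. A sketch, assuming the same sc, recordSize, partitionSize, and createdSizedPartition defined above:

// Hypothetical: split the data across numPartitions tasks so each task
// materializes and serializes a smaller chunk during collect().
val numPartitions = 8
val perPartition  = partitionSize / numPartitions
val startRdd = sc.parallelize(Seq.fill(numPartitions)((recordSize, perPartition)), numPartitions)
val sizedRdd = startRdd.flatMap { case (rs, ps) => createdSizedPartition(rs, ps) }
val results  = sizedRdd.collect()
println(f"Collected ${results.map(_.length.toLong).sum}%d bytes across $numPartitions%d partitions")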
> Reduce collect high memory requirements
> ---------------------------------------
>
> Key: SPARK-21140
> URL: https://issues.apache.org/jira/browse/SPARK-21140
> Project: Spark
> Issue Type: Improvement
> Components: Input/Output
> Affects Versions: 2.1.1
> Environment: Linux Debian 8 using hadoop 2.7.2.
> Reporter: michael procopio
>
> I wrote a very simple Scala application which used flatMap to create an RDD containing a 512 MB partition of 256-byte arrays. Experimentally, I determined that spark.executor.memory had to be set to 3 GB in order to collect the data. This seems extremely high.