Posted to user@spark.apache.org by Marcin Cylke <ma...@ext.allegro.pl> on 2015/03/11 11:19:56 UTC
skewed outer join with spark 1.2.0 - memory consumption
Hi
I'm trying to do a join of two datasets: 800GB with ~50MB.
My code looks like this:
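import org.apache.spark.SparkContext._  // pair-RDD functions (leftOuterJoin, reduceByKey); required in Spark 1.2
import org.apache.spark.broadcast.Broadcast
import spray.json._                     // parseJson / asJsObject (assuming spray-json)

// Parse one click line; some events are nested under a "message" field.
private def parseClickEventLine(line: String, jsonFormatBC: Broadcast[LazyJsonFormat]): ClickEvent = {
  val json = line.parseJson.asJsObject
  val eventJson =
    if (json.fields.contains("recommendationId")) json
    else json.fields("message").asJsObject
  jsonFormatBC.value.clickEventJsonFormat.read(eventJson)
}

val jsonFormatBc: Broadcast[LazyJsonFormat] = sc.broadcast(new LazyJsonFormat)
// ~800GB side: views keyed by id (recoLogRdd is a project-specific helper, not a Spark API)
val views = sc.recoLogRdd(jobConfig.viewsDirectory)
  .map(view => (view.id.toString, view))
// ~50MB side: clicks keyed by recommendationId
val clicks = sc.textFile(s"${jobConfig.clicksDirectory}/*")
  .map(parseClickEventLine(_, jsonFormatBc))
  .map(click => (click.recommendationId, click))
// Every view is kept; click is None when the view was never clicked
val clicksCounts = views.leftOuterJoin(clicks).map { case (recommendationId, (view, click)) =>
  val metaItemType = click.flatMap(c => view.itemDetailsById.get(c.itemIdStr).map(_.metaItemType))
  (view, metaItemType) -> click.map(_ => 1).getOrElse(0)
}
clicksCounts.reduceByKey(_ + _).map(toCSV).saveAsTextFile(jobConfig.outputDirectory)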
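For a join this lopsided (800GB against ~50MB), one commonly used alternative to leftOuterJoin, not something proposed in this thread, is a map-side (broadcast) join. The small side is turned into a lookup table on the driver, shared with sc.broadcast, and each partition of the large side is joined locally inside views.mapPartitions, so the 800GB side is not shuffled for the join itself. The reduceByKey afterwards still shuffles. The sketch below shows only the per-partition logic with plain Scala collections so it runs without Spark. View and Click are hypothetical, simplified stand-ins for the poster's types, and itemMetaTypes replaces itemDetailsById(...).metaItemType. It only pays off if the grouped clicks fit comfortably in every executor's heap, and 50MB on disk can be considerably larger as deserialized objects.

```scala
// Hypothetical, simplified stand-ins for the poster's types (not from the thread).
final case class View(id: String, itemMetaTypes: Map[String, String])
final case class Click(recommendationId: String, itemIdStr: String)

object MapSideJoinSketch {
  // Small side -> lookup table keyed by recommendationId. In Spark this map
  // would be built on the driver and shared with sc.broadcast(...).
  def buildIndex(clicks: Seq[Click]): Map[String, Seq[Click]] =
    clicks.groupBy(_.recommendationId)

  // Per-partition left outer join, suitable for views.mapPartitions(...):
  // one ((view, metaItemType), 1) per matching click, or ((view, None), 0)
  // when the view has no clicks, mirroring leftOuterJoin + map above.
  def joinPartition(
      views: Iterator[View],
      index: Map[String, Seq[Click]]
  ): Iterator[((View, Option[String]), Int)] =
    views.flatMap { view =>
      index.get(view.id) match {
        case Some(clicks) =>
          clicks.iterator.map(c => (view, view.itemMetaTypes.get(c.itemIdStr)) -> 1)
        case None =>
          Iterator((view, Option.empty[String]) -> 0)
      }
    }

  def main(args: Array[String]): Unit = {
    val views = Seq(View("r1", Map("i1" -> "book")), View("r2", Map.empty))
    val clicks = Seq(Click("r1", "i1"), Click("r1", "i9"))
    // Sum the counts per key, as reduceByKey(_ + _) would.
    val counts = joinPartition(views.iterator, buildIndex(clicks)).toSeq
      .groupBy(_._1)
      .map { case (key, pairs) => key -> pairs.map(_._2).sum }
    counts.foreach(println)
  }
}
```

Keeping the output shape identical to the original map step means the existing reduceByKey/toCSV tail could stay unchanged.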
I'm using Spark 1.2.0 and have the following options set:
spark.default.parallelism = 24
spark.serializer = org.apache.spark.serializer.KryoSerializer
spark.test.disableBlockManagerHeartBeat = true
spark.shuffle.netty.connect.timeout = 30000
spark.storage.blockManagerSlaveTimeoutMs = 30000
spark.yarn.user.classpath.first = true
spark.yarn.executor.memoryOverhead = 1536
The job is run on YARN and I see errors in container logs:
2015-03-11 09:16:56,629 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=24500,containerID=container_1425476483191_402083_01_000019] is running beyond physical memory limits. Current usage: 6.0 GB of 6 GB physical memory used; 6.9 GB of 12.6 GB virtual memory used. Killing container.
So the problem is related to excessive memory use.
Could you advise me what I should fix in my code to make it work for my use case?
The strange thing is that the code worked earlier, with versions around 1.0.0.
Is it possible that changes between 1.0.0 and 1.2.0 caused this kind of regression?
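The figures in the log line above are consistent with a purely physical-memory kill. Assuming the cluster uses YARN's default yarn.nodemanager.vmem-pmem-ratio of 2.1 (the actual setting is not shown in the thread), the virtual limit is derived from the 6 GB physical limit, and virtual usage stays well below it while resident usage reaches the physical cap:

```latex
6\,\mathrm{GB} \times 2.1 = 12.6\,\mathrm{GB}, \qquad
6.9\,\mathrm{GB} < 12.6\,\mathrm{GB}, \qquad
6.0\,\mathrm{GB} \geq 6\,\mathrm{GB}
```

In other words, the whole process (heap plus off-heap) outgrew the container, and the off-heap part is what spark.yarn.executor.memoryOverhead is meant to leave room for.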
Regards
Marcin
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org
Re: skewed outer join with spark 1.2.0 - memory consumption
Posted by Marcin Cylke <ma...@ext.allegro.pl>.
On Wed, 11 Mar 2015 11:19:56 +0100
Marcin Cylke <ma...@ext.allegro.pl> wrote:
> Hi
>
> I'm trying to do a join of two datasets: 800GB with ~50MB.
The job finishes if I set spark.yarn.executor.memoryOverhead to 2048MB.
If it is set to around 1000MB, it fails with "executor lost" errors.
My Spark settings are:
- executor cores - 8
- num executors - 32
- executor memory - 4g
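To put these settings side by side: on YARN, Spark requests each executor container as the executor heap plus spark.yarn.executor.memoryOverhead, and all tasks running concurrently in one executor (here up to 8, one per core) share that single overhead allowance. The sketch below only does that arithmetic for the overhead values mentioned in the thread. The even per-task split is a simplifying assumption, and any rounding YARN applies to container requests (its allocation increment is cluster-specific) is deliberately not modelled.

```scala
object ExecutorMemoryArithmetic {
  // Memory requested per executor container, before any YARN rounding:
  // JVM heap (executor memory) plus spark.yarn.executor.memoryOverhead.
  def containerRequestMb(executorMemoryMb: Int, overheadMb: Int): Int =
    executorMemoryMb + overheadMb

  // Off-heap headroom per concurrently running task, assuming (simplification)
  // that each executor core runs one task and they share the overhead evenly.
  def overheadPerTaskMb(overheadMb: Int, executorCores: Int): Int =
    overheadMb / executorCores

  def main(args: Array[String]): Unit = {
    val executorMemoryMb = 4 * 1024 // executor memory - 4g
    val executorCores = 8           // executor cores - 8
    val numExecutors = 32           // num executors - 32
    for (overhead <- Seq(1000, 1536, 2048)) {
      val perContainer = containerRequestMb(executorMemoryMb, overhead)
      println(
        s"overhead=$overhead MB -> container=$perContainer MB, " +
          s"off-heap per task=${overheadPerTaskMb(overhead, executorCores)} MB, " +
          s"all executors=${perContainer * numExecutors} MB")
    }
  }
}
```

With 8 cores per executor, raising the overhead from 1536MB to 2048MB gives each concurrent task roughly 256MB instead of 192MB of off-heap headroom under this simplification, which fits the observation that the larger value lets the job finish.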
Regards
Marcin