Posted to user@spark.apache.org by Marcin Cylke <ma...@ext.allegro.pl> on 2015/03/11 11:19:56 UTC

skewed outer join with spark 1.2.0 - memory consumption

Hi

I'm trying to do a join of two datasets: 800GB with ~50MB.

My code looks like this:

  private def parseClickEventLine(line: String, jsonFormatBC: Broadcast[LazyJsonFormat]): ClickEvent = {
    val json = line.parseJson.asJsObject
    val eventJson = if (json.fields.contains("recommendationId")) json else json.fields("message").asJsObject

    jsonFormatBC.value.clickEventJsonFormat.read(eventJson)
  }

    val jsonFormatBc: Broadcast[LazyJsonFormat] = sc.broadcast(new LazyJsonFormat)

    val views = sc.recoLogRdd(jobConfig.viewsDirectory)
      .map(view => (view.id.toString, view))

    val clicks = sc.textFile(s"${jobConfig.clicksDirectory}/*")
      .map(parseClickEventLine(_, jsonFormatBc))
      .map(click => (click.recommendationId, click))

    val clicksCounts = views.leftOuterJoin(clicks).map({ case (recommendationId, (view, click)) =>
      val metaItemType = click.flatMap(c => view.itemDetailsById.get(c.itemIdStr).map(_.metaItemType))
      (view, metaItemType) -> click.map(_ => 1).getOrElse(0)
    })
    clicksCounts.reduceByKey(_ + _).map(toCSV).saveAsTextFile(jobConfig.outputDirectory)

I'm using Spark 1.2.0 and have the following options set:

spark.default.parallelism = 24
spark.serializer = org.apache.spark.serializer.KryoSerializer
spark.test.disableBlockManagerHeartBeat = true
spark.shuffle.netty.connect.timeout = 30000
spark.storage.blockManagerSlaveTimeoutMs = 30000
spark.yarn.user.classpath.first = true
spark.yarn.executor.memoryOverhead = 1536

The job is run on YARN and I see errors in container logs:

2015-03-11 09:16:56,629 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=24500,containerID=container_1425476483191_402083_01_000019] is running beyond physical memory limits. Current usage: 6.0 GB of 6 GB physical memory used; 6.9 GB of 12.6 GB virtual memory used. Killing container.

So the problem is related to excessive memory use: YARN kills the container once it reaches its physical memory limit.

Could you advise me on what I should change in my code to make it work for my use case?
The strange thing is that the code worked earlier, with versions around 1.0.0.
Is it possible that changes between 1.0.0 and 1.2.0 caused this kind of regression?
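
One direction that might be worth trying is a map-side (broadcast) join, which avoids shuffling the large dataset and so sidesteps key skew. The sketch below is only an illustration under loud assumptions: that the ~50MB dataset is the right-hand side of the join (clicks here; the post does not say which side is small) and that it fits in memory on the driver and on every executor. The names BroadcastLeftOuterJoin, indexByKey, joinRecord and leftOuterJoinSmallRight are hypothetical and not part of Spark.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical helper, not a Spark API: a left outer join that broadcasts the
// small right-hand side instead of shuffling both sides.
object BroadcastLeftOuterJoin {

  // Group the small side by key so that each lookup returns all matching values.
  def indexByKey[K, W](pairs: Seq[(K, W)]): Map[K, List[W]] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).toList }

  // Emit one output row per match, or a single row with None when the key has
  // no match. This mirrors the shape of the result of RDD.leftOuterJoin.
  def joinRecord[K, V, W](key: K, value: V, small: Map[K, List[W]]): List[(K, (V, Option[W]))] =
    small.get(key) match {
      case Some(ws) => ws.map(w => (key, (value, Some(w): Option[W])))
      case None     => List((key, (value, None)))
    }

  // Assumption: `right` is small enough to collect on the driver and to hold
  // in memory on every executor.
  def leftOuterJoinSmallRight[K, V, W](
      sc: SparkContext,
      left: RDD[(K, V)],
      right: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
    val smallBc = sc.broadcast(indexByKey(right.collect().toSeq))
    // Each record of the large side is joined locally against the broadcast map.
    left.flatMap { case (k, v) => joinRecord(k, v, smallBc.value) }
  }
}
```

With this helper, BroadcastLeftOuterJoin.leftOuterJoinSmallRight(sc, views, clicks) would stand in for views.leftOuterJoin(clicks), provided both RDDs use the same key type. If the small dataset is actually views (the left side), the sketch does not apply as written, because unmatched rows of the left side would have to be tracked separately.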

Regards 
Marcin

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: skewed outer join with spark 1.2.0 - memory consumption

Posted by Marcin Cylke <ma...@ext.allegro.pl>.
On Wed, 11 Mar 2015 11:19:56 +0100
Marcin Cylke <ma...@ext.allegro.pl> wrote:

> Hi
> 
> I'm trying to do a join of two datasets: 800GB with ~50MB.

The job finishes if I set spark.yarn.executor.memoryOverhead to 2048MB.
If it is around 1000MB it fails with "executor lost" errors.

My Spark settings are:

- executor cores - 8
- num executors - 32
- executor memory - 4g
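
To relate these settings to the container limit in the YARN log, here is a rough arithmetic sketch. On YARN, the memory requested per executor is the executor memory plus spark.yarn.executor.memoryOverhead, and the scheduler may round that request up. The 1024 MB rounding increment below is an assumption (the cluster's scheduler settings are not given in this thread), and the ContainerSizing object and its method names are hypothetical.

```scala
// Hypothetical helper for back-of-the-envelope container sizing.
object ContainerSizing {

  // Memory requested from YARN per executor: JVM heap plus overhead, in MB.
  def requestedMb(executorMemoryMb: Int, overheadMb: Int): Int =
    executorMemoryMb + overheadMb

  // Round a request up to the next multiple of the scheduler increment.
  def roundedMb(requestMb: Int, incrementMb: Int): Int =
    ((requestMb + incrementMb - 1) / incrementMb) * incrementMb

  def main(args: Array[String]): Unit = {
    val executorMemoryMb = 4096   // "executor memory - 4g" from the settings above
    val assumedIncrementMb = 1024 // assumption: not stated anywhere in the thread
    // Overhead values mentioned in the thread: ~1000, 1536 and 2048 MB.
    for (overheadMb <- Seq(1000, 1536, 2048)) {
      val request = requestedMb(executorMemoryMb, overheadMb)
      val container = roundedMb(request, assumedIncrementMb)
      println(s"overhead=$overheadMb MB -> request=$request MB, container=$container MB")
    }
  }
}
```

Under that assumption, 4096 MB of executor memory plus 1536 MB of overhead rounds up to 6144 MB, which would be consistent with the "6 GB physical memory" limit in the log. Whatever the exact rounding, only the part of the container above the 4 GB heap is available for off-heap usage such as shuffle and network buffers.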

Regards
Marcin
