Posted to user@spark.apache.org by Vijayasarathy Kannan <kv...@vt.edu> on 2015/03/19 19:03:51 UTC

Problems with spark.akka.frameSize

Hi,

I am encountering the following error with a Spark application.

"Exception in thread "main" org.apache.spark.SparkException:
Job aborted due to stage failure:
Serialized task 0:0 was 11257268 bytes, which exceeds max allowed:
spark.akka.frameSize (10485760 bytes) - reserved (204800 bytes).
Consider increasing spark.akka.frameSize or using broadcast variables for
large values."

I am doing the following in my code.

var groupedEdges = graph.edges
  .groupBy[VertexId](groupBySrcId)
  .persist(StorageLevel.MEMORY_AND_DISK)

while (condition) {
  val updates = groupedEdges.flatMap { edgesBySrc =>
    doWork(edgesBySrc, a, b)
  }

  updates.collect().foreach(println)
}

def doWork(edges: (VertexId, Iterable[Edge[(Int, Int, Int)]]),
           a: Double, b: Double): List[VertexId] = {
  // do something with the group of edges and return a list of vertices
}

Note that each edge's attribute is a 3-element tuple. I encounter the
above error only when the edge attribute is a tuple; the code does not run
into it when the edge attribute is a single integer.

I tried increasing *spark.akka.frameSize* (to, say, 100) and the job ran
without hitting this issue. Using a broadcast variable does not seem
appropriate here, because each task running doWork() gets a different set of
edges. However, the groups of edges stay the same across iterations.
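
For reference, setting the frame size looked roughly like the sketch below
(the app name is just a placeholder, and as far as I know the value is
interpreted in MB on Spark 1.x):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: raise the Akka frame size before creating the context.
// spark.akka.frameSize defaults to 10 (MB); 100 avoided the error for me.
val conf = new SparkConf()
  .setAppName("GroupedEdgesJob")        // placeholder app name
  .set("spark.akka.frameSize", "100")
val sc = new SparkContext(conf)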

I was wondering if there is an efficient way to do what I'm doing, i.e., pass
edgesBySrc to doWork() efficiently (or not pass it at all, or pass it only
once on the first iteration and have the tasks retain their sets of edges
across iterations)?
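
For what it's worth, my understanding of the "broadcast variables" hint in
the error message is that it helps when a large read-only value is shared by
every task, something like the sketch below (lookup and doWorkWithLookup are
purely hypothetical names):

import org.apache.spark.graphx.VertexId

// Hypothetical: a large read-only structure that doWork would need.
val lookup: Map[VertexId, Double] = Map.empty
val lookupBc = sc.broadcast(lookup)     // shipped to each executor once, not per task

val updates = groupedEdges.flatMap { edgesBySrc =>
  // doWorkWithLookup is a hypothetical variant of doWork taking the broadcast value
  doWorkWithLookup(edgesBySrc, a, b, lookupBc.value)
}
updates.collect().foreach(println)

Since my edge groups differ per task, that pattern doesn't seem to fit, which
is why I'm asking whether there is a better way.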

Thanks