Posted to user@spark.apache.org by Koert Kuipers <ko...@tresata.com> on 2014/06/25 18:09:38 UTC

graphx Joining two VertexPartitions with different indexes is slow.

lately i am seeing a lot of this warning in graphx:
org.apache.spark.graphx.impl.ShippableVertexPartitionOps: Joining two
VertexPartitions with different indexes is slow.

i am using Graph.outerJoinVertices to join in data from a regular RDD (that
is co-partitioned). i would like this operation to be fast, since i use it
frequently. should i be doing something different?

Re: graphx Joining two VertexPartitions with different indexes is slow.

Posted by Koert Kuipers <ko...@tresata.com>.
you could do the deep check only if the hashcodes are the same, and design
hashcodes that do not take all elements into account.

the alternative seems to be putting cache statements all over graphx, as is
currently the case, which i think is trouble for any long-lived application
where caching is carefully managed. i am currently forced to unpersist
vertices after almost every intermediate graph transformation, or accept my
rdd cache getting polluted.
On Jul 7, 2014 12:03 AM, "Ankur Dave" <an...@gmail.com> wrote:

> Well, the alternative is to do a deep equality check on the index arrays,
> which would be somewhat expensive since these are pretty large arrays (one
> element per vertex in the graph). But, in case the reference equality check
> fails, it actually might be a good idea to do the deep check before
> resorting to the slow code path.
>
> Ankur <http://www.ankurdave.com/>
>
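
A minimal sketch of the hash-then-deep-check idea from this message, in plain Scala with no Spark dependency. It is not GraphX code: the index is modeled as a bare Array[Long], and the names SampledIndexCheck, sampledHash, and sameIndex are hypothetical. The fingerprint reads only a bounded number of elements, and the full comparison runs only when the fingerprints agree.

```scala
import java.util.Arrays

object SampledIndexCheck {
  // Hypothetical fingerprint: the length plus at most `samples` evenly spaced
  // elements, so the cost does not grow with the size of the array.
  def sampledHash(index: Array[Long], samples: Int = 16): Int = {
    val step = math.max(1, (index.length + samples - 1) / samples)
    var h = index.length
    var i = 0
    while (i < index.length) {
      h = 31 * h + java.lang.Long.hashCode(index(i))
      i += step
    }
    h
  }

  // Cheapest checks first: same object, then the sampled fingerprint,
  // and only then the O(n) element-wise comparison.
  def sameIndex(a: Array[Long], b: Array[Long]): Boolean =
    (a eq b) ||
      (sampledHash(a) == sampledHash(b) && Arrays.equals(a, b))
}
```

In a real implementation the fingerprint would presumably be computed once and stored next to the index rather than recomputed on every join; that detail is left out here.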

Re: graphx Joining two VertexPartitions with different indexes is slow.

Posted by Ankur Dave <an...@gmail.com>.
Well, the alternative is to do a deep equality check on the index arrays,
which would be somewhat expensive since these are pretty large arrays (one
element per vertex in the graph). But, in case the reference equality check
fails, it actually might be a good idea to do the deep check before
resorting to the slow code path.

Ankur <http://www.ankurdave.com/>
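
To make the difference between the two checks concrete, here is a small plain-Scala sketch. It assumes the index can be modeled as an Array[Long] of vertex ids, which is a simplification and not the actual GraphX data structure; IndexEquality and sameIndex are hypothetical names.

```scala
import java.util.Arrays

object IndexEquality {
  // Hypothetical stand-in for a partition's vertex index.
  type Index = Array[Long]

  // Try the cheap reference check first; fall back to the O(n)
  // element-wise comparison only when it fails.
  def sameIndex(a: Index, b: Index): Boolean =
    (a eq b) || Arrays.equals(a, b)

  def main(args: Array[String]): Unit = {
    val first: Index = (1L to 5L).toArray
    val second: Index = (1L to 5L).toArray // same contents, separate object

    println(first eq second)          // reference equality only
    println(sameIndex(first, second)) // reference check, then deep check
  }
}
```

The deep comparison touches every element, which is the cost mentioned above for arrays with one element per vertex.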

Re: graphx Joining two VertexPartitions with different indexes is slow.

Posted by Koert Kuipers <ko...@tresata.com>.
probably a dumb question, but why is reference equality used for the
indexes?


On Sun, Jul 6, 2014 at 12:43 AM, Ankur Dave <an...@gmail.com> wrote:

> When joining two VertexRDDs with identical indexes, GraphX can use a fast
> code path (a zip join without any hash lookups). However, the check for
> identical indexes is performed using reference equality.
>
> Without caching, two copies of the index are created. Although the two
> indexes are structurally identical, they fail reference equality, and so
> GraphX mistakenly uses the slow path involving a hash lookup per joined
> element.
>
> I'm working on a patch <https://github.com/apache/spark/pull/1297> that
> attempts an optimistic zip join with per-element fallback to hash lookups,
> which would improve this situation.
>
> Ankur <http://www.ankurdave.com/>
>
>

Re: graphx Joining two VertexPartitions with different indexes is slow.

Posted by Ankur Dave <an...@gmail.com>.
When joining two VertexRDDs with identical indexes, GraphX can use a fast
code path (a zip join without any hash lookups). However, the check for
identical indexes is performed using reference equality.

Without caching, two copies of the index are created. Although the two
indexes are structurally identical, they fail reference equality, and so
GraphX mistakenly uses the slow path involving a hash lookup per joined
element.

I'm working on a patch <https://github.com/apache/spark/pull/1297> that
attempts an optimistic zip join with per-element fallback to hash lookups,
which would improve this situation.

Ankur <http://www.ankurdave.com/>
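
The idea of an optimistic zip join with per-element fallback can be sketched in plain Scala as follows. This is only an illustration of the technique as described in the message, not the code from the linked patch; the Partition type and the leftJoin helper are hypothetical, and a partition is simplified to two parallel arrays.

```scala
object OptimisticZipJoin {
  // Hypothetical, simplified partition: parallel arrays of vertex ids and values.
  final case class Partition[V](ids: Array[Long], values: Array[V])

  // Left join that first looks at the same position in `right` and only
  // falls back to a hash lookup for elements whose ids do not line up.
  def leftJoin[A, B, C](left: Partition[A], right: Partition[B])(
      f: (Long, A, Option[B]) => C): Seq[C] = {
    // Built lazily, so fully aligned partitions never pay for the hash map.
    lazy val lookup: Map[Long, B] =
      right.ids.iterator.zip(right.values.iterator).toMap

    left.ids.indices.map { i =>
      val id = left.ids(i)
      val other =
        if (i < right.ids.length && right.ids(i) == id) Some(right.values(i)) // zip path
        else lookup.get(id) // fallback path
      f(id, left.values(i), other)
    }
  }
}
```

When both sides share the same id layout, every element takes the zip path and the hash map is never built; when the layouts differ, only the mismatched positions pay for a lookup.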

Re: graphx Joining two VertexPartitions with different indexes is slow.

Posted by Koert Kuipers <ko...@tresata.com>.
thanks for replying. why is joining two vertexrdds without caching slow?
what is recomputed unnecessarily?
i am not sure what is different here from joining 2 regular RDDs (where
nobody seems to recommend caching before joining, i think...)


On Thu, Jul 3, 2014 at 10:52 PM, Ankur Dave <an...@gmail.com> wrote:

> Oh, I just read your message more carefully and noticed that you're
> joining a regular RDD with a VertexRDD. In that case I'm not sure why the
> warning is occurring, but it might be worth caching both operands
> (graph.vertices and the regular RDD) just to be sure.
>
> Ankur <http://www.ankurdave.com/>
>
>

Re: graphx Joining two VertexPartitions with different indexes is slow.

Posted by Ankur Dave <an...@gmail.com>.
Oh, I just read your message more carefully and noticed that you're joining
a regular RDD with a VertexRDD. In that case I'm not sure why the warning
is occurring, but it might be worth caching both operands (graph.vertices
and the regular RDD) just to be sure.

Ankur <http://www.ankurdave.com/>
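
For reference, caching both operands before Graph.outerJoinVertices might look like the sketch below. It assumes Spark with GraphX on the classpath and a local master; the tiny graph, the labels RDD, and the CachedOuterJoin object are made up for illustration. Whether caching removes the warning in this case is not confirmed in the thread.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.Graph

object CachedOuterJoin {
  // Builds a tiny graph, joins in labels from a regular RDD, and returns
  // the resulting (vertex id, label) pairs sorted by id.
  def run(): Seq[(Long, String)] = {
    // Assumption: a local master is enough for trying this out.
    val conf = new SparkConf().setMaster("local[2]").setAppName("cached-outer-join")
    val sc = new SparkContext(conf)
    try {
      // Illustrative graph; every vertex starts with the attribute 0.
      val graph = Graph.fromEdgeTuples(sc.parallelize(Seq((1L, 2L), (2L, 3L))), 0)
      // Regular RDD keyed by vertex id; vertex 3 deliberately has no entry.
      val labels = sc.parallelize(Seq((1L, "a"), (2L, "b")))

      // Cache both operands before joining, as suggested above.
      graph.vertices.cache()
      labels.cache()

      val labelled = graph.outerJoinVertices(labels) { (id, attr, label) =>
        label.getOrElse("unlabelled")
      }
      labelled.vertices.collect().sortBy(_._1).toSeq
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

In a long-lived application the cached RDDs would need a matching unpersist() once they are no longer used, which is the cache-management burden raised elsewhere in this thread.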

Re: graphx Joining two VertexPartitions with different indexes is slow.

Posted by Ankur Dave <an...@gmail.com>.
A common reason for the "Joining ... is slow" message is that you're
joining VertexRDDs without having cached them first. This will cause Spark
to recompute unnecessarily, and as a side effect, the same index will get
created twice and GraphX won't be able to do an efficient zip join.

For example, the following code will counterintuitively produce the
"Joining ... is slow" message:

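import org.apache.spark.graphx.VertexRDD
val a = VertexRDD(sc.parallelize((1 to 100).map(x => (x.toLong, x))))
a.leftJoin(a) { (id, x, y) => x + y.getOrElse(0) } // leftJoin passes the right-hand value as an Option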

The remedy is to call a.cache() before a.leftJoin(a).

Ankur <http://www.ankurdave.com/>