Posted to user@spark.apache.org by Rick Moritz <ra...@gmail.com> on 2015/08/18 16:38:26 UTC

Strange shuffle behaviour difference between Zeppelin and Spark-shell

Dear list,

I am observing very strange behaviour differences between a Spark 1.4.0-rc4 REPL
(locally compiled with Java 7) and a Spark 1.4.0 Zeppelin interpreter
(compiled with Java 6 and sourced from Maven Central).

The workflow loads data from Hive, applies a number of transformations
(including quite a lot of shuffle operations) and then presents an enriched
dataset. The code (and the resulting DAGs) is identical in each case.

The following particularities are noted:
Importing the Hive RDD and caching it yields identical results on both
platforms.
Applying case classes leads to a 2-2.5 MB increase in dataset size per
partition (excepting empty partitions).

Writing shuffles shows a much more significant difference:

Zeppelin:
Total Time Across All Tasks: 2.6 min
Input Size / Records: 2.4 GB / 7314771
Shuffle Write: 673.5 MB / 7314771

vs

Spark-shell:
Total Time Across All Tasks: 28 min
Input Size / Records: 3.6 GB / 7314771
Shuffle Write: 9.0 GB / 7314771

This is one of the early stages, which reads from a cached partition and
then feeds into a join stage. The later stages show similar behaviour,
producing excessive shuffle spills.

Quite often the excessive shuffle volume will lead to massive shuffle
spills which ultimately kill not only performance, but the actual executors
as well.

I have examined the Environment tab in the Spark UI and identified no
notable difference besides FAIR (Zeppelin) vs. FIFO (spark-shell) scheduling
mode. I fail to see how this could impact shuffle writes so drastically,
since scheduling mode operates at the inter-job level, while this happens at
the inter-stage level.

I was somewhat suspicious of compression or serialization playing a role,
but the SparkConf points to those being set to the defaults, and Zeppelin's
interpreter adds no relevant additional parameters.

I performed a diff between rc4 (which was later released) and 1.4.0 and, as
expected, there were no differences besides a single class (remarkably, a
shuffle-relevant one:
/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class )
differing in its binary representation due to being compiled with Java 7
instead of Java 6. The decompiled sources of the two are again identical.
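To double-check that both REPLs resolve identical shuffle settings, one could dump the relevant SparkConf entries in each. A sketch, pasting the same line into spark-shell and into a Zeppelin paragraph (the property prefixes are the standard Spark 1.4 ones; keys absent from the output fall back to the built-in defaults):

```shell
# Print every explicitly-set shuffle/compression/serializer property.
# Run once per REPL and diff the two outputs.
echo 'sc.getConf.getAll.filter { case (k, _) =>
        k.startsWith("spark.shuffle") ||
        k.startsWith("spark.io.compression") ||
        k.startsWith("spark.serializer") }.foreach(println)' | ./bin/spark-shell
```

If neither REPL prints a given key, both are on the compiled-in default for it, which would rule out a configuration mismatch for that setting.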

As a next step I may simply replace that file in the packaged jar, to
ascertain that there is indeed no difference between the two versions, but I
would consider it a major bug if a simple compiler change led to this kind
of issue.
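The class swap could look roughly like this (the assembly-jar names are assumptions, and the direction of the swap could of course also be reversed):

```shell
# Sketch: move the single differing class from one assembly into the other,
# to rule out the Java 6 vs. Java 7 compilation difference.
CLASS=org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class

# Extract the class (with its directory path) from the locally built assembly...
unzip -o spark-assembly-1.4.0-local.jar "$CLASS"

# ...and overwrite the same entry in the Maven Central assembly in place.
jar uf spark-assembly-1.4.0-maven.jar "$CLASS"
```

Comparing `javap -c -p` output of the two class files beforehand would additionally confirm that they only differ in class-file version, not in bytecode.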

I am also open to any other ideas, in particular ways to verify that the
same compression/serialization is indeed happening, and ways to determine
what exactly is written into these shuffles -- currently I only know that
the tuples are bigger (or smaller) than they ought to be. The
Zeppelin-obtained results do at least appear to be consistent, so my
suspicion is that there is an issue with the process launched from
spark-shell. I will also attempt to build a Spark job and spark-submit it
using different Spark binaries to further explore the issue.
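That comparison could be as simple as building the workflow once and submitting the identical artifact with each binary distribution (class, jar and master names below are placeholders):

```shell
# Sketch: same artifact, same master, only the Spark build differs.
./spark-1.4.0-local-java7/bin/spark-submit --master yarn-client \
  --class com.example.ShuffleWorkflow shuffle-workflow_2.10-0.1.jar

./spark-1.4.0-maven-java6/bin/spark-submit --master yarn-client \
  --class com.example.ShuffleWorkflow shuffle-workflow_2.10-0.1.jar
```

If the shuffle-write discrepancy reproduces here, it would isolate the problem to the binaries themselves rather than to anything Zeppelin or spark-shell adds on top.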

Best Regards,

Rick Moritz