You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Josh Holbrook <jo...@fusion.net> on 2017/07/18 19:17:56 UTC

[Spark Core] unhashable type: 'dict' during shuffle step

Hello!

I'm running into a very strange issue with pretty much no hits on the
internet, and I'm hoping someone here can give me some protips! At this
point, I'm at a loss. This is a little long-winded, but hopefully you'll
indulge me.

Background: I'm currently trying to port some existing spark jobs from
scala to python as part of a greater effort to change owners of our
analytics pipeline, both sets running on AWS EMR. For the most part this
has been going reasonably well, excepting for some cluster tuning problems
that I've since sorted out, but my last job is consistently failing (but
only in prod, not locally with a reduced dataset). Because the jobs are
ported from existing scala code, there are a few odd idioms that I think
could be made smoother with a little bit of work, but I'd rather not rock
the boat too much. I'm using EMR release 5.6.0, which has spark 2.1.0. I've
also ran this against EMR 5.7.0/Spark 2.1.1, with the hope that this might
magically fix my problem, but no dice. The traceback, at any rate, is from
2.1.0. Also potentially relevant is that I have spark configured to use
python 3.4, which is the version of python 3 that ships with EMR.

Anyway, the error I get is "unhashable type: 'dict'" from inside
shuffle.py's ExternalMerger#mergeValues method--full stack trace at:

https://gist.github.com/jfhbrook/5864939b498d7d1adb9856c1697cf2e5

According to the history dashboard, this step is failing at:

    max at
/mnt/yarn/usercache/hadoop/appcache/application_1500043838599_0001/container_1500043838599_0001_02_000001/kinjaspark.zip/kinjaspark/util.py:91

Which in my code looks like this:

    def get_latest_time(rdd):
        return rdd.map(lambda event: event.timestamp).max()

I realized while investigating this that I could rewrite this step to skip
the map and use the key argument with the max call; I have a trial run of
this change ongoing while I write this email. Timestamp, for what it's
worth, is validated in my code as being a python datetime.

Search results for this error are pretty scant
https://encrypted.google.com/search?hl=en&q=spark%20unhashable%20type%20dict%20%2Bshuffle%20mergeValues
and mostly have to do with use of reduceByKey, which I'm not doing at this
stage, so I think something else is going on. It's probably worth noting,
though, that max is a thin wrapper around regular reduce
https://github.com/apache/spark/blob/branch-2.1/python/pyspark/rdd.py#L1004-L1006
.

As a matter of full disclosure, the source data in the rdd variable should
be instances of an attrs-wrapped class
https://pypi.python.org/pypi/attrs/17.2.0 and I can show the source for how
we're loading the events if this is helpful. Another thing potentially
worth noting is that the input RDDs are set to
.persist(StorageLevel.DISK_ONLY) -- this is directly ported from the old
job, and I want to experiment with removing it, but rocking the boat etc. I
also do execute a reduceByKey at a later stage, though by assert its keys
shouldn't involve dicts either (being of the form (str, datetime)).

The data in the traceback is *almost* straightforward: max calls reduce,
which ultimately creates a new PipelinedRDD
https://github.com/apache/spark/blob/branch-2.1/python/pyspark/rdd.py#L335-L358
(I don't know what separates a pipelined RDD from a "regular" RDD), and
somewhere in this process a shuffle is triggered and for whatever reason it
tries to set the key of a dict to another dict
https://github.com/apache/spark/blob/branch-2.1/python/pyspark/shuffle.py#L238
which causes python to explode. Things I don't know: What self.pdata is and
how it's populated, how I can ensure that it doesn't populate with dicts as
keys, and why this step doesn't fail for other jobs that have extremely
similar logic (this job mostly differs in that it later broadcasts a dict
lookup table, but that's well after this step and on the face of it seems
unrelated). They also pull from the same dataset.

This is the last job I have to port over before we can sunset the old jobs
and I'm at my wits' end, so any suggestions are highly appreciated!

Thanks,

--Josh

Re: [Spark Core] unhashable type: 'dict' during shuffle step

Posted by Josh Holbrook <jo...@fusion.net>.
Hi all,

Just an update: I ran a variation of the job with the new latest_time code
and it failed again, but I think I was misreading the history dash. This
time, it shows 2 attempts, the second of which failed during the max call
as before, but the *first* of which appears to be failing during the
reduceByKey step:

http://imgur.com/a/ipVMY

so the max call is likely a red herring.

The weird thing is that I definitely wrote a variation that straight called
hash() on the key on generation of the RDD on which reduceByKey() is
called, as an assert/sanity check. I'll try this again, but I'm not keeping
my hopes up...

Thanks,

--Josh

On Tue, Jul 18, 2017 at 3:17 PM, Josh Holbrook <jo...@fusion.net>
wrote:

> Hello!
>
> I'm running into a very strange issue with pretty much no hits on the
> internet, and I'm hoping someone here can give me some protips! At this
> point, I'm at a loss. This is a little long-winded, but hopefully you'll
> indulge me.
>
> Background: I'm currently trying to port some existing spark jobs from
> scala to python as part of a greater effort to change owners of our
> analytics pipeline, both sets running on AWS EMR. For the most part this
> has been going reasonably well, excepting for some cluster tuning problems
> that I've since sorted out, but my last job is consistently failing (but
> only in prod, not locally with a reduced dataset). Because the jobs are
> ported from existing scala code, there are a few odd idioms that I think
> could be made smoother with a little bit of work, but I'd rather not rock
> the boat too much. I'm using EMR release 5.6.0, which has spark 2.1.0. I've
> also ran this against EMR 5.7.0/Spark 2.1.1, with the hope that this might
> magically fix my problem, but no dice. The traceback, at any rate, is from
> 2.1.0. Also potentially relevant is that I have spark configured to use
> python 3.4, which is the version of python 3 that ships with EMR.
>
> Anyway, the error I get is "unhashable type: 'dict'" from inside
> shuffle.py's ExternalMerger#mergeValues method--full stack trace at:
>
> https://gist.github.com/jfhbrook/5864939b498d7d1adb9856c1697cf2e5
>
> According to the history dashboard, this step is failing at:
>
>     max at /mnt/yarn/usercache/hadoop/appcache/application_
> 1500043838599_0001/container_1500043838599_0001_02_000001/
> kinjaspark.zip/kinjaspark/util.py:91
>
> Which in my code looks like this:
>
>     def get_latest_time(rdd):
>         return rdd.map(lambda event: event.timestamp).max()
>
> I realized while investigating this that I could rewrite this step to skip
> the map and use the key argument with the max call; I have a trial run of
> this change ongoing while I write this email. Timestamp, for what it's
> worth, is validated in my code as being a python datetime.
>
> Search results for this error are pretty scant
> https://encrypted.google.com/search?hl=en&q=spark%
> 20unhashable%20type%20dict%20%2Bshuffle%20mergeValues and mostly have to
> do with use of reduceByKey, which I'm not doing at this stage, so I think
> something else is going on. It's probably worth noting, though, that max is
> a thin wrapper around regular reduce https://github.com/apache/
> spark/blob/branch-2.1/python/pyspark/rdd.py#L1004-L1006 .
>
> As a matter of full disclosure, the source data in the rdd variable should
> be instances of an attrs-wrapped class https://pypi.python.org/pypi/
> attrs/17.2.0 and I can show the source for how we're loading the events
> if this is helpful. Another thing potentially worth noting is that the
> input RDDs are set to .persist(StorageLevel.DISK_ONLY) -- this is
> directly ported from the old job, and I want to experiment with removing
> it, but rocking the boat etc. I also do execute a reduceByKey at a later
> stage, though by assert its keys shouldn't involve dicts either (being of
> the form (str, datetime)).
>
> The data in the traceback is *almost* straightforward: max calls reduce,
> which ultimately creates a new PipelinedRDD https://github.com/apache/
> spark/blob/branch-2.1/python/pyspark/rdd.py#L335-L358 (I don't know what
> separates a pipelined RDD from a "regular" RDD), and somewhere in this
> process a shuffle is triggered and for whatever reason it tries to set the
> key of a dict to another dict https://github.com/apache/
> spark/blob/branch-2.1/python/pyspark/shuffle.py#L238 which causes python
> to explode. Things I don't know: What self.pdata is and how it's populated,
> how I can ensure that it doesn't populate with dicts as keys, and why this
> step doesn't fail for other jobs that have extremely similar logic (this
> job mostly differs in that it later broadcasts a dict lookup table, but
> that's well after this step and on the face of it seems unrelated). They
> also pull from the same dataset.
>
> This is the last job I have to port over before we can sunset the old jobs
> and I'm at my wits' end, so any suggestions are highly appreciated!
>
> Thanks,
>
> --Josh
>
>
>