Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2015/09/10 16:32:45 UTC

[jira] [Commented] (SPARK-10536) filtered POJOs replaced by other instances after collect()

    [ https://issues.apache.org/jira/browse/SPARK-10536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14738832#comment-14738832 ] 

Sean Owen commented on SPARK-10536:
-----------------------------------

My snap guess is that this is the problem: {{f._1.datum()}}

You aren't cloning / copying the value you get directly from Hadoop's InputFormat API. It reuses the object internally. I think you're operating repeatedly on the last value it read. Possible?
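
For illustration, a minimal sketch of the copy suggested above, assuming {{Visitor}} is the reflect-Avro POJO from the report; the {{reflectData}} and {{visitorSchema}} names are illustrative, not part of the original code. The idea is to deep-copy each datum before it leaves the map stage, since the record reader reuses the same underlying object:

{code}
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.avro.reflect.ReflectData
import org.apache.hadoop.io.NullWritable

val rdd = sparkContext.newAPIHadoopFile(
    input, classOf[AvroKeyInputFormat[Visitor]], classOf[AvroKey[Visitor]], classOf[NullWritable])
  .map { case (key, _) =>
    // The record reader reuses one AvroKey/datum instance for every record,
    // so clone it before the reference escapes this stage (e.g. via collect()).
    val reflectData = ReflectData.get()
    val visitorSchema = reflectData.getSchema(classOf[Visitor])
    reflectData.deepCopy(visitorSchema, key.datum())
  }
{code}

That would also explain the symptom below: the executor-side println calls run while the shared object still holds the matching record, but by the time the partition's results are serialized for collect() the object holds the last record read, so the driver sees that record repeated. Building {{ReflectData}} and the schema inside the closure keeps the sketch self-contained; caching them per partition with mapPartitions would avoid recomputing the schema for every record.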

> filtered POJOs replaced by other instances after collect()
> ----------------------------------------------------------
>
>                 Key: SPARK-10536
>                 URL: https://issues.apache.org/jira/browse/SPARK-10536
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.4.1
>            Reporter: Erik Schmiegelow
>
> I've encountered a very strange phenomenon with collect() in a simple program written for debugging purposes.
> The objective of the program is to filter objects which match an id and print their contents to stderr so that we can have a look at the contents. Our initial plan was to have the driver do that, because we run our applications in a YARN cluster and we didn't want to have to look for the executor instance first before looking at the log files.
> We then discovered that the results after collect() didn't match the ids we had filtered for, so we added a few debugging statements to find out what was happening. Interestingly enough, we see the correct instances when we inspect them in filter() or map() on the executor. Once the instances are sent back to the driver, they are swapped. More intriguingly, we always get the same set of incorrect instances.
> Here's the code:
> {code}
>     val rdd = sparkContext.newAPIHadoopFile(
>         input, classOf[AvroKeyInputFormat[Visitor]], classOf[AvroKey[Visitor]], classOf[NullWritable])
>       .map(f => f._1.datum())
>       .filter { visitor =>
>         val result = visitor.eid == eid
>         if (result) {
>           println(s"Found match ${visitor.eid} for $eid with hash ${visitor.hashCode()}")
>           val mapper = new ObjectMapper()
>           System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(visitor))
>         }
>         result
>       }
>       .map { f =>
>         val mapper = new ObjectMapper()
>         println(s"Map Output of visitor ${f.eid} for $eid with hash ${f.hashCode()}")
>         System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(f))
>         f
>       }
>       .collect()
>       .foreach { f =>
>         val mapper = new ObjectMapper()
>         println(s"Collect Output of visitor ${f.eid} for $eid with hash ${f.hashCode()}")
>         System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(f))
>       }
> {code}
> The output we get in the Executor (filter + map) is as follows:
> {code}
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> {code}
> The output on the driver is this:
> {code}
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> {code}
> Some notes on the input: 
> - we use reflective Avro to serialize to HDFS
> - we've got about 5 GB of data



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org