Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2015/09/10 16:32:45 UTC
[jira] [Commented] (SPARK-10536) filtered POJOs replaced by other instances after collect()
[ https://issues.apache.org/jira/browse/SPARK-10536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14738832#comment-14738832 ]
Sean Owen commented on SPARK-10536:
-----------------------------------
My snap guess is that this is the problem: {{f._1.datum()}}
You aren't cloning / copying the value you get directly from Hadoop's InputFormat API. It reuses the object internally. I think you're operating repeatedly on the last value it read. Possible?
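The reuse behaviour can be sketched without Spark at all. Below is a minimal illustration in plain Scala (the Visitor stand-in and the iterator are assumptions for illustration, not the actual InputFormat code): a reader that overwrites and hands back one shared mutable object, and the defensive copy that fixes it.

```scala
// Stand-in for the Avro-generated POJO (illustrative only).
class Visitor(var eid: Long)

// Hypothetical reader mimicking a Hadoop RecordReader: it returns the
// SAME mutable instance on every call, only overwriting its fields.
val shared = new Visitor(0L)
val reader = Iterator(1L, 2L, 3L).map { id =>
  shared.eid = id   // overwrite in place, as RecordReaders commonly do
  shared            // hand back the same reference each time
}

// Collecting the raw references keeps three pointers to ONE object,
// so every element ends up showing the last value read.
val reused = reader.toList
assert(reused.map(_.eid) == List(3L, 3L, 3L))

// The suggested fix: copy the datum before the reference escapes.
val shared2 = new Visitor(0L)
val copied = Iterator(1L, 2L, 3L).map { id =>
  shared2.eid = id
  new Visitor(shared2.eid)  // defensive copy per record
}.toList
assert(copied.map(_.eid) == List(1L, 2L, 3L))
```

In the pipeline below, the equivalent change would be to copy the Avro datum in the first map(), before anything downstream holds on to the reference.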
> filtered POJOs replaced by other instances after collect()
> ----------------------------------------------------------
>
> Key: SPARK-10536
> URL: https://issues.apache.org/jira/browse/SPARK-10536
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.4.1
> Reporter: Erik Schmiegelow
>
> I've encountered a very strange phenomenon with collect() in a simplistic program written for debugging purposes.
> The objective of the program is to filter objects which match an id and print their contents to stderr so that we can have a look at them. Our initial plan was to have the driver do the printing, because we run our applications on a YARN cluster and we didn't want to have to track down the right executor instance before looking at its log files.
> We then discovered that the results after collect() didn't match the ids we had filtered for, so we added a few debugging statements to find out what happened. Interestingly enough, the instances we see in filter() and map() - on the executor - are correct. Once the instances are sent back to the driver, they have been swapped. More intriguingly, we always get the same set of incorrect instances.
> Here's the code:
> {code}
> val rdd = sparkContext.newAPIHadoopFile(
>     input, classOf[AvroKeyInputFormat[Visitor]],
>     classOf[AvroKey[Visitor]], classOf[NullWritable])
>   .map(f => f._1.datum())
>   .filter(visitor => {
>     val result = visitor.eid == eid
>     if (result) {
>       println(s"Found match ${visitor.eid} for $eid with hash ${visitor.hashCode()}")
>       val mapper = new ObjectMapper()
>       System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(visitor))
>     }
>     result
>   })
>   .map(f => {
>     val mapper = new ObjectMapper()
>     println(s"Map Output of visitor ${f.eid} for $eid with hash ${f.hashCode()}")
>     System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(f))
>     f
>   })
>   .collect()
>   .foreach(f => {
>     val mapper = new ObjectMapper()
>     println(s"Collect Output of visitor ${f.eid} for $eid with hash ${f.hashCode()}")
>     System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(f))
>   })
> {code}
> The output we get in the Executor (filter + map) is as follows:
> {code}
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> {code}
> The output on the driver is this:
> {code}
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> {code}
> Some notes on the input:
> - we use reflective Avro to serialize to HDFS
> - we've got about 5 GB of data
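For what it's worth, every collected element being identical is consistent with record reuse combined with Java serialization, which preserves object identity within a single stream: N references to one mutable record serialize as one object plus back-references. A hedged sketch of that mechanism (plain Scala; the V class is an illustrative stand-in for Visitor, not the real transfer path):

```scala
import java.io._

// Illustrative stand-in for the Avro-generated Visitor class.
class V(var eid: Long) extends Serializable

// An executor-side result partition holding three references to ONE
// mutable record, as happens when the reader's object is not copied.
val shared = new V(0L)
val partition: List[V] = (1L to 3L).map { id =>
  shared.eid = id
  shared                  // same reference captured three times
}.toList

// Round-trip through Java serialization, standing in for an
// executor -> driver transfer of collect() results.
val bytes = new ByteArrayOutputStream()
val out = new ObjectOutputStream(bytes)
out.writeObject(partition)
out.close()
val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
val onDriver = in.readObject().asInstanceOf[List[V]]

// The driver sees N handles to a single object holding the last value.
assert(onDriver.map(_.eid) == List(3L, 3L, 3L))
assert(onDriver(0) eq onDriver(2))
```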
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)