You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Devin Huang <ho...@163.com> on 2015/10/09 11:05:21 UTC

Different partition number of GroupByKey leads different result

Hi everyone,

     I got a trouble these days,and I don't know whether it is a bug of
spark.When I use  GroupByKey for our sequenceFile Data,I find that different
partition number lead different result, so as ReduceByKey. I think the
problem happens on the shuffle stage.I read the source code,  but still
can't find the answer.


this is the main code:

val rdd = sc.sequenceFile[UserWritable, TagsWritable](input,
classOf[UserWritable], classOf[TagsWritable])
val combinedRdd = rdd.map(s => (s._1.getuserid(),
s._2)).groupByKey(num).filter(_._1 == uid)

num is the number of partition and uid is a filter id for result
comparision.
TagsWritable implements WritableComparable<TagsWritable> and Serializable.

I used GroupByKey on text file, the result was right. 

Thanks,
Devin Huang




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Different partition number of GroupByKey leads different result

Posted by Devin Huang <ho...@163.com>.
Let me add.

The problem is that GroupByKey cannot divide our sequence data into groups
correctly ,and produce wrong key/value .The shuffle stage might not be
execute correctly.And I don’t know what leads this.


The type of key is String, and the type of value is TagsWritable.

I take out one user’s data for example.

when the partition number is 300, the value of this user is
2700000102,1.00;130098967f,1.00;2700000027,1.00;2700000001,1.00.
when the partition number is 100, the value of this user is
2800002133,1.00;150098921f,1.00;

I guess the wrong value is the other user’s value.The data may be mismatched
on the shuffle stage.







--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989p24990.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Different partition number of GroupByKey leads different result

Posted by Sean Owen <so...@cloudera.com>.
If you are not copying or cloning the value (TagsWritable) object,
then that is likely the problem. The value is not immutable and is
changed by the InputFormat code reading the file, because it is
reused.

On Fri, Oct 9, 2015 at 11:04 AM, Devin Huang <ho...@163.com> wrote:
> Forgive me for not understanding what you mean.The sequence file key is UserWritable,and Value is TagsWritable.Both of them implement WritableComparable and Serializable and rewrite the clone().
> The key of string is collected from UserWritable through a map transformation.
>
> Have you ever read the spark source code?Which step can lead to data dislocation?
>
>> 在 2015年10月9日,17:37,Sean Owen <so...@cloudera.com> 写道:
>>
>> Another guess, since you say the key is String (offline): you are not
>> cloning the value of TagsWritable. Hadoop reuses the object under the
>> hood, and so is changing your object value. You can't save references
>> to the object you get from reading a SequenceFile.
>>
>> On Fri, Oct 9, 2015 at 10:22 AM, Sean Owen <so...@cloudera.com> wrote:
>>> First guess: your key class does not implement hashCode/equals
>>>
>>> On Fri, Oct 9, 2015 at 10:05 AM, Devin Huang <ho...@163.com> wrote:
>>>> Hi everyone,
>>>>
>>>>     I got a trouble these days,and I don't know whether it is a bug of
>>>> spark.When I use  GroupByKey for our sequenceFile Data,I find that different
>>>> partition number lead different result, so as ReduceByKey. I think the
>>>> problem happens on the shuffle stage.I read the source code,  but still
>>>> can't find the answer.
>>>>
>>>>
>>>> this is the main code:
>>>>
>>>> val rdd = sc.sequenceFile[UserWritable, TagsWritable](input,
>>>> classOf[UserWritable], classOf[TagsWritable])
>>>> val combinedRdd = rdd.map(s => (s._1.getuserid(),
>>>> s._2)).groupByKey(num).filter(_._1 == uid)
>>>>
>>>> num is the number of partition and uid is a filter id for result
>>>> comparision.
>>>> TagsWritable implements WritableComparable<TagsWritable> and Serializable.
>>>>
>>>> I used GroupByKey on text file, the result was right.
>>>>
>>>> Thanks,
>>>> Devin Huang
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Different partition number of GroupByKey leads different result

Posted by Devin Huang <ho...@163.com>.
Forgive me for not understanding what you mean.The sequence file key is UserWritable,and Value is TagsWritable.Both of them implement WritableComparable and Serializable and rewrite the clone().
The key of string is collected from UserWritable through a map transformation.

Have you ever read the spark source code?Which step can lead to data dislocation?

> 在 2015年10月9日,17:37,Sean Owen <so...@cloudera.com> 写道:
> 
> Another guess, since you say the key is String (offline): you are not
> cloning the value of TagsWritable. Hadoop reuses the object under the
> hood, and so is changing your object value. You can't save references
> to the object you get from reading a SequenceFile.
> 
> On Fri, Oct 9, 2015 at 10:22 AM, Sean Owen <so...@cloudera.com> wrote:
>> First guess: your key class does not implement hashCode/equals
>> 
>> On Fri, Oct 9, 2015 at 10:05 AM, Devin Huang <ho...@163.com> wrote:
>>> Hi everyone,
>>> 
>>>     I got a trouble these days,and I don't know whether it is a bug of
>>> spark.When I use  GroupByKey for our sequenceFile Data,I find that different
>>> partition number lead different result, so as ReduceByKey. I think the
>>> problem happens on the shuffle stage.I read the source code,  but still
>>> can't find the answer.
>>> 
>>> 
>>> this is the main code:
>>> 
>>> val rdd = sc.sequenceFile[UserWritable, TagsWritable](input,
>>> classOf[UserWritable], classOf[TagsWritable])
>>> val combinedRdd = rdd.map(s => (s._1.getuserid(),
>>> s._2)).groupByKey(num).filter(_._1 == uid)
>>> 
>>> num is the number of partition and uid is a filter id for result
>>> comparision.
>>> TagsWritable implements WritableComparable<TagsWritable> and Serializable.
>>> 
>>> I used GroupByKey on text file, the result was right.
>>> 
>>> Thanks,
>>> Devin Huang
>>> 
>>> 
>>> 
>>> 
>>> --
>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>> 
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> For additional commands, e-mail: user-help@spark.apache.org
>>> 



---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Different partition number of GroupByKey leads different result

Posted by Sean Owen <so...@cloudera.com>.
Another guess, since you say the key is String (offline): you are not
cloning the value of TagsWritable. Hadoop reuses the object under the
hood, and so is changing your object value. You can't save references
to the object you get from reading a SequenceFile.

On Fri, Oct 9, 2015 at 10:22 AM, Sean Owen <so...@cloudera.com> wrote:
> First guess: your key class does not implement hashCode/equals
>
> On Fri, Oct 9, 2015 at 10:05 AM, Devin Huang <ho...@163.com> wrote:
>> Hi everyone,
>>
>>      I got a trouble these days,and I don't know whether it is a bug of
>> spark.When I use  GroupByKey for our sequenceFile Data,I find that different
>> partition number lead different result, so as ReduceByKey. I think the
>> problem happens on the shuffle stage.I read the source code,  but still
>> can't find the answer.
>>
>>
>> this is the main code:
>>
>> val rdd = sc.sequenceFile[UserWritable, TagsWritable](input,
>> classOf[UserWritable], classOf[TagsWritable])
>> val combinedRdd = rdd.map(s => (s._1.getuserid(),
>> s._2)).groupByKey(num).filter(_._1 == uid)
>>
>> num is the number of partition and uid is a filter id for result
>> comparision.
>> TagsWritable implements WritableComparable<TagsWritable> and Serializable.
>>
>> I used GroupByKey on text file, the result was right.
>>
>> Thanks,
>> Devin Huang
>>
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Different partition number of GroupByKey leads different result

Posted by Sean Owen <so...@cloudera.com>.
First guess: your key class does not implement hashCode/equals

On Fri, Oct 9, 2015 at 10:05 AM, Devin Huang <ho...@163.com> wrote:
> Hi everyone,
>
>      I got a trouble these days,and I don't know whether it is a bug of
> spark.When I use  GroupByKey for our sequenceFile Data,I find that different
> partition number lead different result, so as ReduceByKey. I think the
> problem happens on the shuffle stage.I read the source code,  but still
> can't find the answer.
>
>
> this is the main code:
>
> val rdd = sc.sequenceFile[UserWritable, TagsWritable](input,
> classOf[UserWritable], classOf[TagsWritable])
> val combinedRdd = rdd.map(s => (s._1.getuserid(),
> s._2)).groupByKey(num).filter(_._1 == uid)
>
> num is the number of partition and uid is a filter id for result
> comparision.
> TagsWritable implements WritableComparable<TagsWritable> and Serializable.
>
> I used GroupByKey on text file, the result was right.
>
> Thanks,
> Devin Huang
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org