Posted to user@spark.apache.org by Ajay Srivastava <a_...@yahoo.com> on 2014/06/04 14:32:32 UTC

Join : Giving incorrect result

Hi,

I am joining two RDDs, and the join gives a different result (the number of records counted) each time I run the code on the same input.

The input files are large enough to be divided into two splits. When the program runs on two workers with a single core assigned to each, the output is consistent and looks correct. But when a single worker is used with two or more cores, the result seems to be random: the count of joined records is different every time.

Does this sound like a defect, or do I need to take care of something when using join? I am using spark-0.9.1.


Regards
Ajay

Re: Join : Giving incorrect result

Posted by Cheng Lian <li...@gmail.com>.
Hi Ajay, would you mind putting together a minimal code snippet that
reproduces this issue and pasting it here?



Re: Join : Giving incorrect result

Posted by Ajay Srivastava <a_...@yahoo.com>.

Thanks Matei. We have tested the fix and it's working perfectly.

Andrew, we set spark.shuffle.spill=false, but the application then runs out of memory. I think that is expected.

Regards,
Ajay



Re: Join : Giving incorrect result

Posted by Andrew Ash <an...@andrewash.com>.
Hi Ajay,

Can you please try running the same code with spark.shuffle.spill=false and
see if the numbers turn out correctly?  That parameter controls whether or
not the buggy code that Matei fixed in ExternalAppendOnlyMap is used.

FWIW I saw similar issues in 0.9.0 but no longer in 0.9.1 after I think
some fixes in spilling landed.

Andrew
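
For reference, here is a minimal sketch of how the setting mentioned above could be applied alongside the memory and core settings quoted in this thread. The property name spark.shuffle.spill comes from the thread; the object name ShuffleSpillConfig and the build helper are illustrative assumptions, not part of the original program. As reported elsewhere in this thread, turning spilling off made the application run out of memory, so this is a diagnostic switch rather than a fix.

```scala
import org.apache.spark.SparkConf

object ShuffleSpillConfig {
  // Builds a configuration with the memory and core settings quoted in this
  // thread, plus shuffle spilling disabled for diagnosis.
  // (ShuffleSpillConfig and build are hypothetical names.)
  def build(): SparkConf =
    new SparkConf()
      .set("spark.executor.memory", "10g")
      .set("spark.cores.max", "5")
      // Keep shuffle aggregation entirely in memory, which bypasses the
      // disk-spilling code path discussed in this thread.
      .set("spark.shuffle.spill", "false")
}
```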



Re: Join : Giving incorrect result

Posted by Matei Zaharia <ma...@gmail.com>.
Hey Ajay, thanks for reporting this. There was indeed a bug, specifically in the way join tasks spill to disk (which happened when you had more concurrent tasks competing for memory). I’ve posted a patch for it here: https://github.com/apache/spark/pull/986. Feel free to try that if you’d like; it will also be in 0.9.2 and 1.0.1.

Matei



Re: Join : Giving incorrect result

Posted by Ajay Srivastava <a_...@yahoo.com>.
Sorry for replying late. It was night here.

Lian/Matei,
Here is the code snippet -
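    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._  // pair-RDD operations such as join (needed on 0.9.x)

    // sparkConf is assumed to be a SparkConf created earlier in the program (not shown here)
    sparkConf.set("spark.executor.memory", "10g")
    sparkConf.set("spark.cores.max", "5")

    val sc = new SparkContext(sparkConf)

    // Key each line by its first comma-separated field; the value is the full line
    val accId2LocRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location").map(getKeyValueFromString(_, 0, ',', true))

    val accId2DemoRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType").map(getKeyValueFromString(_, 0, ',', true))

    val joinedRDD = accId2LocRDD.join(accId2DemoRDD)

  // Splits a line on delimit and returns a (key, value) pair.
  // The key is the field at keyIndex; the value is either the full line
  // (retFullLine = true) or the field at position length - keyIndex - 1.
  def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char, retFullLine: Boolean): (String, String) = {
    val splits = line.split(delimit)
    if (splits.length <= 1) {
      (null, null)  // the line contains no delimiter
    } else if (retFullLine) {
      (splits(keyIndex), line)
    } else {
      (splits(keyIndex), splits(splits.length - keyIndex - 1))
    }
  }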
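
One detail of the helper above is worth noting: lines without a delimiter become (null, null) pairs, so they stay in the RDD under a null key and are passed to the join along with everything else. A minimal sketch of an alternative follows, assuming it is acceptable to drop such lines instead. The names KeyValueParsing and parseKeyValue are hypothetical and not part of the original program, and this sketch is not a fix for the varying counts reported in this thread.

```scala
object KeyValueParsing {
  // Returns Some((key, fullLine)) when the line has at least two fields and
  // keyIndex points at one of them; returns None otherwise, so malformed
  // lines can be dropped instead of being keyed by null.
  def parseKeyValue(line: String, keyIndex: Int, delimit: Char): Option[(String, String)] = {
    val fields = line.split(delimit)
    if (fields.length <= 1 || keyIndex < 0 || keyIndex >= fields.length) None
    else Some((fields(keyIndex), line))
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample lines; the middle one has no delimiter.
    val lines = Seq("42,NY,US", "no-delimiter", "7,CA,US")
    // flatMap keeps only the lines that parsed successfully.
    val pairs = lines.flatMap(parseKeyValue(_, 0, ','))
    pairs.foreach(println)
  }
}
```

With an RDD, the same function could be applied as sc.textFile(path).flatMap(parseKeyValue(_, 0, ',')), where path stands for one of the input files.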

    

Both of these files have 10 M records with the same unique keys. The file size is nearly 280 MB and the HDFS block size is 256 MB. The output of the join should contain 10 M records.


We have done some more experiments:
1) Running cogroup instead of join: it also gives an incorrect count.
2) Running union followed by groupByKey and then filtering for records with two entries in the sequence: it also gives an incorrect count.
3) Increasing spark.executor.memory to 50g: everything works fine. The count comes to 10 M for the join, cogroup, and union/groupByKey/filter transformations.
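
The three checks above can be sketched on a small synthetic dataset as follows. This is a minimal sketch, not the original program: the object name JoinCountCheck, the counts helper, the local[2] master, and the generated keys and values are all assumptions made for illustration. With identical unique keys on both sides, each of the three counts should equal the number of keys.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits, required on Spark 0.9.x

object JoinCountCheck {
  // Returns (join count, cogroup count, union/groupByKey/filter count) for two
  // synthetic pair RDDs that share the same n unique keys.
  def counts(sc: SparkContext, n: Int): (Long, Long, Long) = {
    val left  = sc.parallelize(1 to n, 2).map(i => (i.toString, "loc" + i))
    val right = sc.parallelize(1 to n, 2).map(i => (i.toString, "demo" + i))

    // 1) Plain join: one output record per matching key.
    val joinCount = left.join(right).count()

    // 2) cogroup: count the keys that have values on both sides.
    val cogroupCount = left.cogroup(right)
      .filter { case (_, (ls, rs)) => ls.nonEmpty && rs.nonEmpty }
      .count()

    // 3) union + groupByKey: count the keys that collected exactly two values.
    val unionCount = left.union(right).groupByKey()
      .filter { case (_, vs) => vs.size == 2 }
      .count()

    (joinCount, cogroupCount, unionCount)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("JoinCountCheck")
    val sc = new SparkContext(conf)
    try {
      val n = 100000
      val (j, c, u) = counts(sc, n)
      println(s"join=$j cogroup=$c union/groupByKey/filter=$u expected=$n")
    } finally {
      sc.stop()
    }
  }
}
```

A dataset this small is only meant to show the shape of the comparison; reproducing the varying counts would presumably need inputs large enough to put the executors under memory pressure.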


I thought that 10g would be enough memory for the executors, but even if memory is short, it should not result in incorrect computation. Probably there is a problem in reconstructing RDDs when there is not enough memory.


Thanks, Chen, for your observation. I get this problem on a single worker, so there cannot be any mismatch of jars. On two workers, the code works fine since the total executor memory is doubled.


Regards,
Ajay




Re: Join : Giving incorrect result

Posted by Matei Zaharia <ma...@gmail.com>.
If this isn’t the problem, it would be great if you can post the code for the program.

Matei



Re: Join : Giving incorrect result

Posted by "Xu (Simon) Chen" <xc...@gmail.com>.
Maybe your two workers have different assembly jar files?

I just ran into a similar problem where my spark-shell was using a different
jar file than my workers, and I got really confusing results.