Posted to user@spark.apache.org by Ron Gonzalez <zl...@yahoo.com> on 2014/07/24 23:41:15 UTC

cache changes precision

Hi,
  I'm doing the following:

  def main(args: Array[String]) = {
    val sparkConf = new SparkConf().setAppName("AvroTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val conf = new Configuration()
    val job = new Job(conf)
    val path = new Path("/tmp/a.avro");
    val schema = AvroUtils.getSchema(conf, path);

    AvroJob.setInputKeySchema(job, schema);
    
    val rdd = sc.newAPIHadoopFile(
       path.toString(),
       classOf[AvroKeyInputFormat[GenericRecord]],
       classOf[AvroKey[GenericRecord]],
       classOf[NullWritable], conf).map(x => x._1.datum())
    val sum = rdd.map(p => p.get("SEPAL_WIDTH").asInstanceOf[Float]).reduce(_ + _)
    val avg = sum/rdd.count()
    println(s"Sum = $sum")
    println(s"Avg = $avg")
  }

If I run this, it works as expected. When I add .cache() to 

val rdd = sc.newAPIHadoopFile(
       path.toString(),
       classOf[AvroKeyInputFormat[GenericRecord]],
       classOf[AvroKey[GenericRecord]],
       classOf[NullWritable], conf).map(x => x._1.datum()).cache()

then the average comes out rounded up.

Any idea why this works this way? Any tips on how to fix this?

Thanks,
Ron

Re: cache changes precision

Posted by Ron Gonzalez <zl...@yahoo.com>.
Cool I'll take a look and give it a try!

Thanks,
Ron

Sent from my iPad

> On Jul 24, 2014, at 10:35 PM, Andrew Ash <an...@andrewash.com> wrote:
> 
> Hi Ron,
> 
> I think you're encountering the issue where caching data from Hadoop ends up with many duplicate values instead of what you expect.  Try adding a .clone() to the datum() call.
> 
> The issue is that Hadoop returns the same object many times but with its contents changed.  This is an optimization to avoid allocating and GC'ing an object for every row.  This works fine in Hadoop MapReduce because it is single-threaded and does no caching of the objects.
> 
> Spark, though, saves a reference to each object it gets back from Hadoop.  So by the end of the partition, Spark ends up with a bunch of references that all point to the same object!  I think it's just by chance that this ends up leaving your average rounded.
> 
> Can you try cloning the records in the map call?  Also look at the contents and see whether they're actually changed, or whether the RDD after a cache is just the last record "smeared" across all the others.
> 
> Cheers,
> Andrew
> 
> 
>> On Thu, Jul 24, 2014 at 2:41 PM, Ron Gonzalez <zl...@yahoo.com> wrote:
>> Hi,
>>   I'm doing the following:
>> 
>>   def main(args: Array[String]) = {
>>     val sparkConf = new SparkConf().setAppName("AvroTest").setMaster("local[2]")
>>     val sc = new SparkContext(sparkConf)
>>     val conf = new Configuration()
>>     val job = new Job(conf)
>>     val path = new Path("/tmp/a.avro");
>>     val schema = AvroUtils.getSchema(conf, path);
>> 
>>     AvroJob.setInputKeySchema(job, schema);
>>     
>>     val rdd = sc.newAPIHadoopFile(
>>        path.toString(),
>>        classOf[AvroKeyInputFormat[GenericRecord]],
>>        classOf[AvroKey[GenericRecord]],
>>        classOf[NullWritable], conf).map(x => x._1.datum())
>>     val sum = rdd.map(p => p.get("SEPAL_WIDTH").asInstanceOf[Float]).reduce(_ + _)
>>     val avg = sum/rdd.count()
>>     println(s"Sum = $sum")
>>     println(s"Avg = $avg")
>>   }
>> 
>> If I run this, it works as expected. When I add .cache() to 
>> 
>> val rdd = sc.newAPIHadoopFile(
>>        path.toString(),
>>        classOf[AvroKeyInputFormat[GenericRecord]],
>>        classOf[AvroKey[GenericRecord]],
>>        classOf[NullWritable], conf).map(x => x._1.datum()).cache()
>> 
>> then the average comes out rounded up.
>> 
>> Any idea why this works this way? Any tips on how to fix this?
>> 
>> Thanks,
>> Ron
> 

Re: cache changes precision

Posted by Andrew Ash <an...@andrewash.com>.
Hi Ron,

I think you're encountering the issue where caching data from Hadoop ends
up with many duplicate values instead of what you expect.  Try adding a
.clone() to the datum() call.
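
Something like the following in the map step might do it (a rough,
untested sketch; it uses Avro's GenericData.deepCopy as one way to make a
full copy of each GenericRecord before the reference is cached):

    import org.apache.avro.generic.{GenericData, GenericRecord}

    val rdd = sc.newAPIHadoopFile(
       path.toString(),
       classOf[AvroKeyInputFormat[GenericRecord]],
       classOf[AvroKey[GenericRecord]],
       classOf[NullWritable], conf)
      .map { case (key, _) =>
        val datum = key.datum()
        // Copy the record before it goes into the cache: the Hadoop
        // RecordReader reuses one object for every row, so the cached
        // references must not all point at that shared object.
        GenericData.get().deepCopy(datum.getSchema, datum)
      }
      .cache()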

The issue is that Hadoop returns the same object many times but with its
contents changed.  This is an optimization to avoid allocating and GC'ing
an object for every row.  This works fine in Hadoop MapReduce because it
is single-threaded and does no caching of the objects.

Spark, though, saves a reference to each object it gets back from Hadoop.
So by the end of the partition, Spark ends up with a bunch of references
that all point to the same object!  I think it's just by chance that this
ends up leaving your average rounded.

Can you try cloning the records in the map call?  Also look at the
contents and see whether they're actually changed, or whether the RDD
after a cache is just the last record "smeared" across all the others.
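
A quick way to check for that smearing (again, just a sketch) is to look
at the distinct values coming out of the cached RDD:

    val widths = rdd.map(_.get("SEPAL_WIDTH").asInstanceOf[Float])
    // Without the cache this should show the real spread of values; with
    // the cache and no copy, every row may report the last record's value.
    println(s"distinct widths = ${widths.distinct().count()}")
    widths.take(5).foreach(println)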

Cheers,
Andrew


On Thu, Jul 24, 2014 at 2:41 PM, Ron Gonzalez <zl...@yahoo.com> wrote:

> Hi,
>   I'm doing the following:
>
>   def main(args: Array[String]) = {
>     val sparkConf = new
> SparkConf().setAppName("AvroTest").setMaster("local[2]")
>     val sc = new SparkContext(sparkConf)
>     val conf = new Configuration()
>     val job = new Job(conf)
>     val path = new Path("/tmp/a.avro");
>     val schema = AvroUtils.getSchema(conf, path);
>
>     AvroJob.setInputKeySchema(job, schema);
>
>     val rdd = sc.newAPIHadoopFile(
>        path.toString(),
>        classOf[AvroKeyInputFormat[GenericRecord]],
>        classOf[AvroKey[GenericRecord]],
>        classOf[NullWritable], conf).map(x => x._1.datum())
>     val sum = rdd.map(p =>
> p.get("SEPAL_WIDTH").asInstanceOf[Float]).reduce(_ + _)
>     val avg = sum/rdd.count()
>     println(s"Sum = $sum")
>     println(s"Avg = $avg")
>   }
>
> If I run this, it works as expected. When I add .cache() to
>
> val rdd = sc.newAPIHadoopFile(
>        path.toString(),
>        classOf[AvroKeyInputFormat[GenericRecord]],
>        classOf[AvroKey[GenericRecord]],
>        classOf[NullWritable], conf).map(x => x._1.datum()).cache()
>
> then the average comes out rounded up.
>
> Any idea why this works this way? Any tips on how to fix this?
>
> Thanks,
> Ron
>
>