Posted to user@spark.apache.org by ponkin <al...@ya.ru> on 2016/01/11 22:13:34 UTC

[KafkaRDD]: rdd.cache() does not seem to work

Hi,

Here is my use case:
I have a Kafka topic. The job is fairly simple: it reads the topic and saves the data to several HDFS paths.
I create the RDD with the following code
 val r = KafkaUtils.createRDD[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](context, kafkaParams, range)
Then I try to cache that RDD with
 r.cache()
and then save it to several HDFS locations.
But it seems that KafkaRDD fetches the data from the Kafka broker every time I call saveAsNewAPIHadoopFile.

How can I cache data from Kafka in memory?

P.S. When I repartition the RDD it seems to work properly (Kafka is read only once), but Spark stores the shuffled data locally.
Is it possible to keep the data in memory?




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KafkaRDD-rdd-cache-does-not-seem-to-work-tp25936.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: [KafkaRDD]: rdd.cache() does not seem to work

Posted by Tathagata Das <td...@databricks.com>.
Can you simplify the code and run a few counts to see whether the cache is
being used (that is, whether later jobs are faster)? In addition, use the
Spark UI to see whether the RDD is cached, and look at the DAG visualization
of the job to see whether it is using the cached RDD (the DAG shows a green
vertex if the RDD is cached).
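
A minimal sketch of such a check, assuming a local-mode SparkContext and a small parallelized RDD as a stand-in for the KafkaRDD (the object name, the sleep and the sizes are made up for illustration). It times two counts and prints the same information the Spark UI exposes: the storage level, the lineage, and how many partitions are actually cached.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CacheCheckSketch {
  // Runs a block and returns (result, elapsed milliseconds).
  def timed[A](body: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1000000L)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, so the sketch runs without a cluster.
    val conf = new SparkConf().setAppName("cache-check").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Stand-in for the KafkaRDD; the sleep imitates a slow fetch from the broker.
      val rdd = sc.parallelize(1 to 200, 4).map { x => Thread.sleep(5); x }
      rdd.cache()

      val (_, firstMs) = timed(rdd.count())
      val (_, secondMs) = timed(rdd.count())
      println(s"first count: $firstMs ms, second count: $secondMs ms")

      // Programmatic counterparts of what the Spark UI shows.
      println(s"storage level: ${rdd.getStorageLevel.description}")
      println(rdd.toDebugString)
      sc.getRDDStorageInfo.foreach { info =>
        println(s"RDD ${info.id}: ${info.numCachedPartitions} of ${info.numPartitions} partitions cached")
      }
    } finally {
      sc.stop()
    }
  }
}
```

If fewer partitions are reported as cached than the RDD has, the missing ones are recomputed on every action, which would look like repeated reads from the source.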

TD


Re: [KafkaRDD]: rdd.cache() does not seem to work

Posted by Понькин Алексей <al...@ya.ru>.
Hi Charles,

I have created a very simplified job, https://github.com/ponkin/KafkaSnapshot, to illustrate the problem. The source is at https://github.com/ponkin/KafkaSnapshot/blob/master/src/main/scala/ru/ponkin/KafkaSnapshot.scala

In short, maybe the persist method is working, but not the way I expected.
I thought that Spark would fetch all the data from the Kafka topic once and cache it in memory; instead, the RDD is recalculated every time I call the saveAsObjectFile method.
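
For reference, a hedged sketch of the persist-then-save pattern described here. It does not reproduce the linked job: the RDD is a parallelized stand-in for KafkaUtils.createRDD, the output goes to a temporary local directory rather than HDFS, and the object name is hypothetical. One possible cause of the behaviour, not confirmed anywhere in this thread, is that with MEMORY_ONLY (the level cache() uses) partitions that do not fit in memory are not stored and are recomputed on each action; MEMORY_AND_DISK spills them to local disk instead.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistThenSaveSketch {
  // Saves the same persisted RDD twice and returns the output directory.
  def run(): String = {
    // Assumption: local mode, so the sketch runs without a cluster.
    val conf = new SparkConf().setAppName("persist-then-save").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Hypothetical output location; the job in the thread writes to HDFS paths.
    val outDir = Files.createTempDirectory("kafka-snapshot-sketch").toString
    try {
      // Stand-in for the KafkaRDD, with Array[Byte] payloads as in the original post.
      val r = sc.parallelize(1 to 1000, 4).map(i => i.toString.getBytes("UTF-8"))

      // Keep partitions in memory and spill to local disk those that do not fit.
      r.persist(StorageLevel.MEMORY_AND_DISK)

      r.saveAsObjectFile(s"$outDir/copy1") // first action: computes and stores the partitions
      r.saveAsObjectFile(s"$outDir/copy2") // expected to read the persisted partitions

      r.unpersist()
      outDir
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(s"written under ${run()}")
}
```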






Re: [KafkaRDD]: rdd.cache() does not seem to work

Posted by charles li <ch...@gmail.com>.
cache() is persist() with the default storage level (MEMORY_ONLY), and it is
lazy: nothing is actually cached until the first time the RDD is computed.
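
The laziness can be observed with a small sketch like the one below. It assumes Spark 2.0 or later (for longAccumulator), a local-mode SparkContext, and a parallelized RDD standing in for the Kafka source; the object and accumulator names are made up. The accumulator counts how many elements are actually computed, which plays the role of fetches from the broker.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CacheLazinessSketch {
  // Returns the number of computed elements after cache(), after the first
  // action, and after the second action.
  def run(): (Long, Long, Long) = {
    // Assumption: local mode, so the sketch runs without a cluster.
    val conf = new SparkConf().setAppName("cache-laziness").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val computed = sc.longAccumulator("computed")

      // Stand-in for the Kafka source: every computed element bumps the counter.
      val source = sc.parallelize(1 to 1000, 4).map { x =>
        computed.add(1L)
        x
      }

      source.cache() // only marks the RDD; nothing is computed or stored yet
      val afterCache = computed.sum

      source.count() // first action computes the partitions and stores them
      val afterFirst = computed.sum

      source.count() // served from the cache when every partition was stored
      val afterSecond = computed.sum

      (afterCache, afterFirst, afterSecond)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (a, b, c) = run()
    println(s"after cache(): $a, after first count: $b, after second count: $c")
  }
}
```

If the third number is larger than the second, some partitions were recomputed rather than read from the cache. Note that accumulator updates made inside transformations can be applied more than once when tasks are retried, so this is a diagnostic aid rather than an exact counter.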

