Posted to issues@spark.apache.org by "Shixiong Zhu (JIRA)" <ji...@apache.org> on 2015/12/23 02:01:57 UTC

[jira] [Updated] (SPARK-11749) Duplicate creating the RDD in file stream when recovering from checkpoint data

     [ https://issues.apache.org/jira/browse/SPARK-11749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-11749:
---------------------------------
    Affects Version/s:     (was: 1.5.0)
                       1.6.0

> Duplicate creating the RDD in file stream when recovering from checkpoint data
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-11749
>                 URL: https://issues.apache.org/jira/browse/SPARK-11749
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.5.2, 1.6.0
>            Reporter: Jack Hu
>            Assignee: Jack Hu
>             Fix For: 1.6.0
>
>
> I have a case that monitors an HDFS folder, enriches the incoming data from that folder via different reference tables (about 15 of them), and sends the results to different Hive tables after some operations.
> The code looks like this:
> {code}
> val txt = ssc.textFileStream(folder).map(toKeyValuePair).reduceByKey(removeDuplicates)
> val refTable1 = ssc.textFileStream(refSource1).map(parse(_)).updateStateByKey(...)
> txt.join(refTable1).map(...).reduceByKey(...).foreachRDD(
>   rdd => {
>      // insert into hive table
>   }
> )
> val refTable2 = ssc.textFileStream(refSource2).map(parse(_)).updateStateByKey(...)
> txt.join(refTable2).map(...).reduceByKey(...).foreachRDD(
>   rdd => {
>      // insert into hive table
>   }
> )
> // more refTables are wired up in the same way in the following code
> {code}
>  
> The {{batchInterval}} of this application is set to *30 seconds*, the checkpoint interval is set to *10 minutes*, and every batch in {{txt}} has *60 files*.
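>
> For reference, a minimal sketch of how these intervals are typically wired up ({{checkpointDir}} is a hypothetical name, not from the original report):
> {code}
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
>
> val conf = new SparkConf().setAppName("hdfs-enrichment")  // hypothetical app name
> val ssc = new StreamingContext(conf, Seconds(30))         // batchInterval = 30 seconds
> ssc.checkpoint(checkpointDir)                             // enable checkpointing to HDFS
> txt.checkpoint(Minutes(10))                               // per-DStream checkpoint interval = 10 minutes
> {code}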
> After recovering from checkpoint data, I can see lots of logs about creating the RDDs in the file stream: the RDD in each batch of the file stream was recreated *15 times* (once per output operation), and it took about *5 minutes* to create that many file RDDs. During this period, *10K+ broadcasts* were created and used up almost all of the block manager space.
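>
> For context, "recovering" here means restarting the driver through {{StreamingContext.getOrCreate}}, e.g. (a sketch; {{createContext}} is a hypothetical function that builds the full DStream graph shown above):
> {code}
> // Rebuild the context from checkpoint data if present, otherwise create it fresh.
> val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
> ssc.start()
> ssc.awaitTermination()
> {code}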
> After some investigation, we found that {{DStream.restoreCheckpointData}} is invoked once for each output operation ({{DStream.foreachRDD}} in this case), and there is no flag to indicate that a {{DStream}} has already been restored, so the RDDs in the file stream were recreated for every output.
> Suggest adding a flag to control the restore process and avoid the duplicated work.
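>
> One possible shape for such a guard inside {{DStream}} (a sketch only; the field name {{restoredFromCheckpointData}} is illustrative, not a committed API):
> {code}
> // Remember whether this DStream has already been restored, so that
> // multiple output operations do not repeat the restore work.
> private[streaming] var restoredFromCheckpointData = false
>
> private[streaming] def restoreCheckpointData(): Unit = {
>   if (!restoredFromCheckpointData) {
>     // Recreate the RDDs from checkpoint data exactly once,
>     // then propagate the restore to the parent DStreams.
>     checkpointData.restore()
>     dependencies.foreach(_.restoreCheckpointData())
>     restoredFromCheckpointData = true
>   }
> }
> {code}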



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org