Posted to issues@spark.apache.org by "Shixiong Zhu (JIRA)" <ji...@apache.org> on 2015/12/23 02:01:57 UTC
[jira] [Updated] (SPARK-11749) Duplicate creating the RDD in file stream when recovering from checkpoint data
[ https://issues.apache.org/jira/browse/SPARK-11749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Shixiong Zhu updated SPARK-11749:
---------------------------------
Affects Version/s: (was: 1.5.0)
1.6.0
> Duplicate creating the RDD in file stream when recovering from checkpoint data
> ------------------------------------------------------------------------------
>
> Key: SPARK-11749
> URL: https://issues.apache.org/jira/browse/SPARK-11749
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.5.2, 1.6.0
> Reporter: Jack Hu
> Assignee: Jack Hu
> Fix For: 1.6.0
>
>
> I have a case that monitors an HDFS folder, enriches the incoming data with several reference tables (about 15), and, after some operations, writes the results to different Hive tables.
> The code is as follows:
> {code}
> val txt = ssc.textFileStream(folder)
>   .map(toKeyValuePair)
>   .reduceByKey(removeDuplicates)
>
> val refTable1 = ssc.textFileStream(refSource1)
>   .map(parse(_))
>   .updateStateByKey(...)
> txt.join(refTable1).map(...).reduceByKey(...).foreachRDD(
>   rdd => {
>     // insert into hive table
>   }
> )
>
> val refTable2 = ssc.textFileStream(refSource2)
>   .map(parse(_))
>   .updateStateByKey(...)
> txt.join(refTable2).map(...).reduceByKey(...).foreachRDD(
>   rdd => {
>     // insert into hive table
>   }
> )
>
> // ... more refTables follow the same pattern
> {code}
>
> The {{batchInterval}} of this application is set to *30 seconds* and the checkpoint interval to *10 minutes*; every batch in {{txt}} contains *60 files*.
> After recovering from checkpoint data, the logs show that the RDD in each batch of the file stream was recreated *15 times*, and creating so many file RDDs takes about *5 minutes*. During this period, *10K+ broadcasts* were created, using up almost all of the block manager space.
> After some investigation, we found that {{DStream.restoreCheckpointData}} is invoked once per output operation ({{DStream.foreachRDD}} in this case), and there is no flag to indicate that the {{DStream}} has already been restored, so the RDD in the file stream is recreated each time.
> Suggest adding a flag to the restore process to avoid this duplicated work.
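
The suggested flag can be sketched in a minimal, self-contained form. The names below ({{DStreamLike}}, {{restoreCount}}) are hypothetical stand-ins for illustration, not Spark's actual internals:

```scala
// Sketch of the guard-flag idea: restoreCheckpointData is reached once per
// registered output operation during recovery, but a "restored" flag makes
// the expensive work (re-creating the file RDDs) run only on the first call.
class DStreamLike(val name: String) {
  private var restored = false
  var restoreCount = 0 // stands in for the cost of re-creating file RDDs

  def restoreCheckpointData(): Unit = {
    if (!restored) { // guard: skip if this DStream was already restored
      restoreCount += 1
      restored = true
    }
  }
}

object RestoreDemo {
  def main(args: Array[String]): Unit = {
    val fileStream = new DStreamLike("txt")
    // Recovery traverses the graph once per output (15 joins in this issue);
    // with the flag, the restore body executes a single time.
    (1 to 15).foreach(_ => fileStream.restoreCheckpointData())
    println(fileStream.restoreCount)
  }
}
```

With the flag, recovery touches the restore path once no matter how many output operations traverse the same file stream; without it, the work is repeated once per output, which is what produced the 15x RDD re-creation described above.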
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org