Posted to commits@spark.apache.org by sr...@apache.org on 2017/07/29 12:27:43 UTC
spark git commit: [SPARK-21357][DSTREAMS] FileInputDStream not remove out of date RDD
Repository: spark
Updated Branches:
refs/heads/master c14382030 -> 60e9b2bdd
[SPARK-21357][DSTREAMS] FileInputDStream not remove out of date RDD
## What changes were proposed in this pull request?
```scala
// class FileInputDStream (before this patch)
protected[streaming] override def clearMetadata(time: Time) {
  batchTimeToSelectedFiles.synchronized {
    val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
    batchTimeToSelectedFiles --= oldFiles.keys
    // ... rest of the method unchanged
  }
}
```
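The override above prunes only `batchTimeToSelectedFiles`. Because it never calls the parent implementation, `DStream.clearMetadata` never runs for this stream, so old entries in `generatedRDDs` are never removed. This patch adds `super.clearMetadata(time)` at the beginning of `clearMetadata` so that old generated RDDs are cleared as well.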
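The effect of the missing `super` call can be illustrated with a small, self-contained sketch. `BaseStream` and `FileStream` below are hypothetical stand-ins for `DStream` and `FileInputDStream`, not Spark's API: batch times are plain `Long` milliseconds and "RDDs" are just strings. The sketch runs ten one-second batches with a hypothetical 3-second remember window, once with the parent cleanup skipped and once with it called.

```scala
import scala.collection.mutable

// Hypothetical stand-in for DStream (NOT Spark's class): tracks one "RDD" per batch time.
class BaseStream(val rememberDuration: Long) {
  val generatedRDDs = mutable.HashMap[Long, String]()

  // Parent cleanup: drop "RDDs" at or before the remember-window boundary.
  def clearMetadata(time: Long): Unit = {
    val oldRDDs = generatedRDDs.filter(_._1 <= time - rememberDuration)
    generatedRDDs --= oldRDDs.keys
  }
}

// Hypothetical stand-in for FileInputDStream; callSuper toggles the one-line fix.
class FileStream(window: Long, callSuper: Boolean) extends BaseStream(window) {
  val batchTimeToSelectedFiles = mutable.HashMap[Long, Seq[String]]()

  override def clearMetadata(time: Long): Unit = {
    // Without this call, generatedRDDs is never pruned.
    if (callSuper) super.clearMetadata(time)
    batchTimeToSelectedFiles.synchronized {
      val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
      batchTimeToSelectedFiles --= oldFiles.keys
    }
  }
}

object ClearMetadataDemo {
  // Returns (generatedRDDs.size, batchTimeToSelectedFiles.size) after ten batches.
  def run(callSuper: Boolean): (Int, Int) = {
    val s = new FileStream(window = 3000L, callSuper = callSuper)
    for (t <- 1000L to 10000L by 1000L) {
      s.generatedRDDs(t) = s"rdd@$t"
      s.batchTimeToSelectedFiles(t) = Seq(s"file@$t")
      s.clearMetadata(t)
    }
    (s.generatedRDDs.size, s.batchTimeToSelectedFiles.size)
  }

  def main(args: Array[String]): Unit = {
    println(s"without super: ${run(callSuper = false)}")
    println(s"with super:    ${run(callSuper = true)}")
  }
}
```

Without the `super` call, all ten entries stay in `generatedRDDs` while the file map is pruned to four. With the call, `generatedRDDs` keeps only the last three batches.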
## How was this patch tested?
The patch was tested manually: temporary code printing the number of entries in `generatedRDDs` was added at the end of `clearMetadata` to confirm that the old RDDs were removed.
Author: shaofei007 <14...@qq.com>
Author: Fei Shao <14...@qq.com>
Closes #18718 from shaofei007/master.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60e9b2bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60e9b2bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60e9b2bd
Branch: refs/heads/master
Commit: 60e9b2bdd55854a6be077b17aa032c25bfb031bf
Parents: c143820
Author: shaofei007 <14...@qq.com>
Authored: Sat Jul 29 13:27:39 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sat Jul 29 13:27:39 2017 +0100
----------------------------------------------------------------------
.../scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/60e9b2bd/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 905b1c5..b8a5a96 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -164,6 +164,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
   /** Clear the old time-to-files mappings along with old RDDs */
   protected[streaming] override def clearMetadata(time: Time) {
+    super.clearMetadata(time)
     batchTimeToSelectedFiles.synchronized {
       val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
       batchTimeToSelectedFiles --= oldFiles.keys