You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/09/20 05:19:55 UTC
spark git commit: [SPARK-17513][SQL] Make StreamExecution garbage-collect its metadata
Repository: spark
Updated Branches:
refs/heads/master 26145a5af -> be9d57fc9
[SPARK-17513][SQL] Make StreamExecution garbage-collect its metadata
## What changes were proposed in this pull request?
This PR modifies StreamExecution such that it discards metadata for batches that have already been fully processed. I used the purge method that was added as part of SPARK-17235.
This is based on work by frreiss in #15067, but also fixes the test case and corrects some typos.
## How was this patch tested?
A new test case in StreamingQuerySuite. The test case would fail without the changes in this pull request.
Author: petermaxlee <pe...@gmail.com>
Author: frreiss <fr...@us.ibm.com>
Closes #15126 from petermaxlee/SPARK-17513.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be9d57fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be9d57fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be9d57fc
Branch: refs/heads/master
Commit: be9d57fc9d8b10e4234c01c06ed43fd7dd12c07b
Parents: 26145a5
Author: petermaxlee <pe...@gmail.com>
Authored: Mon Sep 19 22:19:51 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Sep 19 22:19:51 2016 -0700
----------------------------------------------------------------------
.../sql/execution/streaming/MetadataLog.scala | 1 +
.../execution/streaming/StreamExecution.scala | 7 ++++++
.../sql/streaming/StreamingQuerySuite.scala | 24 ++++++++++++++++++++
3 files changed, 32 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/be9d57fc/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
index 78d6be1..9e2604c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
@@ -24,6 +24,7 @@ package org.apache.spark.sql.execution.streaming
* - Allow the user to query the latest batch id.
* - Allow the user to query the metadata object of a specified batch id.
* - Allow the user to query metadata objects in a range of batch ids.
+ * - Allow the user to remove obsolete metadata
*/
trait MetadataLog[T] {
http://git-wip-us.apache.org/repos/asf/spark/blob/be9d57fc/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index a1aae61..220f77d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -290,6 +290,13 @@ class StreamExecution(
assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
logInfo(s"Committed offsets for batch $currentBatchId.")
+
+ // Now that we have logged the new batch, no further processing will happen for
+ // the previous batch, and it is safe to discard the old metadata.
+ // Note that purge is exclusive, i.e. it purges everything before currentBatchId.
+ // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
+ // flight at the same time), this cleanup logic will need to change.
+ offsetLog.purge(currentBatchId)
} else {
awaitBatchLock.lock()
try {
http://git-wip-us.apache.org/repos/asf/spark/blob/be9d57fc/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 9d58315..d3e2cab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -125,6 +125,30 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter {
)
}
+ testQuietly("StreamExecution metadata garbage collection") {
+ val inputData = MemoryStream[Int]
+ val mapped = inputData.toDS().map(6 / _)
+
+ // Run 3 batches, and then assert that only 1 metadata file is left at the end
+ // since the first 2 should have been purged.
+ testStream(mapped)(
+ AddData(inputData, 1, 2),
+ CheckAnswer(6, 3),
+ AddData(inputData, 1, 2),
+ CheckAnswer(6, 3, 6, 3),
+ AddData(inputData, 4, 6),
+ CheckAnswer(6, 3, 6, 3, 1, 1),
+
+ AssertOnQuery("metadata log should contain only one file") { q =>
+ val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
+ val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
+ val toTest = logFileNames // Workaround for SPARK-17475
+ assert(toTest.size == 1 && toTest.head == "2")
+ true
+ }
+ )
+ }
+
/**
* A [[StreamAction]] to test the behavior of `StreamingQuery.awaitTermination()`.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org