You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/12/12 07:38:40 UTC
spark git commit: [SPARK-18790][SS] Keep a general offset history of
stream batches
Repository: spark
Updated Branches:
refs/heads/master c802ad871 -> 83a42897a
[SPARK-18790][SS] Keep a general offset history of stream batches
## What changes were proposed in this pull request?
Instead of only keeping the minimum number of offsets around, we should keep enough information to allow us to roll back n batches and re-execute the stream starting from a given point. In particular, this change adds a config in SQLConf, spark.sql.streaming.minBatchesToRetain, that defaults to 100, and ensures that we keep enough log files in the following places to roll back the specified number of batches:
the offsets that are present in each batch
versions of the state store
the files lists stored for the FileStreamSource
the metadata log stored by the FileStreamSink
marmbrus zsxwing
## How was this patch tested?
The following tests were added.
### StreamExecution offset metadata
Test added to StreamingQuerySuite that ensures offset metadata is garbage collected according to minBatchesToRetain.
### CompactibleFileStreamLog
Tests added in CompactibleFileStreamLogSuite to ensure that logs are purged starting before the first compaction file that precedes the current batch id - minBatchesToRetain.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Tyson Condie <tc...@gmail.com>
Closes #16219 from tcondie/offset_hist.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83a42897
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83a42897
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83a42897
Branch: refs/heads/master
Commit: 83a42897ae90d84a54373db386a985e3e2d5903a
Parents: c802ad8
Author: Tyson Condie <tc...@gmail.com>
Authored: Sun Dec 11 23:38:31 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Sun Dec 11 23:38:31 2016 -0800
----------------------------------------------------------------------
.../streaming/CompactibleFileStreamLog.scala | 69 +++++++++++++-------
.../execution/streaming/StreamExecution.scala | 10 ++-
.../state/HDFSBackedStateStoreProvider.scala | 1 -
.../streaming/state/StateStoreConf.scala | 4 +-
.../org/apache/spark/sql/internal/SQLConf.scala | 17 +++--
.../CompactibleFileStreamLogSuite.scala | 16 ++++-
.../streaming/FileStreamSinkLogSuite.scala | 48 ++++++++++++--
.../streaming/state/StateStoreSuite.scala | 5 +-
.../sql/streaming/StreamingQuerySuite.scala | 64 +++++++++++++-----
9 files changed, 170 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/83a42897/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 8529cea..5a6f9e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -52,6 +52,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
/** Needed to serialize type T into JSON when using Jackson */
private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)
+ protected val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
+
/**
* If we delete the old files after compaction at once, there is a race condition in S3: other
* processes may see the old files are deleted but still cannot see the compaction file using
@@ -152,11 +154,16 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
}
override def add(batchId: Long, logs: Array[T]): Boolean = {
- if (isCompactionBatch(batchId, compactInterval)) {
- compact(batchId, logs)
- } else {
- super.add(batchId, logs)
+ val batchAdded =
+ if (isCompactionBatch(batchId, compactInterval)) {
+ compact(batchId, logs)
+ } else {
+ super.add(batchId, logs)
+ }
+ if (batchAdded && isDeletingExpiredLog) {
+ deleteExpiredLog(batchId)
}
+ batchAdded
}
/**
@@ -167,9 +174,6 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
val allLogs = validBatches.flatMap(batchId => super.get(batchId)).flatten ++ logs
if (super.add(batchId, compactLogs(allLogs).toArray)) {
- if (isDeletingExpiredLog) {
- deleteExpiredLog(batchId)
- }
true
} else {
// Return false as there is another writer.
@@ -210,26 +214,41 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
}
/**
- * Since all logs before `compactionBatchId` are compacted and written into the
- * `compactionBatchId` log file, they can be removed. However, due to the eventual consistency of
- * S3, the compaction file may not be seen by other processes at once. So we only delete files
- * created `fileCleanupDelayMs` milliseconds ago.
+ * Delete expired log entries that precede the currentBatchId and retain
+ * sufficient minimum number of batches (given by minBatchesToRetain). This
+ * equates to retaining the most recent compaction log that precedes
+ * batch id position currentBatchId + 1 - minBatchesToRetain. All log entries
+ * prior to that compaction log will be removed.
+ * However, due to the eventual consistency of S3, the compaction file may not
+ * be seen by other processes at once. So we only delete files last modified
+ * at least `fileCleanupDelayMs` milliseconds ago.
*/
- private def deleteExpiredLog(compactionBatchId: Long): Unit = {
- val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
- fileManager.list(metadataPath, new PathFilter {
- override def accept(path: Path): Boolean = {
- try {
- val batchId = getBatchIdFromFileName(path.getName)
- batchId < compactionBatchId
- } catch {
- case _: NumberFormatException =>
- false
+ private def deleteExpiredLog(currentBatchId: Long): Unit = {
+ if (compactInterval <= currentBatchId + 1 - minBatchesToRetain) {
+ // Find the first compaction batch id that maintains minBatchesToRetain
+ val minBatchId = currentBatchId + 1 - minBatchesToRetain
+ val minCompactionBatchId = minBatchId - (minBatchId % compactInterval) - 1
+ assert(isCompactionBatch(minCompactionBatchId, compactInterval),
+ s"$minCompactionBatchId is not a compaction batch")
+
+ logInfo(s"Current compact batch id = $currentBatchId " +
+ s"min compaction batch id to delete = $minCompactionBatchId")
+
+ val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
+ fileManager.list(metadataPath, new PathFilter {
+ override def accept(path: Path): Boolean = {
+ try {
+ val batchId = getBatchIdFromFileName(path.getName)
+ batchId < minCompactionBatchId
+ } catch {
+ case _: NumberFormatException =>
+ false
+ }
+ }
+ }).foreach { f =>
+ if (f.getModificationTime <= expiredTime) {
+ fileManager.delete(f.getPath)
}
- }
- }).foreach { f =>
- if (f.getModificationTime <= expiredTime) {
- fileManager.delete(f.getPath)
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/83a42897/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index b52810d..48eee42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -58,6 +58,9 @@ class StreamExecution(
private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
+ private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
+ require(minBatchesToRetain > 0, "minBatchesToRetain has to be positive")
+
/**
* A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
*/
@@ -400,10 +403,11 @@ class StreamExecution(
}
}
- // Now that we have logged the new batch, no further processing will happen for
- // the batch before the previous batch, and it is safe to discard the old metadata.
+ // It is now safe to discard the metadata beyond the minimum number to retain.
// Note that purge is exclusive, i.e. it purges everything before the target ID.
- offsetLog.purge(currentBatchId - 1)
+ if (minBatchesToRetain < currentBatchId) {
+ offsetLog.purge(currentBatchId - minBatchesToRetain)
+ }
}
} else {
awaitBatchLock.lock()
http://git-wip-us.apache.org/repos/asf/spark/blob/83a42897/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 493fdaa..4f3f818 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -303,7 +303,6 @@ private[state] class HDFSBackedStateStoreProvider(
val mapFromFile = readSnapshotFile(version).getOrElse {
val prevMap = loadMap(version - 1)
val newMap = new MapType(prevMap)
- newMap.putAll(prevMap)
updateFromDeltaFile(version, newMap)
newMap
}
http://git-wip-us.apache.org/repos/asf/spark/blob/83a42897/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index de72f1c..acfaa8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -26,9 +26,11 @@ private[streaming] class StateStoreConf(@transient private val conf: SQLConf) ex
val minDeltasForSnapshot = conf.stateStoreMinDeltasForSnapshot
- val minVersionsToRetain = conf.stateStoreMinVersionsToRetain
+ val minVersionsToRetain = conf.minBatchesToRetain
}
private[streaming] object StateStoreConf {
val empty = new StateStoreConf()
+
+ def apply(conf: SQLConf): StateStoreConf = new StateStoreConf(conf)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/83a42897/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 64c373f..4d25f54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -480,18 +480,17 @@ object SQLConf {
.intConf
.createWithDefault(10)
- val STATE_STORE_MIN_VERSIONS_TO_RETAIN =
- SQLConfigBuilder("spark.sql.streaming.stateStore.minBatchesToRetain")
- .internal()
- .doc("Minimum number of versions of a state store's data to retain after cleaning.")
- .intConf
- .createWithDefault(2)
-
val CHECKPOINT_LOCATION = SQLConfigBuilder("spark.sql.streaming.checkpointLocation")
.doc("The default location for storing checkpoint data for streaming queries.")
.stringConf
.createOptional
+ val MIN_BATCHES_TO_RETAIN = SQLConfigBuilder("spark.sql.streaming.minBatchesToRetain")
+ .internal()
+ .doc("The minimum number of batches that must be retained and made recoverable.")
+ .intConf
+ .createWithDefault(100)
+
val UNSUPPORTED_OPERATION_CHECK_ENABLED =
SQLConfigBuilder("spark.sql.streaming.unsupportedOperationCheck")
.internal()
@@ -668,8 +667,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
- def stateStoreMinVersionsToRetain: Int = getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN)
-
def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)
def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
@@ -723,6 +720,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def minNumPostShufflePartitions: Int =
getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
+ def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
+
def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
http://git-wip-us.apache.org/repos/asf/spark/blob/83a42897/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index e511fda..435d874 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -104,6 +104,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
withFakeCompactibleFileStreamLog(
fileCleanupDelayMs = Long.MaxValue,
defaultCompactInterval = 3,
+ defaultMinBatchesToRetain = 1,
compactibleLog => {
assert("0" === compactibleLog.batchIdToPath(0).getName)
assert("1" === compactibleLog.batchIdToPath(1).getName)
@@ -118,6 +119,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
withFakeCompactibleFileStreamLog(
fileCleanupDelayMs = Long.MaxValue,
defaultCompactInterval = 3,
+ defaultMinBatchesToRetain = 1,
compactibleLog => {
val logs = Array("entry_1", "entry_2", "entry_3")
val expected = s"""${FakeCompactibleFileStreamLog.VERSION}
@@ -138,6 +140,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
withFakeCompactibleFileStreamLog(
fileCleanupDelayMs = Long.MaxValue,
defaultCompactInterval = 3,
+ defaultMinBatchesToRetain = 1,
compactibleLog => {
val logs = s"""${FakeCompactibleFileStreamLog.VERSION}
|"entry_1"
@@ -157,6 +160,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
withFakeCompactibleFileStreamLog(
fileCleanupDelayMs = Long.MaxValue,
defaultCompactInterval = 3,
+ defaultMinBatchesToRetain = 1,
compactibleLog => {
for (batchId <- 0 to 10) {
compactibleLog.add(batchId, Array("some_path_" + batchId))
@@ -175,6 +179,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
withFakeCompactibleFileStreamLog(
fileCleanupDelayMs = 0,
defaultCompactInterval = 3,
+ defaultMinBatchesToRetain = 1,
compactibleLog => {
val fs = compactibleLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
@@ -194,25 +199,29 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
compactibleLog.add(1, Array("some_path_1"))
assert(Set("0", "1") === listBatchFiles())
compactibleLog.add(2, Array("some_path_2"))
- assert(Set("2.compact") === listBatchFiles())
+ assert(Set("0", "1", "2.compact") === listBatchFiles())
compactibleLog.add(3, Array("some_path_3"))
assert(Set("2.compact", "3") === listBatchFiles())
compactibleLog.add(4, Array("some_path_4"))
assert(Set("2.compact", "3", "4") === listBatchFiles())
compactibleLog.add(5, Array("some_path_5"))
- assert(Set("5.compact") === listBatchFiles())
+ assert(Set("2.compact", "3", "4", "5.compact") === listBatchFiles())
+ compactibleLog.add(6, Array("some_path_6"))
+ assert(Set("5.compact", "6") === listBatchFiles())
})
}
private def withFakeCompactibleFileStreamLog(
fileCleanupDelayMs: Long,
defaultCompactInterval: Int,
+ defaultMinBatchesToRetain: Int,
f: FakeCompactibleFileStreamLog => Unit
): Unit = {
withTempDir { file =>
val compactibleLog = new FakeCompactibleFileStreamLog(
fileCleanupDelayMs,
defaultCompactInterval,
+ defaultMinBatchesToRetain,
spark,
file.getCanonicalPath)
f(compactibleLog)
@@ -227,6 +236,7 @@ object FakeCompactibleFileStreamLog {
class FakeCompactibleFileStreamLog(
_fileCleanupDelayMs: Long,
_defaultCompactInterval: Int,
+ _defaultMinBatchesToRetain: Int,
sparkSession: SparkSession,
path: String)
extends CompactibleFileStreamLog[String](
@@ -241,5 +251,7 @@ class FakeCompactibleFileStreamLog(
override protected def defaultCompactInterval: Int = _defaultCompactInterval
+ override protected val minBatchesToRetain: Int = _defaultMinBatchesToRetain
+
override def compactLogs(logs: Seq[String]): Seq[String] = logs
}
http://git-wip-us.apache.org/repos/asf/spark/blob/83a42897/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 8a21b76..7e0de5e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -151,10 +151,11 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
testWithUninterruptibleThread("delete expired file") {
// Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
- // deterministically
+ // deterministically, and set MIN_BATCHES_TO_RETAIN to 1
withSQLConf(
SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
- SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0") {
+ SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0",
+ SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
withFileStreamSinkLog { sinkLog =>
val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
@@ -174,13 +175,52 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
assert(Set("0", "1") === listBatchFiles())
sinkLog.add(2, Array(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
- assert(Set("2.compact") === listBatchFiles())
+ assert(Set("0", "1", "2.compact") === listBatchFiles())
sinkLog.add(3, Array(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
assert(Set("2.compact", "3") === listBatchFiles())
sinkLog.add(4, Array(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
assert(Set("2.compact", "3", "4") === listBatchFiles())
sinkLog.add(5, Array(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
- assert(Set("5.compact") === listBatchFiles())
+ assert(Set("2.compact", "3", "4", "5.compact") === listBatchFiles())
+ sinkLog.add(6, Array(newFakeSinkFileStatus("/a/b/6", FileStreamSinkLog.ADD_ACTION)))
+ assert(Set("5.compact", "6") === listBatchFiles())
+ }
+ }
+
+ withSQLConf(
+ SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
+ SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0",
+ SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") {
+ withFileStreamSinkLog { sinkLog =>
+ val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+ def listBatchFiles(): Set[String] = {
+ fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
+ try {
+ getBatchIdFromFileName(fileName)
+ true
+ } catch {
+ case _: NumberFormatException => false
+ }
+ }.toSet
+ }
+
+ sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
+ assert(Set("0") === listBatchFiles())
+ sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
+ assert(Set("0", "1") === listBatchFiles())
+ sinkLog.add(2, Array(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
+ assert(Set("0", "1", "2.compact") === listBatchFiles())
+ sinkLog.add(3, Array(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
+ assert(Set("0", "1", "2.compact", "3") === listBatchFiles())
+ sinkLog.add(4, Array(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
+ assert(Set("2.compact", "3", "4") === listBatchFiles())
+ sinkLog.add(5, Array(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
+ assert(Set("2.compact", "3", "4", "5.compact") === listBatchFiles())
+ sinkLog.add(6, Array(newFakeSinkFileStatus("/a/b/6", FileStreamSinkLog.ADD_ACTION)))
+ assert(Set("2.compact", "3", "4", "5.compact", "6") === listBatchFiles())
+ sinkLog.add(7, Array(newFakeSinkFileStatus("/a/b/7", FileStreamSinkLog.ADD_ACTION)))
+ assert(Set("5.compact", "6", "7") === listBatchFiles())
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/83a42897/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 05fc734..3404b11 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -376,7 +376,9 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
val opId = 0
val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString
val storeId = StateStoreId(dir, opId, 0)
- val storeConf = StateStoreConf.empty
+ val sqlConf = new SQLConf()
+ sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2)
+ val storeConf = StateStoreConf(sqlConf)
val hadoopConf = new Configuration()
val provider = new HDFSBackedStateStoreProvider(
storeId, keySchema, valueSchema, storeConf, hadoopConf)
@@ -606,6 +608,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
): HDFSBackedStateStoreProvider = {
val sqlConf = new SQLConf()
sqlConf.setConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT, minDeltasForSnapshot)
+ sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2)
new HDFSBackedStateStoreProvider(
StateStoreId(dir, opId, partition),
keySchema,
http://git-wip-us.apache.org/repos/asf/spark/blob/83a42897/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 7be2f21..c66d6b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.SparkException
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.ManualClock
@@ -369,25 +370,52 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
testQuietly("StreamExecution metadata garbage collection") {
val inputData = MemoryStream[Int]
val mapped = inputData.toDS().map(6 / _)
+ withSQLConf(SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
+ // Run 3 batches, and then assert that only 2 metadata files are present at the end
+ // since the first should have been purged.
+ testStream(mapped)(
+ AddData(inputData, 1, 2),
+ CheckAnswer(6, 3),
+ AddData(inputData, 1, 2),
+ CheckAnswer(6, 3, 6, 3),
+ AddData(inputData, 4, 6),
+ CheckAnswer(6, 3, 6, 3, 1, 1),
+
+ AssertOnQuery("metadata log should contain only two files") { q =>
+ val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
+ val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
+ val toTest = logFileNames.filter(!_.endsWith(".crc")).sorted // Workaround for SPARK-17475
+ assert(toTest.size == 2 && toTest.head == "1")
+ true
+ }
+ )
+ }
- // Run 3 batches, and then assert that only 2 metadata files is are at the end
- // since the first should have been purged.
- testStream(mapped)(
- AddData(inputData, 1, 2),
- CheckAnswer(6, 3),
- AddData(inputData, 1, 2),
- CheckAnswer(6, 3, 6, 3),
- AddData(inputData, 4, 6),
- CheckAnswer(6, 3, 6, 3, 1, 1),
-
- AssertOnQuery("metadata log should contain only two files") { q =>
- val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
- val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
- val toTest = logFileNames.filter(! _.endsWith(".crc")).sorted // Workaround for SPARK-17475
- assert(toTest.size == 2 && toTest.head == "1")
- true
- }
- )
+ val inputData2 = MemoryStream[Int]
+ withSQLConf(SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") {
+ // Run 5 batches, and then assert that 3 metadata files are present at the end
+ // since the first two should have been purged.
+ testStream(inputData2.toDS())(
+ AddData(inputData2, 1, 2),
+ CheckAnswer(1, 2),
+ AddData(inputData2, 1, 2),
+ CheckAnswer(1, 2, 1, 2),
+ AddData(inputData2, 3, 4),
+ CheckAnswer(1, 2, 1, 2, 3, 4),
+ AddData(inputData2, 5, 6),
+ CheckAnswer(1, 2, 1, 2, 3, 4, 5, 6),
+ AddData(inputData2, 7, 8),
+ CheckAnswer(1, 2, 1, 2, 3, 4, 5, 6, 7, 8),
+
+ AssertOnQuery("metadata log should contain three files") { q =>
+ val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
+ val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
+ val toTest = logFileNames.filter(!_.endsWith(".crc")).sorted // Workaround for SPARK-17475
+ assert(toTest.size == 3 && toTest.head == "2")
+ true
+ }
+ )
+ }
}
/** Create a streaming DF that only execute one batch in which it returns the given static DF */
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org