You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/12/12 07:38:40 UTC

spark git commit: [SPARK-18790][SS] Keep a general offset history of stream batches

Repository: spark
Updated Branches:
  refs/heads/master c802ad871 -> 83a42897a


[SPARK-18790][SS] Keep a general offset history of stream batches

## What changes were proposed in this pull request?

Instead of only keeping the minimum number of offsets around, we should keep enough information to allow us to roll back n batches and reexecute the stream starting from a given point. In particular, we should create a config in SQLConf, spark.sql.streaming.retainedBatches that defaults to 100 and ensure that we keep enough log files in the following places to roll back the specified number of batches:
the offsets that are present in each batch
versions of the state store
the files lists stored for the FileStreamSource
the metadata log stored by the FileStreamSink

marmbrus zsxwing

## How was this patch tested?

The following tests were added.

### StreamExecution offset metadata
Test added to StreamingQuerySuite that ensures offset metadata is garbage collected according to minBatchesRetain

### CompactibleFileStreamLog
Tests added in CompactibleFileStreamLogSuite to ensure that logs are purged starting before the first compaction file that proceeds the current batch id - minBatchesToRetain.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie <tc...@gmail.com>

Closes #16219 from tcondie/offset_hist.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83a42897
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83a42897
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83a42897

Branch: refs/heads/master
Commit: 83a42897ae90d84a54373db386a985e3e2d5903a
Parents: c802ad8
Author: Tyson Condie <tc...@gmail.com>
Authored: Sun Dec 11 23:38:31 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Sun Dec 11 23:38:31 2016 -0800

----------------------------------------------------------------------
 .../streaming/CompactibleFileStreamLog.scala    | 69 +++++++++++++-------
 .../execution/streaming/StreamExecution.scala   | 10 ++-
 .../state/HDFSBackedStateStoreProvider.scala    |  1 -
 .../streaming/state/StateStoreConf.scala        |  4 +-
 .../org/apache/spark/sql/internal/SQLConf.scala | 17 +++--
 .../CompactibleFileStreamLogSuite.scala         | 16 ++++-
 .../streaming/FileStreamSinkLogSuite.scala      | 48 ++++++++++++--
 .../streaming/state/StateStoreSuite.scala       |  5 +-
 .../sql/streaming/StreamingQuerySuite.scala     | 64 +++++++++++++-----
 9 files changed, 170 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/83a42897/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 8529cea..5a6f9e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -52,6 +52,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
   /** Needed to serialize type T into JSON when using Jackson */
   private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)
 
+  protected val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
+
   /**
    * If we delete the old files after compaction at once, there is a race condition in S3: other
    * processes may see the old files are deleted but still cannot see the compaction file using
@@ -152,11 +154,16 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
   }
 
   override def add(batchId: Long, logs: Array[T]): Boolean = {
-    if (isCompactionBatch(batchId, compactInterval)) {
-      compact(batchId, logs)
-    } else {
-      super.add(batchId, logs)
+    val batchAdded =
+      if (isCompactionBatch(batchId, compactInterval)) {
+        compact(batchId, logs)
+      } else {
+        super.add(batchId, logs)
+      }
+    if (batchAdded && isDeletingExpiredLog) {
+      deleteExpiredLog(batchId)
     }
+    batchAdded
   }
 
   /**
@@ -167,9 +174,6 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
     val allLogs = validBatches.flatMap(batchId => super.get(batchId)).flatten ++ logs
     if (super.add(batchId, compactLogs(allLogs).toArray)) {
-      if (isDeletingExpiredLog) {
-        deleteExpiredLog(batchId)
-      }
       true
     } else {
       // Return false as there is another writer.
@@ -210,26 +214,41 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
   }
 
   /**
-   * Since all logs before `compactionBatchId` are compacted and written into the
-   * `compactionBatchId` log file, they can be removed. However, due to the eventual consistency of
-   * S3, the compaction file may not be seen by other processes at once. So we only delete files
-   * created `fileCleanupDelayMs` milliseconds ago.
+   * Delete expired log entries that proceed the currentBatchId and retain
+   * sufficient minimum number of batches (given by minBatchsToRetain). This
+   * equates to retaining the earliest compaction log that proceeds
+   * batch id position currentBatchId + 1 - minBatchesToRetain. All log entries
+   * prior to the earliest compaction log proceeding that position will be removed.
+   * However, due to the eventual consistency of S3, the compaction file may not
+   * be seen by other processes at once. So we only delete files created
+   * `fileCleanupDelayMs` milliseconds ago.
    */
-  private def deleteExpiredLog(compactionBatchId: Long): Unit = {
-    val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
-    fileManager.list(metadataPath, new PathFilter {
-      override def accept(path: Path): Boolean = {
-        try {
-          val batchId = getBatchIdFromFileName(path.getName)
-          batchId < compactionBatchId
-        } catch {
-          case _: NumberFormatException =>
-            false
+  private def deleteExpiredLog(currentBatchId: Long): Unit = {
+    if (compactInterval <= currentBatchId + 1 - minBatchesToRetain) {
+      // Find the first compaction batch id that maintains minBatchesToRetain
+      val minBatchId = currentBatchId + 1 - minBatchesToRetain
+      val minCompactionBatchId = minBatchId - (minBatchId % compactInterval) - 1
+      assert(isCompactionBatch(minCompactionBatchId, compactInterval),
+        s"$minCompactionBatchId is not a compaction batch")
+
+      logInfo(s"Current compact batch id = $currentBatchId " +
+        s"min compaction batch id to delete = $minCompactionBatchId")
+
+      val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
+      fileManager.list(metadataPath, new PathFilter {
+        override def accept(path: Path): Boolean = {
+          try {
+            val batchId = getBatchIdFromFileName(path.getName)
+            batchId < minCompactionBatchId
+          } catch {
+            case _: NumberFormatException =>
+              false
+          }
+        }
+      }).foreach { f =>
+        if (f.getModificationTime <= expiredTime) {
+          fileManager.delete(f.getPath)
         }
-      }
-    }).foreach { f =>
-      if (f.getModificationTime <= expiredTime) {
-        fileManager.delete(f.getPath)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/83a42897/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index b52810d..48eee42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -58,6 +58,9 @@ class StreamExecution(
 
   private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
 
+  private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
+  require(minBatchesToRetain > 0, "minBatchesToRetain has to be positive")
+
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
    */
@@ -400,10 +403,11 @@ class StreamExecution(
           }
         }
 
-        // Now that we have logged the new batch, no further processing will happen for
-        // the batch before the previous batch, and it is safe to discard the old metadata.
+        // It is now safe to discard the metadata beyond the minimum number to retain.
         // Note that purge is exclusive, i.e. it purges everything before the target ID.
-        offsetLog.purge(currentBatchId - 1)
+        if (minBatchesToRetain < currentBatchId) {
+          offsetLog.purge(currentBatchId - minBatchesToRetain)
+        }
       }
     } else {
       awaitBatchLock.lock()

http://git-wip-us.apache.org/repos/asf/spark/blob/83a42897/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 493fdaa..4f3f818 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -303,7 +303,6 @@ private[state] class HDFSBackedStateStoreProvider(
       val mapFromFile = readSnapshotFile(version).getOrElse {
         val prevMap = loadMap(version - 1)
         val newMap = new MapType(prevMap)
-        newMap.putAll(prevMap)
         updateFromDeltaFile(version, newMap)
         newMap
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/83a42897/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index de72f1c..acfaa8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -26,9 +26,11 @@ private[streaming] class StateStoreConf(@transient private val conf: SQLConf) ex
 
   val minDeltasForSnapshot = conf.stateStoreMinDeltasForSnapshot
 
-  val minVersionsToRetain = conf.stateStoreMinVersionsToRetain
+  val minVersionsToRetain = conf.minBatchesToRetain
 }
 
 private[streaming] object StateStoreConf {
   val empty = new StateStoreConf()
+
+  def apply(conf: SQLConf): StateStoreConf = new StateStoreConf(conf)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/83a42897/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 64c373f..4d25f54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -480,18 +480,17 @@ object SQLConf {
       .intConf
       .createWithDefault(10)
 
-  val STATE_STORE_MIN_VERSIONS_TO_RETAIN =
-    SQLConfigBuilder("spark.sql.streaming.stateStore.minBatchesToRetain")
-      .internal()
-      .doc("Minimum number of versions of a state store's data to retain after cleaning.")
-      .intConf
-      .createWithDefault(2)
-
   val CHECKPOINT_LOCATION = SQLConfigBuilder("spark.sql.streaming.checkpointLocation")
     .doc("The default location for storing checkpoint data for streaming queries.")
     .stringConf
     .createOptional
 
+  val MIN_BATCHES_TO_RETAIN = SQLConfigBuilder("spark.sql.streaming.minBatchesToRetain")
+    .internal()
+    .doc("The minimum number of batches that must be retained and made recoverable.")
+    .intConf
+    .createWithDefault(100)
+
   val UNSUPPORTED_OPERATION_CHECK_ENABLED =
     SQLConfigBuilder("spark.sql.streaming.unsupportedOperationCheck")
       .internal()
@@ -668,8 +667,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
 
-  def stateStoreMinVersionsToRetain: Int = getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN)
-
   def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)
 
   def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
@@ -723,6 +720,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   def minNumPostShufflePartitions: Int =
     getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
 
+  def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
+
   def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
 
   def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)

http://git-wip-us.apache.org/repos/asf/spark/blob/83a42897/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index e511fda..435d874 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -104,6 +104,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = Long.MaxValue,
       defaultCompactInterval = 3,
+      defaultMinBatchesToRetain = 1,
       compactibleLog => {
         assert("0" === compactibleLog.batchIdToPath(0).getName)
         assert("1" === compactibleLog.batchIdToPath(1).getName)
@@ -118,6 +119,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = Long.MaxValue,
       defaultCompactInterval = 3,
+      defaultMinBatchesToRetain = 1,
       compactibleLog => {
         val logs = Array("entry_1", "entry_2", "entry_3")
         val expected = s"""${FakeCompactibleFileStreamLog.VERSION}
@@ -138,6 +140,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = Long.MaxValue,
       defaultCompactInterval = 3,
+      defaultMinBatchesToRetain = 1,
       compactibleLog => {
         val logs = s"""${FakeCompactibleFileStreamLog.VERSION}
             |"entry_1"
@@ -157,6 +160,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = Long.MaxValue,
       defaultCompactInterval = 3,
+      defaultMinBatchesToRetain = 1,
       compactibleLog => {
         for (batchId <- 0 to 10) {
           compactibleLog.add(batchId, Array("some_path_" + batchId))
@@ -175,6 +179,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = 0,
       defaultCompactInterval = 3,
+      defaultMinBatchesToRetain = 1,
       compactibleLog => {
         val fs = compactibleLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
 
@@ -194,25 +199,29 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
         compactibleLog.add(1, Array("some_path_1"))
         assert(Set("0", "1") === listBatchFiles())
         compactibleLog.add(2, Array("some_path_2"))
-        assert(Set("2.compact") === listBatchFiles())
+        assert(Set("0", "1", "2.compact") === listBatchFiles())
         compactibleLog.add(3, Array("some_path_3"))
         assert(Set("2.compact", "3") === listBatchFiles())
         compactibleLog.add(4, Array("some_path_4"))
         assert(Set("2.compact", "3", "4") === listBatchFiles())
         compactibleLog.add(5, Array("some_path_5"))
-        assert(Set("5.compact") === listBatchFiles())
+        assert(Set("2.compact", "3", "4", "5.compact") === listBatchFiles())
+        compactibleLog.add(6, Array("some_path_6"))
+        assert(Set("5.compact", "6") === listBatchFiles())
       })
   }
 
   private def withFakeCompactibleFileStreamLog(
     fileCleanupDelayMs: Long,
     defaultCompactInterval: Int,
+    defaultMinBatchesToRetain: Int,
     f: FakeCompactibleFileStreamLog => Unit
   ): Unit = {
     withTempDir { file =>
       val compactibleLog = new FakeCompactibleFileStreamLog(
         fileCleanupDelayMs,
         defaultCompactInterval,
+        defaultMinBatchesToRetain,
         spark,
         file.getCanonicalPath)
       f(compactibleLog)
@@ -227,6 +236,7 @@ object FakeCompactibleFileStreamLog {
 class FakeCompactibleFileStreamLog(
     _fileCleanupDelayMs: Long,
     _defaultCompactInterval: Int,
+    _defaultMinBatchesToRetain: Int,
     sparkSession: SparkSession,
     path: String)
   extends CompactibleFileStreamLog[String](
@@ -241,5 +251,7 @@ class FakeCompactibleFileStreamLog(
 
   override protected def defaultCompactInterval: Int = _defaultCompactInterval
 
+  override protected val minBatchesToRetain: Int = _defaultMinBatchesToRetain
+
   override def compactLogs(logs: Seq[String]): Seq[String] = logs
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/83a42897/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 8a21b76..7e0de5e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -151,10 +151,11 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
 
   testWithUninterruptibleThread("delete expired file") {
     // Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
-    // deterministically
+    // deterministically and one min batches to retain
     withSQLConf(
       SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
-      SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0") {
+      SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0",
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
       withFileStreamSinkLog { sinkLog =>
         val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
 
@@ -174,13 +175,52 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
         sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
         assert(Set("0", "1") === listBatchFiles())
         sinkLog.add(2, Array(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
-        assert(Set("2.compact") === listBatchFiles())
+        assert(Set("0", "1", "2.compact") === listBatchFiles())
         sinkLog.add(3, Array(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
         assert(Set("2.compact", "3") === listBatchFiles())
         sinkLog.add(4, Array(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
         assert(Set("2.compact", "3", "4") === listBatchFiles())
         sinkLog.add(5, Array(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
-        assert(Set("5.compact") === listBatchFiles())
+        assert(Set("2.compact", "3", "4", "5.compact") === listBatchFiles())
+        sinkLog.add(6, Array(newFakeSinkFileStatus("/a/b/6", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("5.compact", "6") === listBatchFiles())
+      }
+    }
+
+    withSQLConf(
+      SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
+      SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0",
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") {
+      withFileStreamSinkLog { sinkLog =>
+        val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+        def listBatchFiles(): Set[String] = {
+          fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
+            try {
+              getBatchIdFromFileName(fileName)
+              true
+            } catch {
+              case _: NumberFormatException => false
+            }
+          }.toSet
+        }
+
+        sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("0") === listBatchFiles())
+        sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("0", "1") === listBatchFiles())
+        sinkLog.add(2, Array(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("0", "1", "2.compact") === listBatchFiles())
+        sinkLog.add(3, Array(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("0", "1", "2.compact", "3") === listBatchFiles())
+        sinkLog.add(4, Array(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("2.compact", "3", "4") === listBatchFiles())
+        sinkLog.add(5, Array(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("2.compact", "3", "4", "5.compact") === listBatchFiles())
+        sinkLog.add(6, Array(newFakeSinkFileStatus("/a/b/6", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("2.compact", "3", "4", "5.compact", "6") === listBatchFiles())
+        sinkLog.add(7, Array(newFakeSinkFileStatus("/a/b/7", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("5.compact", "6", "7") === listBatchFiles())
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/83a42897/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 05fc734..3404b11 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -376,7 +376,9 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val opId = 0
     val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString
     val storeId = StateStoreId(dir, opId, 0)
-    val storeConf = StateStoreConf.empty
+    val sqlConf = new SQLConf()
+    sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2)
+    val storeConf = StateStoreConf(sqlConf)
     val hadoopConf = new Configuration()
     val provider = new HDFSBackedStateStoreProvider(
       storeId, keySchema, valueSchema, storeConf, hadoopConf)
@@ -606,6 +608,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     ): HDFSBackedStateStoreProvider = {
     val sqlConf = new SQLConf()
     sqlConf.setConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT, minDeltasForSnapshot)
+    sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2)
     new HDFSBackedStateStoreProvider(
       StateStoreId(dir, opId, partition),
       keySchema,

http://git-wip-us.apache.org/repos/asf/spark/blob/83a42897/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 7be2f21..c66d6b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.ManualClock
 
 
@@ -369,25 +370,52 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
   testQuietly("StreamExecution metadata garbage collection") {
     val inputData = MemoryStream[Int]
     val mapped = inputData.toDS().map(6 / _)
+    withSQLConf(SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
+      // Run 3 batches, and then assert that only 2 metadata files is are at the end
+      // since the first should have been purged.
+      testStream(mapped)(
+        AddData(inputData, 1, 2),
+        CheckAnswer(6, 3),
+        AddData(inputData, 1, 2),
+        CheckAnswer(6, 3, 6, 3),
+        AddData(inputData, 4, 6),
+        CheckAnswer(6, 3, 6, 3, 1, 1),
+
+        AssertOnQuery("metadata log should contain only two files") { q =>
+          val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
+          val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
+          val toTest = logFileNames.filter(!_.endsWith(".crc")).sorted // Workaround for SPARK-17475
+          assert(toTest.size == 2 && toTest.head == "1")
+          true
+        }
+      )
+    }
 
-    // Run 3 batches, and then assert that only 2 metadata files is are at the end
-    // since the first should have been purged.
-    testStream(mapped)(
-      AddData(inputData, 1, 2),
-      CheckAnswer(6, 3),
-      AddData(inputData, 1, 2),
-      CheckAnswer(6, 3, 6, 3),
-      AddData(inputData, 4, 6),
-      CheckAnswer(6, 3, 6, 3, 1, 1),
-
-      AssertOnQuery("metadata log should contain only two files") { q =>
-        val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
-        val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
-        val toTest = logFileNames.filter(! _.endsWith(".crc")).sorted  // Workaround for SPARK-17475
-        assert(toTest.size == 2 && toTest.head == "1")
-        true
-      }
-    )
+    val inputData2 = MemoryStream[Int]
+    withSQLConf(SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") {
+      // Run 5 batches, and then assert that 3 metadata files is are at the end
+      // since the two should have been purged.
+      testStream(inputData2.toDS())(
+        AddData(inputData2, 1, 2),
+        CheckAnswer(1, 2),
+        AddData(inputData2, 1, 2),
+        CheckAnswer(1, 2, 1, 2),
+        AddData(inputData2, 3, 4),
+        CheckAnswer(1, 2, 1, 2, 3, 4),
+        AddData(inputData2, 5, 6),
+        CheckAnswer(1, 2, 1, 2, 3, 4, 5, 6),
+        AddData(inputData2, 7, 8),
+        CheckAnswer(1, 2, 1, 2, 3, 4, 5, 6, 7, 8),
+
+        AssertOnQuery("metadata log should contain three files") { q =>
+          val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
+          val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
+          val toTest = logFileNames.filter(!_.endsWith(".crc")).sorted // Workaround for SPARK-17475
+          assert(toTest.size == 3 && toTest.head == "2")
+          true
+        }
+      )
+    }
   }
 
   /** Create a streaming DF that only execute one batch in which it returns the given static DF */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org