You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/08/26 23:05:37 UTC

spark git commit: [SPARK-17235][SQL] Support purging of old logs in MetadataLog

Repository: spark
Updated Branches:
  refs/heads/master a11d10f18 -> f64a1ddd0


[SPARK-17235][SQL] Support purging of old logs in MetadataLog

## What changes were proposed in this pull request?
This patch adds a purge interface to MetadataLog, and an implementation in HDFSMetadataLog. The purge function is currently unused, but I will use it to purge old execution and file source logs in follow-up patches. These changes are required in a production structured streaming job that runs for a long period of time.

## How was this patch tested?
Added a unit test case in HDFSMetadataLogSuite.

Author: petermaxlee <pe...@gmail.com>

Closes #14802 from petermaxlee/SPARK-17235.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f64a1ddd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f64a1ddd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f64a1ddd

Branch: refs/heads/master
Commit: f64a1ddd09a34d5d867ccbaba46204d75fad038d
Parents: a11d10f
Author: petermaxlee <pe...@gmail.com>
Authored: Fri Aug 26 16:05:34 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Fri Aug 26 16:05:34 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/HDFSMetadataLog.scala   | 14 ++++++++++
 .../sql/execution/streaming/MetadataLog.scala   |  6 +++++
 .../streaming/HDFSMetadataLogSuite.scala        | 27 +++++++++++++++++---
 3 files changed, 43 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f64a1ddd/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 2b6f76c..127ece9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -227,6 +227,20 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
     None
   }
 
+  /**
+   * Deletes the log file of every listed batch whose id is below thresholdBatchId (exclusive).
+   */
+  override def purge(thresholdBatchId: Long): Unit = {
+    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+      .map(f => pathToBatchId(f.getPath))
+    // Strict `<` comparison: the batch at thresholdBatchId itself is kept.
+    for (batchId <- batchIds if batchId < thresholdBatchId) {
+      val path = batchIdToPath(batchId)
+      fileManager.delete(path)
+      logTrace(s"Removed metadata log file: $path")
+    }
+  }
+
   private def createFileManager(): FileManager = {
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     try {

http://git-wip-us.apache.org/repos/asf/spark/blob/f64a1ddd/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
index cc70e1d..78d6be1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
@@ -48,4 +48,10 @@ trait MetadataLog[T] {
    * Return the latest batch Id and its metadata if exist.
    */
   def getLatest(): Option[(Long, T)]
+
+  /**
+   * Removes all log entries with a batch id earlier than thresholdBatchId (exclusive).
+   * Implementations should make this operation idempotent.
+   */
+  def purge(thresholdBatchId: Long): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f64a1ddd/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index ab5a2d2..4259384 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -46,14 +46,14 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
   test("FileManager: FileContextManager") {
     withTempDir { temp =>
       val path = new Path(temp.getAbsolutePath)
-      testManager(path, new FileContextManager(path, new Configuration))
+      testFileManager(path, new FileContextManager(path, new Configuration))
     }
   }
 
   test("FileManager: FileSystemManager") {
     withTempDir { temp =>
       val path = new Path(temp.getAbsolutePath)
-      testManager(path, new FileSystemManager(path, new Configuration))
+      testFileManager(path, new FileSystemManager(path, new Configuration))
     }
   }
 
@@ -103,6 +103,25 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
+  testWithUninterruptibleThread("HDFSMetadataLog: purge") {
+    withTempDir { temp =>
+      val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
+      assert(metadataLog.add(0, "batch0"))
+      assert(metadataLog.add(1, "batch1"))
+      assert(metadataLog.add(2, "batch2"))
+      assert(metadataLog.get(0).isDefined)
+      assert(metadataLog.get(1).isDefined)
+      assert(metadataLog.get(2).isDefined)
+      assert(metadataLog.getLatest().get._1 == 2)
+      // The threshold is exclusive: purge(2) must drop batches 0 and 1 but keep batch 2.
+      metadataLog.purge(2)
+      assert(metadataLog.get(0).isEmpty)
+      assert(metadataLog.get(1).isEmpty)
+      assert(metadataLog.get(2).isDefined)
+      assert(metadataLog.getLatest().get._1 == 2)
+    }
+  }
+
   testWithUninterruptibleThread("HDFSMetadataLog: restart") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
@@ -155,8 +174,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-
-  def testManager(basePath: Path, fm: FileManager): Unit = {
+  /** Basic test case for [[FileManager]] implementation. */
+  private def testFileManager(basePath: Path, fm: FileManager): Unit = {
     // Mkdirs
     val dir = new Path(s"$basePath/dir/subdir/subsubdir")
     assert(!fm.exists(dir))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org