Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/14 12:37:25 UTC

[GitHub] [spark] shuyouZZ commented on a diff in pull request #38983: [SPARK-41447][CORE] Reduce the number of doMergeApplicationListing invocations

shuyouZZ commented on code in PR #38983:
URL: https://github.com/apache/spark/pull/38983#discussion_r1048416951


##########
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala:
##########
@@ -1705,6 +1705,61 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
     provider.stop()
   }
 
+  test("SPARK-41447: clean up expired event log files that don't exist in listing db") {
+    class TestFsHistoryProvider(conf: SparkConf, clock: Clock)
+      extends FsHistoryProvider(conf, clock) {
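+      // Counts how many times doMergeApplicationListing is invoked, so the
+      // test can assert that only the surviving log file is parsed.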
+      var doMergeApplicationListingCall = 0
+      override private[history] def doMergeApplicationListing(
+          reader: EventLogFileReader,
+          lastSeen: Long,
+          enableSkipToEnd: Boolean,
+          lastCompactionIndex: Option[Long]): Unit = {
+        super.doMergeApplicationListing(reader, lastSeen, enableSkipToEnd, lastCompactionIndex)
+        doMergeApplicationListingCall += 1
+      }
+    }
+
+    val maxAge = TimeUnit.SECONDS.toMillis(10)
+    val clock = new ManualClock(maxAge / 2)
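+    // MAX_LOG_AGE_S accepts time suffixes, so setting "10000ms" yields a 10-second max age.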
+    val conf = createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms").set(CLEANER_ENABLED, true)
+    val provider = new TestFsHistoryProvider(conf, clock)
+
+    val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+    writeFile(log1, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
+      SparkListenerApplicationEnd(2L)
+    )
+    log1.setLastModified(0L)
+
+    val log2 = newLogFile("app1", Some("attempt2"), inProgress = false)
+    writeFile(log2, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", Some("attempt2")),
+      SparkListenerApplicationEnd(4L)
+    )
+    log2.setLastModified(clock.getTimeMillis())
+
+    val log3 = newLogFile("app2", Some("attempt1"), inProgress = false)
+    writeFile(log3, None,
+      SparkListenerApplicationStart("app2", Some("app2"), 3L, "test", Some("attempt1")),
+      SparkListenerApplicationEnd(4L)
+    )
+    log3.setLastModified(0L)
+
+    provider.getListing().size should be (0)
+
+    // Move the clock forward so log1 and log3 exceed the max age.
+    clock.advance(maxAge)
+    // checkForLogs() should clean up the expired log files directly, avoiding an unnecessary parse.
+    provider.checkForLogs()
+
+    provider.doMergeApplicationListingCall should be (1)
+    provider.getListing().size should be (1)

Review Comment:
   > I want to help you by merging this PR. If `deleting expired logs` is already handled by other logic in Apache Spark, please revise the PR title and description. Then it will be easier for me to merge this PR.
   
   Fixed, thanks.
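   For reference, here is a minimal, self-contained sketch of the age cutoff the test above exercises (ExpiryCheckSketch and isExpired are illustrative names, not the actual FsHistoryProvider internals): a log is considered expired once its modification time falls before now - maxAge, which is why log1 and log3 are cleaned up while log2 survives and is parsed exactly once.
   
   import java.util.concurrent.TimeUnit
   
   object ExpiryCheckSketch {
     // Hypothetical helper mirroring the cleaner's cutoff check.
     def isExpired(lastModified: Long, nowMs: Long, maxAgeMs: Long): Boolean =
       lastModified < nowMs - maxAgeMs
   
     def main(args: Array[String]): Unit = {
       val maxAge = TimeUnit.SECONDS.toMillis(10)
       val start = maxAge / 2          // the ManualClock starts at maxAge / 2
       val now = start + maxAge        // after clock.advance(maxAge)
   
       // log1 and log3 were last modified at t = 0, log2 at t = maxAge / 2.
       println(isExpired(0L, now, maxAge))     // true  -> cleaned by checkForLogs()
       println(isExpired(start, now, maxAge))  // false -> kept and parsed once
     }
   }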



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org