Posted to reviews@spark.apache.org by viper-kun <gi...@git.apache.org> on 2014/09/20 10:33:33 UTC

[GitHub] spark pull request: Periodic cleanup event logs

GitHub user viper-kun opened a pull request:

    https://github.com/apache/spark/pull/2471

    Periodic cleanup event logs

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viper-kun/spark deletelog2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2471.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2471
    
----
commit f085cf02921574f734caa816ba1515b6e44b7341
Author: xukun 00228947 <xu...@huawei.com>
Date:   2014-09-20T06:45:28Z

    Periodic cleanup event logs

commit 9cd45d48a4db3b6acdbbfea652a3fefc2b030190
Author: viper-kun <xu...@huawei.com>
Date:   2014-09-20T08:26:03Z

    Update monitoring.md

commit 78e78cb5917be9df6b019e297c24d47d88ff5c12
Author: viper-kun <xu...@huawei.com>
Date:   2014-09-20T08:27:50Z

    Update monitoring.md

commit e72b51ad20f3c7fbac8cd92860827df2f23ba71d
Author: viper-kun <xu...@huawei.com>
Date:   2014-09-20T08:28:39Z

    Update monitoring.md

commit da637c0a9ee7aaf163c4da77f07affc67fecfacd
Author: viper-kun <xu...@huawei.com>
Date:   2014-09-20T08:30:13Z

    Update monitoring.md

commit ce53db2db2e1be8b2a1a1d1dec56fd8df8a43b41
Author: viper-kun <xu...@huawei.com>
Date:   2014-09-20T08:31:37Z

    Update monitoring.md

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-59302984
  
    @viper-kun mostly good, just a few minor things left as far as I'm concerned.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by viper-kun <gi...@git.apache.org>.
Github user viper-kun commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-60027733
  
    @vanzin @andrewor14, is it ok to go?




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r19179121
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -29,14 +31,36 @@ import org.apache.spark.scheduler._
     import org.apache.spark.ui.SparkUI
     import org.apache.spark.util.Utils
     
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    --- End diff --
    
    nit: this should come before the `org.apache.hadoop.` import (and be in the same "group").




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18740457
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
               }
             }
     
    -        val newIterator = logInfos.iterator.buffered
    -        val oldIterator = applications.values.iterator.buffered
    -        while (newIterator.hasNext && oldIterator.hasNext) {
    -          if (newIterator.head.endTime > oldIterator.head.endTime) {
    -            addIfAbsent(newIterator.next)
    -          } else {
    -            addIfAbsent(oldIterator.next)
    +        applications.synchronized {
    +          val newIterator = logInfos.iterator.buffered
    +          val oldIterator = applications.values.iterator.buffered
    +          while (newIterator.hasNext && oldIterator.hasNext) {
    +            if (newIterator.head.endTime > oldIterator.head.endTime) {
    +              addIfAbsent(newIterator.next)
    +            } else {
    +              addIfAbsent(oldIterator.next)
    +            }
               }
    +          newIterator.foreach(addIfAbsent)
    +          oldIterator.foreach(addIfAbsent)
    +
    +          applications = newApps
             }
    -        newIterator.foreach(addIfAbsent)
    -        oldIterator.foreach(addIfAbsent)
    +      }
    +    } catch {
    +      case t: Throwable => logError("Exception in checking for event log updates", t)
    +    }
    +  }
    +
    +  /**
    +   *  Deleting apps if setting cleaner.
    +   */
    +  private def cleanLogs() = {
    +    lastLogCleanTimeMs = getMonotonicTimeMs()
    +    logDebug("Cleaning logs. Time is now %d.".format(lastLogCleanTimeMs))
    --- End diff --
    
    Nit: string interpolation is probably clearer: `s"Cleaning ... now $lastLogCleanTimeMs"`
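
A minimal sketch of the two spellings discussed in this nit, assuming a stand-in value for `lastLogCleanTimeMs` (the object name and the timestamp are made up for illustration). Both forms produce the same message; the interpolated one keeps the value next to the text it belongs to.

```scala
object InterpolationSketch {
  // Hypothetical stand-in for the field in the diff; the value is arbitrary.
  val lastLogCleanTimeMs: Long = 1413800000000L

  // Form used in the patch: a positional format string.
  val formatted: String = "Cleaning logs. Time is now %d.".format(lastLogCleanTimeMs)

  // Suggested form: the s interpolator, written s"..." with no colon.
  val interpolated: String = s"Cleaning logs. Time is now $lastLogCleanTimeMs."
}
```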




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-57666110
  
    > a system approach means using something like logrotate or a cleaner process that's run from cron.
    
    The only thing you can really use system utilities for is cron, which is the least important part of this change. Really, this is not an expensive process that will bring down the HDFS server, and it's scheduled to run at very long intervals. The constant polling for new logs is orders of magnitude more disruptive than this cleanup thread.
    
    AFAIK, logrotate doesn't work on HDFS. Now you'd be asking for people to set up the NFS bridge or even fuse-hdfs just to clean up Spark event log files.
    
    Finally, Spark theoretically supports Windows. This is a simple way to achieve compatibility with that. And it doesn't require people to set things up outside of their Spark ecosystem, meaning it's easier to maintain.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r17943683
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -214,6 +252,32 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         }
       }
     
    +  /**
    +   *  Deleting apps if setting cleaner.
    +   */
    +  private def cleanLogs() = {
    +    lastLogCleanTimeMs = System.currentTimeMillis()
    +    logDebug("Cleaning logs. Time is now %d.".format(lastLogCleanTimeMs))
    +    try {
    +      val logStatus = fs.listStatus(new Path(resolvedLogDir))
    +      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
    +      val maxAge = conf.getLong("spark.history.fs.maxAge-s",
    +        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
    +
    +      // scan all logs from the log directory.
    +      // Only directories older than this many seconds will be deleted .
    +      logDirs.foreach { dir =>
    +        //history file older than this many seconds will be deleted when the history cleaner runs.
    --- End diff --
    
    nit: comment formatting.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by viper-kun <gi...@git.apache.org>.
Github user viper-kun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18740124
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
               }
             }
     
    -        val newIterator = logInfos.iterator.buffered
    -        val oldIterator = applications.values.iterator.buffered
    -        while (newIterator.hasNext && oldIterator.hasNext) {
    -          if (newIterator.head.endTime > oldIterator.head.endTime) {
    -            addIfAbsent(newIterator.next)
    -          } else {
    -            addIfAbsent(oldIterator.next)
    +        applications.synchronized {
    +          val newIterator = logInfos.iterator.buffered
    +          val oldIterator = applications.values.iterator.buffered
    +          while (newIterator.hasNext && oldIterator.hasNext) {
    +            if (newIterator.head.endTime > oldIterator.head.endTime) {
    +              addIfAbsent(newIterator.next)
    +            } else {
    +              addIfAbsent(oldIterator.next)
    +            }
               }
    +          newIterator.foreach(addIfAbsent)
    +          oldIterator.foreach(addIfAbsent)
    +
    +          applications = newApps
             }
    -        newIterator.foreach(addIfAbsent)
    -        oldIterator.foreach(addIfAbsent)
    +      }
    +    } catch {
    +      case t: Throwable => logError("Exception in checking for event log updates", t)
    +    }
    +  }
    +
    +  /**
    +   *  Deleting apps if setting cleaner.
    +   */
    +  private def cleanLogs() = {
    +    lastLogCleanTimeMs = getMonotonicTimeMs()
    +    logDebug("Cleaning logs. Time is now %d.".format(lastLogCleanTimeMs))
    +    try {
    +      val logStatus = fs.listStatus(new Path(resolvedLogDir))
    +      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
    +      val maxAge = conf.getLong("spark.history.fs.maxAge.seconds",
    +        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
    +
    +      val now = System.currentTimeMillis()
    +      fs.synchronized {
    +        // scan all logs from the log directory.
    +        // Only directories older than this many seconds will be deleted .
    +        logDirs.foreach { dir =>
    +          // history file older than this many seconds will be deleted 
    +          // when the history cleaner runs.
    +          if (now - getModificationTime(dir) > maxAge) {
    +            fs.delete(dir.getPath, true)
    +          }
    +        }
    +      }
    +      
    +      val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
    +      def addIfNotExpire(info: FsApplicationHistoryInfo) = {
    +        if(now - info.lastUpdated <= maxAge) {
    +          newApps += (info.id -> info)
    --- End diff --
    
    info.lastUpdated is the timestamp of the directory, and it is always later than the timestamps of the files inside it.
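
The age check in `addIfNotExpire` can be sketched in isolation as follows. This is only an illustration: `AppInfo` and `retainFresh` are hypothetical names standing in for `FsApplicationHistoryInfo` and the patch's inner helper, and the comparison is copied from the diff above.

```scala
import scala.collection.mutable

object ExpirySketch {
  // Hypothetical stand-in for FsApplicationHistoryInfo; only the fields used here.
  final case class AppInfo(id: String, lastUpdated: Long)

  /** Returns the entries whose age is at most maxAgeMs, preserving insertion order. */
  def retainFresh(
      apps: mutable.LinkedHashMap[String, AppInfo],
      nowMs: Long,
      maxAgeMs: Long): mutable.LinkedHashMap[String, AppInfo] = {
    val kept = new mutable.LinkedHashMap[String, AppInfo]()
    apps.values.foreach { info =>
      // Same comparison as the patch: keep the entry while it is not older than maxAgeMs.
      if (nowMs - info.lastUpdated <= maxAgeMs) {
        kept += (info.id -> info)
      }
    }
    kept
  }
}
```

Because `lastUpdated` is the directory timestamp, an entry kept by this filter may still contain individual files older than `maxAgeMs`.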




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r17943673
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -100,6 +132,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         checkForLogs()
         logCheckingThread.setDaemon(true)
         logCheckingThread.start()
    +
    +    //start cleaner thread if spark.history.fs.cleaner.enable is true
    +    if(conf.getBoolean("spark.history.fs.cleaner.enable", false)) {
    --- End diff --
    
    nit: `if (`




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by viper-kun <gi...@git.apache.org>.
Github user viper-kun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18739911
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
               }
             }
     
    -        val newIterator = logInfos.iterator.buffered
    -        val oldIterator = applications.values.iterator.buffered
    -        while (newIterator.hasNext && oldIterator.hasNext) {
    -          if (newIterator.head.endTime > oldIterator.head.endTime) {
    -            addIfAbsent(newIterator.next)
    -          } else {
    -            addIfAbsent(oldIterator.next)
    +        applications.synchronized {
    --- End diff --
    
    I think the two tasks must never run concurrently. Consider this order:
    1. the check task reads applications
    2. the clean task reads applications
    3. the clean task computes its result and replaces applications
    4. the check task computes its result and replaces applications
    Then the clean task's result is overwritten by the check task's result.
    Using a ScheduledExecutorService with a single worker thread is a good way to solve it.
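
The four steps above can be replayed deterministically on a single thread to show the lost update. This is a sketch under stated assumptions: the immutable `Map` and the app ids are hypothetical stand-ins for the `applications` field, and the real tasks would interleave on two threads rather than in one method.

```scala
object LostUpdateSketch {
  // Hypothetical stand-in for the shared `applications` field: app id -> end time.
  @volatile var applications: Map[String, Long] = Map("old-app" -> 1L, "live-app" -> 50L)

  /** Replays the four steps in order and returns the final map. */
  def replay(): Map[String, Long] = {
    val checkSnapshot = applications                   // 1. check task reads applications
    val cleanSnapshot = applications                   // 2. clean task reads applications
    applications = cleanSnapshot - "old-app"           // 3. clean task drops the expired app
    applications = checkSnapshot + ("new-app" -> 99L)  // 4. check task publishes a stale merge
    applications
  }
}
```

After step 4 the expired entry is back in the map, because the check task built its result from a snapshot taken before the cleanup.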




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by viper-kun <gi...@git.apache.org>.
Github user viper-kun closed the pull request at:

    https://github.com/apache/spark/pull/2471




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r17943665
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -100,6 +132,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         checkForLogs()
         logCheckingThread.setDaemon(true)
         logCheckingThread.start()
    +
    +    //start cleaner thread if spark.history.fs.cleaner.enable is true
    --- End diff --
    
    nit: comment format. (please run the scalastyle target on your code)




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18737263
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -54,35 +66,57 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       // is already known.
       private var lastModifiedTime = -1L
     
    +  // A timestamp of when the disk was last accessed to check for event log to delete
    +  private var lastLogCleanTimeMs = -1L
    +
       // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
       // into the map in order, so the LinkedHashMap maintains the correct ordering.
       @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
         = new mutable.LinkedHashMap()
     
       /**
    -   * A background thread that periodically checks for event log updates on disk.
    -   *
    -   * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
    -   * time at which it performs the next log check to maintain the same period as before.
    +   * A background thread that periodically do something about event log.
        *
    -   * TODO: Add a mechanism to update manually.
    +   * If operateFun is invoked manually in the middle of a period, this thread re-adjusts the
    +   * time at which it does operateFun to maintain the same period as before.
        */
    -  private val logCheckingThread = new Thread("LogCheckingThread") {
    -    override def run() = Utils.logUncaughtExceptions {
    -      while (true) {
    -        val now = getMonotonicTimeMs()
    -        if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
    -          Thread.sleep(UPDATE_INTERVAL_MS)
    -        } else {
    -          // If the user has manually checked for logs recently, wait until
    -          // UPDATE_INTERVAL_MS after the last check time
    -          Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
    +  private def getThread(
    +      name: String,
    +      lastTimeMsFun: () => Long,
    --- End diff --
    
    You could manage this value inside the thread itself and avoid having the fields for each thread and the extra code to pass it to this method.
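
One way to read this suggestion, as a rough sketch rather than the code that was eventually merged: the loop keeps its last-run timestamp in a local variable, so no per-thread field or `lastTimeMsFun` parameter is needed. `periodicThread` and `monotonicMs` are hypothetical names, and this version drops the "manual check resets the timer" behaviour of the original thread.

```scala
object PeriodicThreadSketch {
  private def monotonicMs(): Long = System.nanoTime() / 1000000L

  /** Builds a daemon thread that calls `task` about every `intervalMs` until interrupted. */
  def periodicThread(name: String, intervalMs: Long)(task: () => Unit): Thread = {
    val thread = new Thread(name) {
      override def run(): Unit = {
        // Loop-local state replaces a per-thread field such as lastLogCleanTimeMs.
        var lastRunMs = monotonicMs()
        try {
          while (!isInterrupted) {
            val elapsedMs = monotonicMs() - lastRunMs
            if (elapsedMs < intervalMs) Thread.sleep(intervalMs - elapsedMs)
            lastRunMs = monotonicMs()
            task() // an exception thrown here ends the thread; real code would log it
          }
        } catch {
          case _: InterruptedException => () // interrupted while sleeping: stop quietly
        }
      }
    }
    thread.setDaemon(true)
    thread
  }
}
```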




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-59998653
  
    @viper-kun lgtm, but you'll need to get the attention of a committer. :-)




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-56600443
  
    I think there's a very unlikely race in your code: it's possible, if things are messed up just right, that the reader thread might try to read a log file that is being deleted by the cleaner thread. I believe that the code will handle that correctly, but it doesn't hurt to check.
    
    @mattf don't know what you mean by "functionality that is already provided by the system". I'm not aware of HDFS having any way to automatically do housekeeping of old files.
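
For the reader/cleaner race mentioned above, a reader can tolerate a file that disappears between listing and reading. The sketch below uses `java.nio.file` on a local path purely as a stand-in; the patch itself goes through Hadoop's `FileSystem`, where the exception type would differ, and `readIfPresent` is a hypothetical helper.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, NoSuchFileException, Path}

object VanishedLogSketch {
  /** Reads a log file, returning None if it was deleted after being listed. */
  def readIfPresent(log: Path): Option[String] = {
    try {
      Some(new String(Files.readAllBytes(log), StandardCharsets.UTF_8))
    } catch {
      // The cleaner removed the file between listing and reading: skip it.
      case _: NoSuchFileException => None
    }
  }
}
```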




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18737439
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -145,43 +186,48 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       private def checkForLogs() = {
         lastLogCheckTimeMs = getMonotonicTimeMs()
         logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
    -    try {
    -      val logStatus = fs.listStatus(new Path(resolvedLogDir))
    -      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
    +    var logInfos: Seq[FsApplicationHistoryInfo] = null
     
    -      // Load all new logs from the log directory. Only directories that have a modification time
    -      // later than the last known log directory will be loaded.
    -      var newLastModifiedTime = lastModifiedTime
    -      val logInfos = logDirs
    -        .filter { dir =>
    -          if (fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) {
    -            val modTime = getModificationTime(dir)
    -            newLastModifiedTime = math.max(newLastModifiedTime, modTime)
    -            modTime > lastModifiedTime
    -          } else {
    -            false
    +    // Load all new logs from the log directory. Only directories that have a modification time
    +    // later than the last known log directory will be loaded.
    +    var newLastModifiedTime = lastModifiedTime
    +    
    +    try {
    +      fs.synchronized {
    --- End diff --
    
    What's this synchronization trying to achieve?
    
    If there's really a need for the two tasks to never run concurrently, I'd say it's better to use a `ScheduledExecutorService` with a single worker thread.
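
A minimal sketch of the single-worker approach, using only `java.util.concurrent`. The object name, the thread name and the period parameters are assumptions made for illustration; the two function arguments stand in for `checkForLogs` and `cleanLogs`.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

object SingleWorkerSketch {
  /** One daemon worker thread: tasks on this pool run one at a time, never concurrently. */
  def newPool(name: String): ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, name)
        t.setDaemon(true)
        t
      }
    })

  /** Schedules both tasks on the same pool; the periods are hypothetical parameters. */
  def schedule(
      pool: ScheduledExecutorService,
      checkForLogs: () => Unit,
      cleanLogs: () => Unit,
      checkPeriodMs: Long,
      cleanPeriodMs: Long): Unit = {
    // Note: scheduleAtFixedRate cancels a task's later runs if one run throws,
    // so real tasks should catch and log their own exceptions.
    pool.scheduleAtFixedRate(new Runnable { def run(): Unit = checkForLogs() },
      0L, checkPeriodMs, TimeUnit.MILLISECONDS)
    pool.scheduleAtFixedRate(new Runnable { def run(): Unit = cleanLogs() },
      0L, cleanPeriodMs, TimeUnit.MILLISECONDS)
  }
}
```

With both tasks on one worker, no explicit `synchronized` block is needed to keep them from overlapping each other, although other threads that read `applications` still rely on the field being `@volatile`.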




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-57689992
  
    @mattf I understand what you're trying to say, but think about it in context. As I said above, the "when to poll the file system" code is the most trivial part of this change. The *only* advantage of using cron for that is that you'd have more scheduling options - e.g., absolute times instead of a period.
    
    To achieve that, you'd be considerably complicating everything else. You'd be creating a new command line tool in Spark, that needs to deal with command line arguments, be documented, and handle security settings (e.g. kerberos) - so it's more burden for everybody, maintainers of the code and admins alike.
    
    And all that for a trivial, and I'd say, not really needed gain in functionality.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by aw-altiscale <gi...@git.apache.org>.
Github user aw-altiscale commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-58933013
  
    *appears in a puff of smoke*
    
    Hi.  I did a very quick skim.  You might be interested in HDFS-6382.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18934502
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -60,29 +73,23 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         = new mutable.LinkedHashMap()
     
       /**
    -   * A background thread that periodically checks for event log updates on disk.
    -   *
    -   * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
    -   * time at which it performs the next log check to maintain the same period as before.
    -   *
    -   * TODO: Add a mechanism to update manually.
    +   * A background thread that periodically do something about event log.
        */
    -  private val logCheckingThread = new Thread("LogCheckingThread") {
    -    override def run() = Utils.logUncaughtExceptions {
    -      while (true) {
    -        val now = getMonotonicTimeMs()
    -        if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
    -          Thread.sleep(UPDATE_INTERVAL_MS)
    -        } else {
    -          // If the user has manually checked for logs recently, wait until
    -          // UPDATE_INTERVAL_MS after the last check time
    -          Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
    -        }
    -        checkForLogs()
    +  private def getThread(name: String, operateFun: () => Unit): Thread = {
    +    val thread = new Thread(name) {
    +      override def run() = Utils.logUncaughtExceptions {
    +        operateFun()
           }
         }
    +    thread
       }
     
    +  // A background thread that periodically checks for event log updates on disk.
    +  private val logCheckingThread = getThread("LogCheckingThread", checkForLogs)
    +
    +  // A background thread that periodically cleans event logs on disk.
    +  private val logCleaningThread = getThread("LogCleaningThread", cleanLogs)
    --- End diff --
    
    And this?




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18737534
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
               }
             }
     
    -        val newIterator = logInfos.iterator.buffered
    -        val oldIterator = applications.values.iterator.buffered
    -        while (newIterator.hasNext && oldIterator.hasNext) {
    -          if (newIterator.head.endTime > oldIterator.head.endTime) {
    -            addIfAbsent(newIterator.next)
    -          } else {
    -            addIfAbsent(oldIterator.next)
    +        applications.synchronized {
    +          val newIterator = logInfos.iterator.buffered
    +          val oldIterator = applications.values.iterator.buffered
    +          while (newIterator.hasNext && oldIterator.hasNext) {
    +            if (newIterator.head.endTime > oldIterator.head.endTime) {
    +              addIfAbsent(newIterator.next)
    +            } else {
    +              addIfAbsent(oldIterator.next)
    +            }
               }
    +          newIterator.foreach(addIfAbsent)
    +          oldIterator.foreach(addIfAbsent)
    +
    +          applications = newApps
             }
    -        newIterator.foreach(addIfAbsent)
    -        oldIterator.foreach(addIfAbsent)
    +      }
    +    } catch {
    +      case t: Throwable => logError("Exception in checking for event log updates", t)
    +    }
    +  }
    +
    +  /**
    +   *  Deleting apps if setting cleaner.
    +   */
    +  private def cleanLogs() = {
    +    lastLogCleanTimeMs = getMonotonicTimeMs()
    +    logDebug("Cleaning logs. Time is now %d.".format(lastLogCleanTimeMs))
    +    try {
    +      val logStatus = fs.listStatus(new Path(resolvedLogDir))
    +      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
    +      val maxAge = conf.getLong("spark.history.fs.maxAge.seconds",
    +        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
    +
    +      val now = System.currentTimeMillis()
    +      fs.synchronized {
    +        // scan all logs from the log directory.
    +        // Only directories older than this many seconds will be deleted .
    +        logDirs.foreach { dir =>
    +          // history file older than this many seconds will be deleted 
    +          // when the history cleaner runs.
    +          if (now - getModificationTime(dir) > maxAge) {
    +            fs.delete(dir.getPath, true)
    +          }
    +        }
    +      }
    +      
    +      val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
    +      def addIfNotExpire(info: FsApplicationHistoryInfo) = {
    +        if(now - info.lastUpdated <= maxAge) {
    +          newApps += (info.id -> info)
    --- End diff --
    
    So, there's the potential that `info.lastUpdated` may be out of sync with the file system. This could happen if someone messes with the timestamps of the files, for example, so that `info.lastUpdated` is recent but the files in the directory were actually cleaned up by the code above.
    
    In that case you'd have stale entries in the applications list and the user would see an error when clicking on the link.
    
    That's not a huge issue, since it's not the only way that can happen (as with all cases where you're polling things). I was thinking about some way to make `checkForLogs` deal with that instead, but not sure if that would be any simpler.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18107089
  
    --- Diff: docs/monitoring.md ---
    @@ -135,6 +135,29 @@ follows:
           <code>spark.ui.view.acls</code> when the application was run will also have authorization
           to view that application. 
           If disabled, no access control checks are made. 
    +    </td>  
    +  </tr>   
    +  <tr>
    +    <td>spark.history.fs.cleaner.enable</td>
    +    <td>false</td>
    +    <td>
    +      Specifies whether job history cleaner should check for files to delete.
    +    </td>
    +  </tr>
    +  <tr>
    +    <td>spark.history.fs.cleaner.interval-s</td>
    +    <td>86400</td>
    +    <td>
    +      How often the job history cleaner checks for files to delete, in seconds. Defaults to 864000 (one day).
    +      Files are only deleted if they are older than spark.history.fs.maxAge-s.
    +    </td>
    +  </tr>
    +  <tr>
    +    <td>spark.history.fs.maxAge-s</td>
    --- End diff --
    
    I think `maxAge.seconds` looks much nicer




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18737213
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -28,15 +28,27 @@ import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.scheduler._
     import org.apache.spark.ui.SparkUI
     import org.apache.spark.util.Utils
    +import java.util.concurrent.TimeUnit
    +import scala.concurrent.duration.Duration
    --- End diff --
    
    nit: should be up there with other scala imports




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18737575
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
               }
             }
     
    -        val newIterator = logInfos.iterator.buffered
    -        val oldIterator = applications.values.iterator.buffered
    -        while (newIterator.hasNext && oldIterator.hasNext) {
    -          if (newIterator.head.endTime > oldIterator.head.endTime) {
    -            addIfAbsent(newIterator.next)
    -          } else {
    -            addIfAbsent(oldIterator.next)
    +        applications.synchronized {
    +          val newIterator = logInfos.iterator.buffered
    +          val oldIterator = applications.values.iterator.buffered
    +          while (newIterator.hasNext && oldIterator.hasNext) {
    +            if (newIterator.head.endTime > oldIterator.head.endTime) {
    +              addIfAbsent(newIterator.next)
    +            } else {
    +              addIfAbsent(oldIterator.next)
    +            }
               }
    +          newIterator.foreach(addIfAbsent)
    +          oldIterator.foreach(addIfAbsent)
    +
    +          applications = newApps
             }
    -        newIterator.foreach(addIfAbsent)
    -        oldIterator.foreach(addIfAbsent)
    +      }
    +    } catch {
    +      case t: Throwable => logError("Exception in checking for event log updates", t)
    +    }
    +  }
    +
    +  /**
    +   *  Deleting apps if setting cleaner.
    --- End diff --
    
    nit: the comment is very cryptic.
    
    Try something like: "Delete event logs from the log directory according to the clean policy defined by the user."




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by mattf <gi...@git.apache.org>.
Github user mattf commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-57687603
  
    > Then what's your argument? Which is the system tool that this code is replacing?
    
    in this case you can separate the timer code and the function that does the rotation from spark. wrap the function w/ a main method and turn it into a spark-log-rotation-tool, use cron to execute it periodically.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-59231360
  
    @mattf Always in for a beer but unfortunately I'm not in NY...
    
    Also, you mention "rotating" a lot. This is not rotating. This is cleaning up, as in deleting existing logs. Having an external process do that for you complicates the History Server because now there's the possibility that it will serve stale data - links to jobs that have been cleaned up by that external process, and will result in an error in the UI. (Which is why I mentioned above that an external sweeper without inotify is a no-go.)
    
    Also, this is not a trace log. This is application history information. This is not your Linux syslog or Windows event log. This is very application-specific.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18737446
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
               }
             }
     
    -        val newIterator = logInfos.iterator.buffered
    -        val oldIterator = applications.values.iterator.buffered
    -        while (newIterator.hasNext && oldIterator.hasNext) {
    -          if (newIterator.head.endTime > oldIterator.head.endTime) {
    -            addIfAbsent(newIterator.next)
    -          } else {
    -            addIfAbsent(oldIterator.next)
    +        applications.synchronized {
    +          val newIterator = logInfos.iterator.buffered
    +          val oldIterator = applications.values.iterator.buffered
    +          while (newIterator.hasNext && oldIterator.hasNext) {
    +            if (newIterator.head.endTime > oldIterator.head.endTime) {
    +              addIfAbsent(newIterator.next)
    +            } else {
    +              addIfAbsent(oldIterator.next)
    +            }
               }
    +          newIterator.foreach(addIfAbsent)
    +          oldIterator.foreach(addIfAbsent)
    +
    +          applications = newApps
             }
    -        newIterator.foreach(addIfAbsent)
    -        oldIterator.foreach(addIfAbsent)
    +      }
    +    } catch {
    +      case t: Throwable => logError("Exception in checking for event log updates", t)
    --- End diff --
    
    Catching `Throwable` is generally frowned upon.
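
One common Scala alternative to `case t: Throwable` is `scala.util.control.NonFatal`, which matches ordinary exceptions but lets fatal errors such as `OutOfMemoryError` propagate. The following is a minimal sketch, not the code from this pull request; `SafeTask`, `runSafely`, and the `log` callback are hypothetical names, with `log` standing in for the provider's `logError`.

```scala
import scala.util.control.NonFatal

object SafeTask {
  // Runs `body`, reporting non-fatal failures through `log` instead of rethrowing.
  // Returns true if `body` completed normally, false if a non-fatal exception was logged.
  def runSafely(log: (String, Throwable) => Unit)(body: => Unit): Boolean =
    try {
      body
      true
    } catch {
      // NonFatal does not match VirtualMachineError, InterruptedException, etc.,
      // so those still propagate to the caller.
      case NonFatal(e) =>
        log("Exception in checking for event log updates", e)
        false
    }
}
```

Catching `Exception` explicitly is a simpler option with similar intent; which one fits better depends on whether the caller wants `InterruptedException` to be swallowed.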




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by viper-kun <gi...@git.apache.org>.
Github user viper-kun commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-71575458
  
    I have filed a new PR: #4214




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by viper-kun <gi...@git.apache.org>.
Github user viper-kun commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-58624839
  
    @mattf @vanzin Is this OK to go?




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by viper-kun <gi...@git.apache.org>.
Github user viper-kun commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-59038886
  
    In my opinion, Spark creates the event log data, so Spark should delete it. In Hadoop, event logs are deleted by the JobHistoryServer, not by the file system.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18106863
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -83,6 +96,25 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         }
       }
     
    +  /**
    +   * A background thread that periodically cleans event logs on disk.
    +   *
    +   * TODO: Add a mechanism to delete manually.
    +   */
    +  private val logCleaningThread = new Thread("LogCleaningThread") {
    +    override def run() = Utils.logUncaughtExceptions {
    +      while (true) {
    +        val now = System.currentTimeMillis()
    +        if (now - lastLogCleanTimeMs > CLEAN_INTERVAL_MS) {
    +          Thread.sleep(CLEAN_INTERVAL_MS)
    +        } else {
    +          Thread.sleep(lastLogCleanTimeMs + CLEAN_INTERVAL_MS - now)
    +        }
    +        cleanLogs()
    +      }
    +    }
    +  }
    +
    --- End diff --
    
    This looks a lot like `logCheckingThread`. Maybe it makes sense to make an abstract parent thread that both of these extend.
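
The shared parent suggested here could look roughly like the sketch below. It is an illustration only: `PeriodicThread` and `task` are hypothetical names, and the loop is simplified compared with the threads in the diff (it runs the task first and then sleeps, and it does not re-adjust the sleep after a manual check).

```scala
// Hypothetical base class: a daemon thread that runs `task` once per interval
// until it is interrupted.
abstract class PeriodicThread(name: String, intervalMs: Long) extends Thread(name) {
  setDaemon(true)

  // The periodic work, supplied by each subclass (e.g. checking or cleaning logs).
  protected def task(): Unit

  override def run(): Unit = {
    try {
      while (!Thread.currentThread().isInterrupted) {
        task()
        Thread.sleep(intervalMs)
      }
    } catch {
      // Interrupted while sleeping: leave the loop and let the thread end.
      case _: InterruptedException =>
    }
  }
}
```

With such a base class, the log-checking and log-cleaning threads would each reduce to a subclass that passes its own name and interval and overrides `task`.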




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by mattf <gi...@git.apache.org>.
Github user mattf commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-56284715
  
    i strongly suggest against duplicating functionality that is already provided by the system where these logs are written.
    
    however, if you proceed with this, the logic for triggering a clean needs to be improved. first, interrupts to sleep need to be considered. second, a clean should occur on startup as well as on the interval, otherwise it may never occur at all. third, if cleaning may be an expensive operation, it is more desirable to trigger it at a known off-peak / predictable time, instead of X seconds since startup (the default is every day, keyed off startup).
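
The first two points (handling interrupts, and cleaning on startup as well as on the interval) can be covered by a scheduled executor with an initial delay of zero. This is a hedged sketch using only `java.util.concurrent`; `CleanerSchedule`, `start`, and `clean` are hypothetical names, not part of the pull request.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

object CleanerSchedule {
  // Schedules `clean` to run immediately and then once every `intervalSeconds`.
  // The caller is responsible for shutting down the returned executor.
  def start(intervalSeconds: Long)(clean: () => Unit): ScheduledExecutorService = {
    val pool = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      // An exception escaping run() would suppress all later runs, so catch it here.
      override def run(): Unit =
        try clean() catch { case e: Exception => e.printStackTrace() }
    }
    // Initial delay 0 gives the "clean on startup" behaviour.
    pool.scheduleAtFixedRate(task, 0L, intervalSeconds, TimeUnit.SECONDS)
    pool
  }
}
```

For the third point, the initial delay could instead be computed as the time remaining until a chosen off-peak hour, so that runs land at a predictable time of day.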
    
    if there's special handling that should be done with these log files, i'd suggest a log clean utility that can be triggered by cron.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-71058558
  
    I'm not a committer so I can't merge the patch. But it has merge conflicts now, so that at least needs to be fixed.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18106912
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -34,10 +34,20 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     
       private val NOT_STARTED = "<Not Started>"
     
    +  //one day
    +  private val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = 1 * 24 * 60 * 60
    +
    +  //one week
    +  private val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = 7 * 24 * 60 * 60
    +
       // Interval between each check for event log updates
       private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
         conf.getInt("spark.history.updateInterval", 10)) * 1000
     
    +  // Interval between each cleaner checks for event logs to delete
    +  private val CLEAN_INTERVAL_MS = conf.getLong("spark.history.fs.cleaner.interval-s",
    --- End diff --
    
    can you make this consistent with the other config, i.e. `spark.history.fs.cleanInterval`? I think it's ok to leave the "seconds" part in the configuration and in the comments, otherwise the config would look jumbled.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-59231417
  
    @viper-kun haven't had a chance to look at the diff again, but it seems there are merge conflicts now.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18935441
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -46,8 +58,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       private val fs = Utils.getHadoopFileSystem(resolvedLogDir,
         SparkHadoopUtil.get.newConfiguration(conf))
     
    -  // A timestamp of when the disk was last accessed to check for log updates
    -  private var lastLogCheckTimeMs = -1L
    +  // The schedule thread pool size must be one,otherwise it will have concurrent issues about fs
    +  // and applications between check task and clean task..
    +  private val pool = Executors.newScheduledThreadPool(1)
    --- End diff --
    
    Ah, another thing: you should override `stop()` and shut down this executor cleanly (it's mostly a "best effort" thing, but still).
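
A best-effort shutdown of the kind requested here might look like the sketch below. `PoolShutdown` and `stopQuietly` are hypothetical names; how this would be wired into the provider's `stop()` is an assumption and is not shown in the diff.

```scala
import java.util.concurrent.{ExecutorService, TimeUnit}

object PoolShutdown {
  // Best-effort shutdown: stop accepting tasks, wait briefly, then interrupt stragglers.
  // Returns true if the pool terminated within the timeouts.
  def stopQuietly(pool: ExecutorService, timeoutMs: Long): Boolean = {
    pool.shutdown()
    try {
      if (pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
        true
      } else {
        // Tasks are still running: interrupt them and wait once more.
        pool.shutdownNow()
        pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)
      }
    } catch {
      case _: InterruptedException =>
        pool.shutdownNow()
        Thread.currentThread().interrupt() // preserve the interrupt for the caller
        false
    }
  }
}
```

An overridden `stop()` could then call `PoolShutdown.stopQuietly(pool, timeoutMs)` with whatever timeout suits the history server.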




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18934416
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -34,9 +36,19 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     
       private val NOT_STARTED = "<Not Started>"
     
    +  // One day
    +  private val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
    +
    +  // One week
    +  private val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
    +
       // Interval between each check for event log updates
    -  private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
    --- End diff --
    
    (Bonus for printing a warning log if the old setting is used.)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-58731904
  
    I saw the introduction of some synchronization in the code that doesn't seem to be serving any purpose... a better explanation for why it's needed would be nice. I'm pretty sure it is not needed, and whatever problem you're trying to work around should be done in a different way.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by viper-kun <gi...@git.apache.org>.
Github user viper-kun commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-60541215
  
    @vanzin @andrewor14 @srowen Is it OK to go?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18934400
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -34,9 +36,19 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     
       private val NOT_STARTED = "<Not Started>"
     
    +  // One day
    +  private val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
    +
    +  // One week
    +  private val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
    +
       // Interval between each check for event log updates
    -  private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
    --- End diff --
    
    You can't just remove the existing setting. You need to still read it for backwards compatibility.
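
One way to keep reading a renamed setting, and to warn when the old name is still in use, is a lookup that tries the new key first and then the deprecated ones. The sketch below is self-contained and uses a plain `Map[String, String]` as a stand-in for `SparkConf`; `ConfCompat`, `getWithFallback`, and the `warn` callback are hypothetical names, and the new key is passed in as a parameter because this excerpt does not show its final name.

```scala
object ConfCompat {
  // Looks up `newKey` first, then each deprecated key in order, calling `warn` when a
  // deprecated key supplies the value. Falls back to `default` if none is set.
  // `settings` is a stand-in for SparkConf (an assumption made for this sketch).
  def getWithFallback(
      settings: Map[String, String],
      newKey: String,
      deprecatedKeys: Seq[String],
      default: Long)(warn: String => Unit): Long = {
    settings.get(newKey).map(_.toLong).getOrElse {
      deprecatedKeys.find(settings.contains) match {
        case Some(old) =>
          warn(s"Setting '$old' is deprecated; use '$newKey' instead.")
          settings(old).toLong
        case None => default
      }
    }
  }
}
```

For the update interval, the deprecated keys would be `spark.history.fs.updateInterval` and `spark.history.updateInterval`, in that order, matching the nested `conf.getInt` calls in the original code.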


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18934524
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -210,7 +220,44 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
             applications = newApps
           }
         } catch {
    -      case t: Throwable => logError("Exception in checking for event log updates", t)
    +      case t: Exception => logError("Exception in checking for event log updates", t)
    +    }
    +  }
    +
    +  /**
    +   *  Delete event logs from the log directory according to the clean policy defined by the user.
    +   */
    +  private def cleanLogs() = {
    +    try {
    +      val logStatus = fs.listStatus(new Path(resolvedLogDir))
    +      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
    +      val maxAge = conf.getLong("spark.history.fs.maxAge.seconds",
    +        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
    +
    +      val now = System.currentTimeMillis()
    +
    +      // Scan all logs from the log directory.
    +      // Only directories older than now maxAge milliseconds mill will be deleted
    +      logDirs.foreach { dir =>
    +        if (now - getModificationTime(dir) > maxAge) {
    +          fs.delete(dir.getPath, true)
    --- End diff --
    
    You forgot to handle exceptions here (see my previous comments on the subject).
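    
    One way to handle exceptions per directory, sketched with `java.nio.file` as a stand-in for the Hadoop `FileSystem` used in the patch (an assumption made so the example runs without Hadoop). The object and method names are hypothetical. The point is that a failure on one entry is logged and the loop continues with the rest.

```scala
import java.io.IOException
import java.nio.file.{Files, Path}

object SafeCleaner {
  /**
   * Try to delete every path and return the ones that could not be deleted.
   * A failure on one path is logged and does not abort the loop.
   */
  def deleteAll(paths: Seq[Path]): Seq[Path] = {
    paths.flatMap { p =>
      val failed: Option[Path] =
        try {
          Files.deleteIfExists(p)
          None
        } catch {
          case e: IOException =>
            System.err.println(s"Failed to delete $p: $e")
            Some(p)
        }
      failed
    }
  }
}
```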




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18737466
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
               }
             }
     
    -        val newIterator = logInfos.iterator.buffered
    -        val oldIterator = applications.values.iterator.buffered
    -        while (newIterator.hasNext && oldIterator.hasNext) {
    -          if (newIterator.head.endTime > oldIterator.head.endTime) {
    -            addIfAbsent(newIterator.next)
    -          } else {
    -            addIfAbsent(oldIterator.next)
    +        applications.synchronized {
    +          val newIterator = logInfos.iterator.buffered
    +          val oldIterator = applications.values.iterator.buffered
    +          while (newIterator.hasNext && oldIterator.hasNext) {
    +            if (newIterator.head.endTime > oldIterator.head.endTime) {
    +              addIfAbsent(newIterator.next)
    +            } else {
    +              addIfAbsent(oldIterator.next)
    +            }
               }
    +          newIterator.foreach(addIfAbsent)
    +          oldIterator.foreach(addIfAbsent)
    +
    +          applications = newApps
             }
    -        newIterator.foreach(addIfAbsent)
    -        oldIterator.foreach(addIfAbsent)
    +      }
    +    } catch {
    +      case t: Throwable => logError("Exception in checking for event log updates", t)
    +    }
    +  }
    +
    +  /**
    +   *  Deleting apps if setting cleaner.
    +   */
    +  private def cleanLogs() = {
    +    lastLogCleanTimeMs = getMonotonicTimeMs()
    +    logDebug("Cleaning logs. Time is now %d.".format(lastLogCleanTimeMs))
    +    try {
    +      val logStatus = fs.listStatus(new Path(resolvedLogDir))
    +      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
    +      val maxAge = conf.getLong("spark.history.fs.maxAge.seconds",
    +        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
    +
    +      val now = System.currentTimeMillis()
    +      fs.synchronized {
    +        // scan all logs from the log directory.
    +        // Only directories older than this many seconds will be deleted .
    +        logDirs.foreach { dir =>
    +          // history file older than this many seconds will be deleted 
    +          // when the history cleaner runs.
    +          if (now - getModificationTime(dir) > maxAge) {
    +            fs.delete(dir.getPath, true)
    +          }
    +        }
    +      }
    +      
    +      val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
    +      def addIfNotExpire(info: FsApplicationHistoryInfo) = {
    +        if(now - info.lastUpdated <= maxAge) {
    --- End diff --
    
    nit: `if (`




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by mattf <gi...@git.apache.org>.
Github user mattf commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-57676552
  
    > Really, this is not an expensive process that will bring down the HDFS server
    
    i'm not concerned about bringing down HDFS. the operation run from spark or a log archiving tool should be of equal load on HDFS.
    
    > Finally, Spark theoretically supports Windows
    
    i'm concerned that spark becomes a repository of tools that are duplicated by one platform or another. i'm not much of a windows person, but i'd expect there are tools that fill the same function as cron and something-like-logrotate that are native to windows. those tools are likely familiar to windows admins just as cron&logrotate are for a linux admin.
    
    > AFAIK, logrotate doesn't work on HDFS.
    
    i don't think logrotate works on HDFS either




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18782362
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
               }
             }
     
    -        val newIterator = logInfos.iterator.buffered
    -        val oldIterator = applications.values.iterator.buffered
    -        while (newIterator.hasNext && oldIterator.hasNext) {
    -          if (newIterator.head.endTime > oldIterator.head.endTime) {
    -            addIfAbsent(newIterator.next)
    -          } else {
    -            addIfAbsent(oldIterator.next)
    +        applications.synchronized {
    --- End diff --
    
    You shouldn't ever need to synchronize on `applications` because it's a read-only variable. It's replaced atomically with a new list (which is why it's volatile) when there are changes. But synchronizing on it doesn't achieve anything.
    
    Your explanation doesn't cover a whole lot of other sources of races when two tasks are running concurrently and looking at the current status of the file system (and potentially modifying it). So yeah, having these tasks run single-threaded would be much simpler to reason about.
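    
    The publish-by-replacement pattern described above can be sketched as follows. The class name and the simplified value type (`Long` end times instead of `FsApplicationHistoryInfo`) are assumptions for illustration. The writer builds a fresh map and swaps the reference with one volatile write; readers never observe a half-built map, so no lock on `applications` is needed.

```scala
import scala.collection.mutable

class AppListing {
  // Readers only ever see a fully built map; the reference is swapped in a single write.
  @volatile private var applications: mutable.LinkedHashMap[String, Long] =
    new mutable.LinkedHashMap()

  /** Called by the single background task: build a new map, then publish it. */
  def publish(entries: Seq[(String, Long)]): Unit = {
    val newApps = new mutable.LinkedHashMap[String, Long]()
    entries.foreach { case (id, endTime) => newApps += (id -> endTime) }
    applications = newApps // one volatile write; the old map is never mutated
  }

  /** Called by any reader thread; reads the reference once and works on that snapshot. */
  def ids: Seq[String] = applications.keys.toSeq
}
```

    This only stays safe if a single task does the publishing, which is the argument for running the check and clean tasks on one thread.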




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18934425
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -46,8 +58,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       private val fs = Utils.getHadoopFileSystem(resolvedLogDir,
         SparkHadoopUtil.get.newConfiguration(conf))
     
    -  // A timestamp of when the disk was last accessed to check for log updates
    -  private var lastLogCheckTimeMs = -1L
    +  // The schedule thread pool size must be one,otherwise it will have concurrent issues about fs
    --- End diff --
    
    nit: space after comma




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by mattf <gi...@git.apache.org>.
Github user mattf commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-57629838
  
    > @mattf don't know what you mean by "functionality that is already provided by the system". I'm not aware of HDFS having any way to automatically do housekeeping of old files.
    
    a system approach means using something like logrotate or a cleaner process that's run from cron.
    
    such an approach is beneficial in a number of ways, including reducing the complexity of spark by not duplicating functionality that's already available in spark's environment - akin to using a standard library for i/o instead of interacting w/ devices directly. in this case the context for the environment is the system, where you'll find things like logrotate and cron readily available.
    
    as for rotating logs in hdfs - i wouldn't expect hdfs to provide such a feature, because the feature serves a specific use case on top of hdfs. some searching suggests that there are a few solutions available for doing rotation or pruning of files in hdfs and points out that rotating/pruning/cleaning/purging can be done remotely and independently from spark since hdfs is distributed.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by suyanNone <gi...@git.apache.org>.
Github user suyanNone commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-70964880
  
    Is this patch ok to merge?




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r17943541
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -34,10 +34,20 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     
       private val NOT_STARTED = "<Not Started>"
     
    +  //one day
    +  private val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = 1 * 24 * 60 * 60
    --- End diff --
    
    a.k.a.
    
        Duration(1, TimeUnit.DAYS).toSeconds()




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18934497
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -60,29 +73,23 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         = new mutable.LinkedHashMap()
     
       /**
    -   * A background thread that periodically checks for event log updates on disk.
    -   *
    -   * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
    -   * time at which it performs the next log check to maintain the same period as before.
    -   *
    -   * TODO: Add a mechanism to update manually.
    +   * A background thread that periodically do something about event log.
        */
    -  private val logCheckingThread = new Thread("LogCheckingThread") {
    -    override def run() = Utils.logUncaughtExceptions {
    -      while (true) {
    -        val now = getMonotonicTimeMs()
    -        if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
    -          Thread.sleep(UPDATE_INTERVAL_MS)
    -        } else {
    -          // If the user has manually checked for logs recently, wait until
    -          // UPDATE_INTERVAL_MS after the last check time
    -          Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
    -        }
    -        checkForLogs()
    +  private def getThread(name: String, operateFun: () => Unit): Thread = {
    +    val thread = new Thread(name) {
    +      override def run() = Utils.logUncaughtExceptions {
    +        operateFun()
           }
         }
    +    thread
       }
     
    +  // A background thread that periodically checks for event log updates on disk.
    +  private val logCheckingThread = getThread("LogCheckingThread", checkForLogs)
    --- End diff --
    
    Do you need to keep references to this?




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r17943751
  
    --- Diff: docs/monitoring.md ---
    @@ -135,6 +135,29 @@ follows:
           <code>spark.ui.view.acls</code> when the application was run will also have authorization
           to view that application. 
           If disabled, no access control checks are made. 
    +    </td>  
    +  </tr>   
    +  <tr>
    +    <td>spark.history.fs.cleaner.enable</td>
    +    <td>false</td>
    +    <td>
    +      Specifies whether job history cleaner should check for files to delete.
    --- End diff --
    
    "... whether the History Server should periodically clean up event logs from storage."




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18934480
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -46,8 +58,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       private val fs = Utils.getHadoopFileSystem(resolvedLogDir,
         SparkHadoopUtil.get.newConfiguration(conf))
     
    -  // A timestamp of when the disk was last accessed to check for log updates
    -  private var lastLogCheckTimeMs = -1L
    +  // The schedule thread pool size must be one,otherwise it will have concurrent issues about fs
    +  // and applications between check task and clean task..
    +  private val pool = Executors.newScheduledThreadPool(1)
    --- End diff --
    
    You should use a thread factory so that you can set the thread name and daemon status. See com.google.common.util.concurrent.ThreadFactoryBuilder.
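    
    A sketch of the idea using only the JDK: a hand-written `ThreadFactory` stands in for Guava's `ThreadFactoryBuilder` so the example has no external dependency. The object and method names are hypothetical.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory}

object HistoryThreads {
  /** Factory producing daemon threads with a fixed, recognizable name. */
  def namedDaemonFactory(name: String): ThreadFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, name)
      t.setDaemon(true) // daemon threads do not keep the JVM alive on shutdown
      t
    }
  }

  /** Single-threaded scheduler, so scheduled tasks never run concurrently. */
  def newScheduler(name: String): ScheduledExecutorService =
    Executors.newScheduledThreadPool(1, namedDaemonFactory(name))
}
```

    A named thread is easier to spot in thread dumps, and the daemon flag replaces the `setDaemon(true)` call that the patch still makes on `logCheckingThread`.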




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by viper-kun <gi...@git.apache.org>.
Github user viper-kun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18741084
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
               }
             }
     
    -        val newIterator = logInfos.iterator.buffered
    -        val oldIterator = applications.values.iterator.buffered
    -        while (newIterator.hasNext && oldIterator.hasNext) {
    -          if (newIterator.head.endTime > oldIterator.head.endTime) {
    -            addIfAbsent(newIterator.next)
    -          } else {
    -            addIfAbsent(oldIterator.next)
    +        applications.synchronized {
    +          val newIterator = logInfos.iterator.buffered
    +          val oldIterator = applications.values.iterator.buffered
    +          while (newIterator.hasNext && oldIterator.hasNext) {
    +            if (newIterator.head.endTime > oldIterator.head.endTime) {
    +              addIfAbsent(newIterator.next)
    +            } else {
    +              addIfAbsent(oldIterator.next)
    +            }
               }
    +          newIterator.foreach(addIfAbsent)
    +          oldIterator.foreach(addIfAbsent)
    +
    +          applications = newApps
             }
    -        newIterator.foreach(addIfAbsent)
    -        oldIterator.foreach(addIfAbsent)
    +      }
    +    } catch {
    +      case t: Throwable => logError("Exception in checking for event log updates", t)
    --- End diff --
    
    You mean we shouldn't catch Throwable? What should we do instead?
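    
    Another revision of the diff quoted in this thread catches `Exception` instead of `Throwable`. A related option from the Scala standard library is `scala.util.control.NonFatal`, sketched below. This is an illustration rather than what the reviewers asked for, and the helper name is hypothetical. It logs ordinary failures but lets fatal ones (for example `OutOfMemoryError` or `InterruptedException`) propagate.

```scala
import scala.util.control.NonFatal

object SafeRun {
  /** Run `body`; log and swallow ordinary failures, but let fatal errors propagate. */
  def logNonFatal(what: String)(body: => Unit): Boolean = {
    try {
      body
      true
    } catch {
      case NonFatal(e) =>
        System.err.println(s"Exception in $what: $e")
        false
    }
  }
}
```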




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18737559
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
               }
             }
     
    -        val newIterator = logInfos.iterator.buffered
    -        val oldIterator = applications.values.iterator.buffered
    -        while (newIterator.hasNext && oldIterator.hasNext) {
    -          if (newIterator.head.endTime > oldIterator.head.endTime) {
    -            addIfAbsent(newIterator.next)
    -          } else {
    -            addIfAbsent(oldIterator.next)
    +        applications.synchronized {
    +          val newIterator = logInfos.iterator.buffered
    +          val oldIterator = applications.values.iterator.buffered
    +          while (newIterator.hasNext && oldIterator.hasNext) {
    +            if (newIterator.head.endTime > oldIterator.head.endTime) {
    +              addIfAbsent(newIterator.next)
    +            } else {
    +              addIfAbsent(oldIterator.next)
    +            }
               }
    +          newIterator.foreach(addIfAbsent)
    +          oldIterator.foreach(addIfAbsent)
    +
    +          applications = newApps
             }
    -        newIterator.foreach(addIfAbsent)
    -        oldIterator.foreach(addIfAbsent)
    +      }
    +    } catch {
    +      case t: Throwable => logError("Exception in checking for event log updates", t)
    +    }
    +  }
    +
    +  /**
    +   *  Deleting apps if setting cleaner.
    +   */
    +  private def cleanLogs() = {
    +    lastLogCleanTimeMs = getMonotonicTimeMs()
    +    logDebug("Cleaning logs. Time is now %d.".format(lastLogCleanTimeMs))
    +    try {
    +      val logStatus = fs.listStatus(new Path(resolvedLogDir))
    +      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
    +      val maxAge = conf.getLong("spark.history.fs.maxAge.seconds",
    +        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
    +
    +      val now = System.currentTimeMillis()
    +      fs.synchronized {
    +        // scan all logs from the log directory.
    +        // Only directories older than this many seconds will be deleted .
    +        logDirs.foreach { dir =>
    +          // history file older than this many seconds will be deleted 
    --- End diff --
    
    This comment says the same thing as the comment above.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18737207
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -28,15 +28,27 @@ import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.scheduler._
     import org.apache.spark.ui.SparkUI
     import org.apache.spark.util.Utils
    +import java.util.concurrent.TimeUnit
    --- End diff --
    
    nit: should be up there with other java imports




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-63698395
  
    LGTM. Everybody else is kinda busy with releases so I doubt they'll look at this in the next several days...




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by mattf <gi...@git.apache.org>.
Github user mattf commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-58885115
  
    > @mattf I understand what you're trying to say, but think about it in context. As I said above, the "when to poll the file system" code is the most trivial part of this change. The only advantage of using cron for that is that you'd have more scheduling options - e.g., absolute times instead of a period.
    > 
    > To achieve that, you'd be considerably complicating everything else. You'd be creating a new command line tool in Spark, that needs to deal with command line arguments, be documented, and handle security settings (e.g. kerberos) - so it's more burden for everybody, maintainers of the code and admins alike.
    > 
    > And all that for a trivial, and I'd say, not really needed gain in functionality.
    
    @aw-altiscale pointed me to camus which has a nearly separable component: https://github.com/linkedin/camus/tree/master/camus-sweeper
    
    my objection to this is about the architecture and responsibilities of the spark components. i don't object to having the functionality.
    
    i think you should implement the ability to sweep/rotate/clean log files in hdfs, but not as part of a spark process.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18737282
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -54,35 +66,57 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       // is already known.
       private var lastModifiedTime = -1L
     
    +  // A timestamp of when the disk was last accessed to check for event log to delete
    +  private var lastLogCleanTimeMs = -1L
    +
       // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
       // into the map in order, so the LinkedHashMap maintains the correct ordering.
       @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
         = new mutable.LinkedHashMap()
     
       /**
    -   * A background thread that periodically checks for event log updates on disk.
    -   *
    -   * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
    -   * time at which it performs the next log check to maintain the same period as before.
    +   * A background thread that periodically do something about event log.
        *
    -   * TODO: Add a mechanism to update manually.
    +   * If operateFun is invoked manually in the middle of a period, this thread re-adjusts the
    +   * time at which it does operateFun to maintain the same period as before.
        */
    -  private val logCheckingThread = new Thread("LogCheckingThread") {
    -    override def run() = Utils.logUncaughtExceptions {
    -      while (true) {
    -        val now = getMonotonicTimeMs()
    -        if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
    -          Thread.sleep(UPDATE_INTERVAL_MS)
    -        } else {
    -          // If the user has manually checked for logs recently, wait until
    -          // UPDATE_INTERVAL_MS after the last check time
    -          Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
    +  private def getThread(
    --- End diff --
    
    I wonder if it isn't better to use a `ScheduledExecutorService` here.
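    
    A sketch of how a `ScheduledExecutorService` can replace the hand-rolled sleep loop. The helper below is hypothetical and exists only to make the example self-contained: it schedules a task at a fixed rate on a single thread and stops after a given number of runs.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object PeriodicDemo {
  /**
   * Run `task` every `periodMs` milliseconds on one scheduler thread.
   * Returns true if it ran `times` times before the timeout expired.
   */
  def runPeriodically(times: Int, periodMs: Long)(task: () => Unit): Boolean = {
    val pool = Executors.newSingleThreadScheduledExecutor()
    val remaining = new CountDownLatch(times)
    val runnable = new Runnable {
      override def run(): Unit = {
        task()
        remaining.countDown()
      }
    }
    try {
      // Initial delay 0: the first run happens immediately, then once per period.
      pool.scheduleAtFixedRate(runnable, 0L, periodMs, TimeUnit.MILLISECONDS)
      remaining.await(periodMs * times + 5000L, TimeUnit.MILLISECONDS)
    } finally {
      pool.shutdownNow()
    }
  }
}
```

    Note that `scheduleAtFixedRate` suppresses all later executions once a run throws, so the scheduled task itself has to catch and log its own exceptions.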




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18934514
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -97,9 +104,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
             "Logging directory specified is not a directory: %s".format(resolvedLogDir))
         }
     
    -    checkForLogs()
         logCheckingThread.setDaemon(true)
    -    logCheckingThread.start()
    +    pool.scheduleAtFixedRate(logCheckingThread, 0, UPDATE_INTERVAL_MS, TimeUnit.MILLISECONDS)
    +
    +    // Start cleaner thread if spark.history.fs.cleaner.enable is true
    --- End diff --
    
    Comment is sort of redundant... also, you're not starting a thread, you're scheduling a task.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r17943689
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -214,6 +252,32 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         }
       }
     
    +  /**
    +   *  Deleting apps if setting cleaner.
    +   */
    +  private def cleanLogs() = {
    +    lastLogCleanTimeMs = System.currentTimeMillis()
    +    logDebug("Cleaning logs. Time is now %d.".format(lastLogCleanTimeMs))
    +    try {
    +      val logStatus = fs.listStatus(new Path(resolvedLogDir))
    +      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
    +      val maxAge = conf.getLong("spark.history.fs.maxAge-s",
    +        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
    +
    +      // scan all logs from the log directory.
    +      // Only directories older than this many seconds will be deleted .
    +      logDirs.foreach { dir =>
    +        //history file older than this many seconds will be deleted when the history cleaner runs.
    --- End diff --
    
    nit: comment formatting




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-71702876
  
    @viper-kun could you close this one in that case? thanks!




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r17943561
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -34,10 +34,20 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     
       private val NOT_STARTED = "<Not Started>"
     
    +  //one day
    +  private val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = 1 * 24 * 60 * 60
    +
    +  //one week
    +  private val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = 7 * 24 * 60 * 60
    --- End diff --
    
    a.k.a.
    
        Duration(7, TimeUnit.DAYS).toSeconds()
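
For reference, a small sketch of both defaults written this way, assuming `scala.concurrent.duration.Duration` is the class meant. The object and value names are made up for illustration.

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration

object CleanerDefaultsSketch {
  // One day and one week, without hand-multiplied constants.
  val cleanerIntervalS: Long = Duration(1L, TimeUnit.DAYS).toSeconds
  val maxAgeS: Long = Duration(7L, TimeUnit.DAYS).toSeconds
}
```

The values come out the same as `1 * 24 * 60 * 60` and `7 * 24 * 60 * 60`, but the unit is stated once instead of being implied by the arithmetic.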




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by viper-kun <gi...@git.apache.org>.
Github user viper-kun commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-59164199
  
    @vanzin , is it ok to go?




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18934463
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -60,29 +73,23 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         = new mutable.LinkedHashMap()
     
       /**
    -   * A background thread that periodically checks for event log updates on disk.
    -   *
    -   * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
    -   * time at which it performs the next log check to maintain the same period as before.
    -   *
    -   * TODO: Add a mechanism to update manually.
    +   * A background thread that periodically do something about event log.
        */
    -  private val logCheckingThread = new Thread("LogCheckingThread") {
    -    override def run() = Utils.logUncaughtExceptions {
    -      while (true) {
    -        val now = getMonotonicTimeMs()
    -        if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
    -          Thread.sleep(UPDATE_INTERVAL_MS)
    -        } else {
    -          // If the user has manually checked for logs recently, wait until
    -          // UPDATE_INTERVAL_MS after the last check time
    -          Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
    -        }
    -        checkForLogs()
    +  private def getThread(name: String, operateFun: () => Unit): Thread = {
    --- End diff --
    
    This should return a Runnable, not a Thread, since you're submitting them to the executor. Thread works but is overkill here.
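
A rough sketch of a helper that returns a `Runnable`, assuming the same `operateFun` shape as in the diff. The name `getRunner` and the use of `System.err` in place of Spark's logging are assumptions made to keep the example self-contained.

```scala
object RunnableTaskSketch {
  // Wraps an operation in a Runnable and logs exceptions instead of
  // propagating them, so one failed run does not cancel a periodic schedule.
  def getRunner(operateFun: () => Unit): Runnable = new Runnable {
    override def run(): Unit = {
      try operateFun()
      catch { case e: Exception => System.err.println(s"Task failed: $e") }
    }
  }
}
```

The returned object can be handed directly to an executor's `scheduleAtFixedRate`; no `Thread` instance is created until the executor needs one.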




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18737405
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
               }
             }
     
    -        val newIterator = logInfos.iterator.buffered
    -        val oldIterator = applications.values.iterator.buffered
    -        while (newIterator.hasNext && oldIterator.hasNext) {
    -          if (newIterator.head.endTime > oldIterator.head.endTime) {
    -            addIfAbsent(newIterator.next)
    -          } else {
    -            addIfAbsent(oldIterator.next)
    +        applications.synchronized {
    --- End diff --
    
    There isn't a need to synchronize on `applications` (here or in the other method). The idea is that you create the new list and atomically replace the old one (note how it's `volatile`).
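
The pattern described above can be sketched as follows, assuming a single writer and a simplified map from application id to end time. `AppListSketch`, `snapshot`, and `merge` are illustrative names, not the provider's real members.

```scala
import scala.collection.mutable

object AppListSketch {
  // Readers see either the old map or the new one, never a half-built map.
  @volatile private var applications: mutable.LinkedHashMap[String, Long] =
    new mutable.LinkedHashMap[String, Long]()

  def snapshot: scala.collection.Map[String, Long] = applications

  // Build the replacement off to the side, then publish it with one write.
  def merge(updates: Seq[(String, Long)]): Unit = {
    val newApps = new mutable.LinkedHashMap[String, Long]()
    applications.foreach { case (id, endTime) => newApps += (id -> endTime) }
    updates.foreach { case (id, endTime) => newApps += (id -> endTime) }
    applications = newApps
  }
}
```

This is only safe without locks if a published map is never mutated afterwards and only one thread calls `merge`; with several writers, updates could be lost.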




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by viper-kun <gi...@git.apache.org>.
Github user viper-kun commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-59648648
  
    @vanzin. is it ok to go?




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18781725
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -214,6 +224,43 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         }
    --- End diff --
    
    Throwables are "catastrophic" errors, you shouldn't catch them. (IIRC they're also used as control for some Scala idioms, which also means you shouldn't catch them.)
    
    Catch `Exception` instead.
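
A small illustration of the control-flow point, using `scala.util.control.Breaks` as one example of an idiom that is implemented by throwing a non-`Exception` `Throwable`. The object and method names are hypothetical.

```scala
import scala.util.control.Breaks.{break, breakable}

object CatchSketch {
  // Counts items until the first negative one. `break()` works by throwing a
  // ControlThrowable; `case e: Exception` lets it pass through to `breakable`.
  def countUntilNegative(xs: Seq[Int]): Int = {
    var n = 0
    breakable {
      xs.foreach { x =>
        try {
          if (x < 0) break()
          n += 1
        } catch {
          case e: Exception => System.err.println(s"Ignoring: $e")
        }
      }
    }
    n
  }
}
```

If the handler were `case t: Throwable`, it would swallow the break signal and the loop would keep going past the negative item.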




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r19179055
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -210,7 +226,46 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
             applications = newApps
           }
         } catch {
    -      case t: Throwable => logError("Exception in checking for event log updates", t)
    +      case t: Exception => logError("Exception in checking for event log updates", t)
    +    }
    +  }
    +
    +  /**
    +   *  Delete event logs from the log directory according to the clean policy defined by the user.
    +   */
    +  private def cleanLogs() = {
    +    try {
    +      val logStatus = fs.listStatus(new Path(resolvedLogDir))
    +      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
    +      val maxAge = conf.getLong("spark.history.fs.maxAge.seconds",
    +        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
    +
    +      val now = System.currentTimeMillis()
    +      val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
    +      def addIfNotExpire(info: FsApplicationHistoryInfo) = {
    +        if (now - info.lastUpdated <= maxAge) {
    +          newApps += (info.id -> info)
    +        }
    +      }
    +
    +      val oldIterator = applications.values.iterator.buffered
    +      oldIterator.foreach(addIfNotExpire)
    +
    +      applications = newApps
    +
    +      // Scan all logs from the log directory.
    +      // Only directories older than now maxAge milliseconds mill will be deleted
    +      logDirs.foreach { dir =>
    +        try{
    +          if (now - getModificationTime(dir) > maxAge) {
    +            fs.delete(dir.getPath, true)
    +          }
    +        } catch {
    +          case t: IOException => logError("IOException in cleaning logs", t)
    --- End diff --
    
    nit: add `$dir` to the log message, in case it does not show up in the exception.
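
A hedged sketch of the idea, with the file system replaced by a caller-supplied `delete` function so the example runs on its own. `deleteAll` and the message wording are assumptions.

```scala
import java.io.IOException

object CleanLogMessageSketch {
  // Attempts each deletion independently and returns one message per failure,
  // naming the directory even if the exception text does not.
  def deleteAll(dirs: Seq[String])(delete: String => Unit): Seq[String] =
    dirs.flatMap { dir =>
      try {
        delete(dir)
        None
      } catch {
        case e: IOException =>
          Some(s"IOException in cleaning logs of $dir: ${e.getMessage}")
      }
    }
}
```

Keeping the `try` inside the loop also means that a failure on one directory does not stop the remaining directories from being cleaned.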




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18107160
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -34,10 +34,20 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     
       private val NOT_STARTED = "<Not Started>"
     
    +  //one day
    +  private val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = 1 * 24 * 60 * 60
    +
    +  //one week
    +  private val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = 7 * 24 * 60 * 60
    +
       // Interval between each check for event log updates
       private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
    --- End diff --
    
    Maybe we should make the naming of this config consistent by calling this `spark.history.fs.update.interval.ms` (and deprecate the old one)? We don't have to do that in this PR.
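
One way such a rename with deprecation might be handled, sketched against a plain `Map` rather than `SparkConf`. The lookup helper is hypothetical, and it deliberately returns the raw string because the quoted diff does not show the unit or default of the old key.

```scala
object ConfigKeySketch {
  val NewKey = "spark.history.fs.update.interval.ms" // name proposed in the comment above
  val OldKey = "spark.history.fs.updateInterval"

  // Prefer the new key; fall back to the old one with a warning; else default.
  def updateIntervalRaw(conf: Map[String, String], default: String): String =
    conf.get(NewKey).orElse {
      conf.get(OldKey).map { v =>
        System.err.println(s"$OldKey is deprecated; use $NewKey instead")
        v
      }
    }.getOrElse(default)
}
```

If the two keys use different units, the fallback branch is where the conversion would have to happen.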




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r17943824
  
    --- Diff: docs/monitoring.md ---
    @@ -135,6 +135,29 @@ follows:
           <code>spark.ui.view.acls</code> when the application was run will also have authorization
           to view that application. 
           If disabled, no access control checks are made. 
    +    </td>  
    +  </tr>   
    +  <tr>
    +    <td>spark.history.fs.cleaner.enable</td>
    +    <td>false</td>
    +    <td>
    +      Specifies whether job history cleaner should check for files to delete.
    +    </td>
    +  </tr>
    +  <tr>
    +    <td>spark.history.fs.cleaner.interval-s</td>
    +    <td>86400</td>
    +    <td>
    +      How often the job history cleaner checks for files to delete, in seconds. Defaults to 86400 (one day).
    --- End diff --
    
    Hmmm. Wonder if we should add a version of `Utils.memoryStringToMb` for time (so that you could write `1d` instead of that big number).
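
A possible shape for such a helper, as a sketch only. The name `timeStringToSeconds`, the accepted suffixes (`s`, `m`, `h`, `d`), and the rule that a bare number means seconds are all assumptions, not an existing Spark API.

```scala
object TimeStringSketch {
  private val Pattern = """(\d+)(s|m|h|d)?""".r

  // Converts strings such as "30s", "15m", "2h" or "1d" to seconds.
  // A bare number is taken to be seconds already.
  def timeStringToSeconds(str: String): Long = str.trim.toLowerCase match {
    case Pattern(num, unit) =>
      val factor = unit match {
        case "m" => 60L
        case "h" => 60L * 60
        case "d" => 24L * 60 * 60
        case _   => 1L // "s", or no suffix (the group is null when absent)
      }
      num.toLong * factor
    case _ =>
      throw new NumberFormatException(s"Invalid time string: $str")
  }
}
```

With something like this, the documented default could be written as `1d` while plain numeric values keep working.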




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by viper-kun <gi...@git.apache.org>.
Github user viper-kun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18740169
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
               }
             }
     
    -        val newIterator = logInfos.iterator.buffered
    -        val oldIterator = applications.values.iterator.buffered
    -        while (newIterator.hasNext && oldIterator.hasNext) {
    -          if (newIterator.head.endTime > oldIterator.head.endTime) {
    -            addIfAbsent(newIterator.next)
    -          } else {
    -            addIfAbsent(oldIterator.next)
    +        applications.synchronized {
    +          val newIterator = logInfos.iterator.buffered
    +          val oldIterator = applications.values.iterator.buffered
    +          while (newIterator.hasNext && oldIterator.hasNext) {
    +            if (newIterator.head.endTime > oldIterator.head.endTime) {
    +              addIfAbsent(newIterator.next)
    +            } else {
    +              addIfAbsent(oldIterator.next)
    +            }
               }
    +          newIterator.foreach(addIfAbsent)
    +          oldIterator.foreach(addIfAbsent)
    +
    +          applications = newApps
             }
    -        newIterator.foreach(addIfAbsent)
    -        oldIterator.foreach(addIfAbsent)
    +      }
    +    } catch {
    +      case t: Throwable => logError("Exception in checking for event log updates", t)
    +    }
    +  }
    +
    +  /**
    +   *  Deleting apps if setting cleaner.
    +   */
    +  private def cleanLogs() = {
    +    lastLogCleanTimeMs = getMonotonicTimeMs()
    +    logDebug("Cleaning logs. Time is now %d.".format(lastLogCleanTimeMs))
    +    try {
    +      val logStatus = fs.listStatus(new Path(resolvedLogDir))
    +      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
    +      val maxAge = conf.getLong("spark.history.fs.maxAge.seconds",
    +        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
    +
    +      val now = System.currentTimeMillis()
    +      fs.synchronized {
    +        // scan all logs from the log directory.
    +        // Only directories older than this many seconds will be deleted .
    +        logDirs.foreach { dir =>
    +          // history file older than this many seconds will be deleted 
    +          // when the history cleaner runs.
    +          if (now - getModificationTime(dir) > maxAge) {
    +            fs.delete(dir.getPath, true)
    --- End diff --
    
    Can you explain in detail why a try..catch should be added around fs.delete? I think the exception would already be caught by the outer try..catch (line 271).




[GitHub] spark pull request: Periodic cleanup event logs

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-56261050
  
    Can one of the admins verify this patch?




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by viper-kun <gi...@git.apache.org>.
Github user viper-kun commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-57083376
  
    Thanks for your comments, @vanzin @andrewor14. I have changed the code accordingly.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r20600235
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -46,8 +72,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       private val fs = Utils.getHadoopFileSystem(resolvedLogDir,
         SparkHadoopUtil.get.newConfiguration(conf))
     
    -  // A timestamp of when the disk was last accessed to check for log updates
    -  private var lastLogCheckTimeMs = -1L
    +  // The schedule thread pool size must be one, otherwise it will have concurrent issues about fs
    +  // and applications between check task and clean task..
    +  private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
    --- End diff --
    
    minor: you could use `Utils.namedThreadFactory()` here (just noticed that method the other day).
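
For readers unfamiliar with this kind of helper, here is a plain-JDK stand-in that produces named daemon threads. It is not the implementation of `Utils.namedThreadFactory()`; the name `namedDaemonThreadFactory` and the `<prefix>-<n>` naming scheme are assumptions.

```scala
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicInteger

object NamedThreadsSketch {
  // Daemon threads named "<prefix>-0", "<prefix>-1", ...
  def namedDaemonThreadFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val count = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${count.getAndIncrement()}")
      t.setDaemon(true)
      t
    }
  }
}
```

A factory like this can be passed as the second argument to `Executors.newScheduledThreadPool(1, factory)`, which keeps the pool at the single thread the code comment in the diff requires.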




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18934542
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -210,7 +220,44 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
             applications = newApps
           }
         } catch {
    -      case t: Throwable => logError("Exception in checking for event log updates", t)
    +      case t: Exception => logError("Exception in checking for event log updates", t)
    +    }
    +  }
    +
    +  /**
    +   *  Delete event logs from the log directory according to the clean policy defined by the user.
    +   */
    +  private def cleanLogs() = {
    +    try {
    +      val logStatus = fs.listStatus(new Path(resolvedLogDir))
    +      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
    +      val maxAge = conf.getLong("spark.history.fs.maxAge.seconds",
    +        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
    +
    +      val now = System.currentTimeMillis()
    +
    +      // Scan all logs from the log directory.
    +      // Only directories older than now maxAge milliseconds mill will be deleted
    +      logDirs.foreach { dir =>
    +        if (now - getModificationTime(dir) > maxAge) {
    +          fs.delete(dir.getPath, true)
    +        }
    +      }
    +
    +      val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
    +      def addIfNotExpire(info: FsApplicationHistoryInfo) = {
    +        if (now - info.lastUpdated <= maxAge) {
    +          newApps += (info.id -> info)
    +        }
    +      }
    +
    +      val oldIterator = applications.values.iterator.buffered
    +      oldIterator.foreach(addIfNotExpire)
    +
    +      applications = newApps
    --- End diff --
    
    I think you should update the app list before you clean up the file system. That way, you won't be serving links to dead logs (even if only for a short period).
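
The ordering suggested above can be sketched as follows, with a simplified `AppInfo` and a caller-supplied `delete` function standing in for the file system call. All names here are hypothetical.

```scala
object CleanOrderSketch {
  // Simplified stand-in for an application entry: id plus last-updated time.
  final case class AppInfo(id: String, lastUpdatedMs: Long)

  @volatile private var applications: Seq[AppInfo] = Seq.empty

  def listing: Seq[AppInfo] = applications
  def setListing(apps: Seq[AppInfo]): Unit = { applications = apps }

  // Step 1: publish the listing without expired apps.
  // Step 2: only then delete their logs, so no listed app points at a deleted log.
  def clean(nowMs: Long, maxAgeMs: Long)(delete: AppInfo => Unit): Unit = {
    val (live, expired) = applications.partition(a => nowMs - a.lastUpdatedMs <= maxAgeMs)
    applications = live
    expired.foreach(delete)
  }
}
```

Deriving the deletions from the expired entries, rather than from a separate directory scan, is a design choice of this sketch and differs from the patch, which lists the log directory itself.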




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by viper-kun <gi...@git.apache.org>.
Github user viper-kun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18763243
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -214,6 +224,43 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         }
    --- End diff --
    
    @vanzin sorry, I do not understand what you mean. Do you mean that I should not throw Throwable?




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-58924618
  
    > i think you should implement the ability to sweep/rotate/clean log files in hdfs
    
    Well, good luck with adding something like that to HDFS... that is not the responsibility of filesystems. It's great that there might be tools out there that do it, but, personally, you still haven't convinced me that a directory that is completely managed within Spark (the event log directory) shouldn't also be cleaned up by Spark.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-58934284
  
    inotify is certainly interesting for the HistoryServer, but kinda orthogonal to this. The other bug is more related to this one, and they're discussing exactly what I mentioned: that deleting things is not the responsibility of the fs. The applications that manage the files are the ones who have the most context about when it's safe to delete things and how to do so.
    
    This case is a simplified one in that "when" doesn't matter much, as long as the list of things that exist is kept up-to-date in the application. But the point remains that the fs seems like the wrong place for that; and having a sweeper-like feature in HDFS without inotify would be kinda bad for applications like the HistoryServer.
    
    But I digress. Even if HDFS-6382 is implemented, it would only come in a new version of HDFS, and Spark needs to work with existing versions...




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18737461
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
               }
             }
     
    -        val newIterator = logInfos.iterator.buffered
    -        val oldIterator = applications.values.iterator.buffered
    -        while (newIterator.hasNext && oldIterator.hasNext) {
    -          if (newIterator.head.endTime > oldIterator.head.endTime) {
    -            addIfAbsent(newIterator.next)
    -          } else {
    -            addIfAbsent(oldIterator.next)
    +        applications.synchronized {
    +          val newIterator = logInfos.iterator.buffered
    +          val oldIterator = applications.values.iterator.buffered
    +          while (newIterator.hasNext && oldIterator.hasNext) {
    +            if (newIterator.head.endTime > oldIterator.head.endTime) {
    +              addIfAbsent(newIterator.next)
    +            } else {
    +              addIfAbsent(oldIterator.next)
    +            }
               }
    +          newIterator.foreach(addIfAbsent)
    +          oldIterator.foreach(addIfAbsent)
    +
    +          applications = newApps
             }
    -        newIterator.foreach(addIfAbsent)
    -        oldIterator.foreach(addIfAbsent)
    +      }
    +    } catch {
    +      case t: Throwable => logError("Exception in checking for event log updates", t)
    +    }
    +  }
    +
    +  /**
    +   *  Deleting apps if setting cleaner.
    +   */
    +  private def cleanLogs() = {
    +    lastLogCleanTimeMs = getMonotonicTimeMs()
    +    logDebug("Cleaning logs. Time is now %d.".format(lastLogCleanTimeMs))
    +    try {
    +      val logStatus = fs.listStatus(new Path(resolvedLogDir))
    +      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
    +      val maxAge = conf.getLong("spark.history.fs.maxAge.seconds",
    +        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
    +
    +      val now = System.currentTimeMillis()
    +      fs.synchronized {
    +        // scan all logs from the log directory.
    +        // Only directories older than this many seconds will be deleted .
    --- End diff --
    
    nit: what does `this` refer to? Seems like the comment refers to L274.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18737552
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
               }
             }
     
    -        val newIterator = logInfos.iterator.buffered
    -        val oldIterator = applications.values.iterator.buffered
    -        while (newIterator.hasNext && oldIterator.hasNext) {
    -          if (newIterator.head.endTime > oldIterator.head.endTime) {
    -            addIfAbsent(newIterator.next)
    -          } else {
    -            addIfAbsent(oldIterator.next)
    +        applications.synchronized {
    +          val newIterator = logInfos.iterator.buffered
    +          val oldIterator = applications.values.iterator.buffered
    +          while (newIterator.hasNext && oldIterator.hasNext) {
    +            if (newIterator.head.endTime > oldIterator.head.endTime) {
    +              addIfAbsent(newIterator.next)
    +            } else {
    +              addIfAbsent(oldIterator.next)
    +            }
               }
    +          newIterator.foreach(addIfAbsent)
    +          oldIterator.foreach(addIfAbsent)
    +
    +          applications = newApps
             }
    -        newIterator.foreach(addIfAbsent)
    -        oldIterator.foreach(addIfAbsent)
    +      }
    +    } catch {
    +      case t: Throwable => logError("Exception in checking for event log updates", t)
    +    }
    +  }
    +
    +  /**
    +   *  Deleting apps if setting cleaner.
    +   */
    +  private def cleanLogs() = {
    +    lastLogCleanTimeMs = getMonotonicTimeMs()
    +    logDebug("Cleaning logs. Time is now %d.".format(lastLogCleanTimeMs))
    +    try {
    +      val logStatus = fs.listStatus(new Path(resolvedLogDir))
    +      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
    +      val maxAge = conf.getLong("spark.history.fs.maxAge.seconds",
    +        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
    +
    +      val now = System.currentTimeMillis()
    +      fs.synchronized {
    +        // scan all logs from the log directory.
    +        // Only directories older than this many seconds will be deleted .
    +        logDirs.foreach { dir =>
    +          // history file older than this many seconds will be deleted 
    +          // when the history cleaner runs.
    +          if (now - getModificationTime(dir) > maxAge) {
    +            fs.delete(dir.getPath, true)
    --- End diff --
    
    I might have suggested this before... but I think there should be a `try..catch` here so that you try other directories if there is an issue deleting one (e.g. if it was created with the wrong permissions).
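
A minimal sketch of the suggestion above, not the PR's actual code: the `try..catch` sits inside the loop, so a failed delete is logged and the remaining directories are still processed. `LogDir`, the parameters of `cleanLogs`, and the `delete` callback are hypothetical stand-ins for `FileStatus` and `fs.delete(dir.getPath, true)`.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object PerDirectoryCleanupSketch {
  // Hypothetical stand-in for a directory entry; not a Hadoop type.
  final case class LogDir(path: String, modificationTimeMs: Long)

  /**
   * Tries to delete every directory older than maxAgeMs and returns the
   * paths whose deletion failed. `delete` is a hypothetical callback that
   * stands in for fs.delete(dir.getPath, true).
   */
  def cleanLogs(dirs: Seq[LogDir], nowMs: Long, maxAgeMs: Long)
               (delete: String => Unit): List[String] = {
    val failed = ArrayBuffer.empty[String]
    val expired = dirs.filter(d => nowMs - d.modificationTimeMs > maxAgeMs)
    expired.foreach { dir =>
      // The try..catch is per directory, so one failure does not end the loop.
      try {
        delete(dir.path)
      } catch {
        case NonFatal(e) =>
          Console.err.println(s"Failed to delete ${dir.path}: ${e.getMessage}")
          failed += dir.path
      }
    }
    failed.toList
  }
}
```

Returning the failed paths is a choice made for this sketch only; it lets a caller decide whether to retry or report them.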




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r19179186
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -29,14 +31,36 @@ import org.apache.spark.scheduler._
     import org.apache.spark.ui.SparkUI
     import org.apache.spark.util.Utils
     
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +
     private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
       with Logging {
     
       private val NOT_STARTED = "<Not Started>"
     
    +  // One day
    +  private val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
    +
    +  // One week
    +  private val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
    +
    +  private def warnUpdateInterval(value: String): String = {
    +    logWarning("Using spark.history.fs.updateInterval to set interval " +
    +      "between each check for event log updates is deprecated, " +
    +      "please use spark.history.fs.update.interval.seconds instead.")
    +    value
    +  }
    +
       // Interval between each check for event log updates
    -  private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
    -    conf.getInt("spark.history.updateInterval", 10)) * 1000
    +  private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds")
    +    .orElse(conf.getOption("spark.history.fs.updateInterval").map(warnUpdateInterval))
    +    .orElse(conf.getOption("spark.history.updateInterval"))
    --- End diff --
    
    nit: could warn here too.
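
One way to read this nit, sketched under assumptions: every deprecated key goes through the same warning path, and the warning fires only for the key that actually supplies the value. A plain `Map` stands in for `SparkConf`, and `updateIntervalSeconds`, `warn`, and `defaultSeconds` are hypothetical names, not part of the PR.

```scala
object DeprecatedConfigSketch {
  private val PreferredKey = "spark.history.fs.update.interval.seconds"

  // Deprecated keys in the priority order used by the diff above.
  private val DeprecatedKeys = Seq(
    "spark.history.fs.updateInterval",
    "spark.history.updateInterval")

  /**
   * Resolves the update interval in seconds. `conf` is a plain Map standing
   * in for SparkConf; `warn` is a hypothetical callback standing in for
   * logWarning.
   */
  def updateIntervalSeconds(conf: Map[String, String], defaultSeconds: Int)
                           (warn: String => Unit): Int = {
    conf.get(PreferredKey)
      .orElse(DeprecatedKeys.collectFirst {
        // Only reached when the preferred key is absent; warns for the
        // first deprecated key that is set, whichever one it is.
        case key if conf.contains(key) =>
          warn(s"$key is deprecated, please use $PreferredKey instead.")
          conf(key)
      })
      .map(_.toInt)
      .getOrElse(defaultSeconds)
  }
}
```

Keeping the deprecated keys in one list means a future rename only needs a new entry rather than another `orElse` branch.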




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by suyanNone <gi...@git.apache.org>.
Github user suyanNone commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-71309691
  
     @vanzin  = =! I got it,  sigh~




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18106938
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -214,6 +252,32 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         }
       }
     
    +  /**
    +   *  Deleting apps if setting cleaner.
    +   */
    +  private def cleanLogs() = {
    --- End diff --
    
    cleanLogs(): Unit =
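
A small illustration of why the explicit `: Unit` can matter, as a sketch with hypothetical names: without a declared type, the method's return type is inferred from its last expression, so an implementation detail can leak into the signature.

```scala
object ReturnTypeSketch {
  // Hypothetical stand-in for a delete call that returns a Boolean.
  private def delete(path: String): Boolean = path.nonEmpty

  // No declared type: the compiler infers Boolean from the last expression.
  def cleanInferred() = delete("/tmp/old-log")

  // Declared Unit: the Boolean is discarded, and the signature states that
  // the method is called only for its side effects.
  def cleanExplicit(): Unit = {
    delete("/tmp/old-log")
  }
}
```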




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r17943624
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -100,6 +132,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         checkForLogs()
         logCheckingThread.setDaemon(true)
         logCheckingThread.start()
    +
    +    //start cleaner thread if spark.history.fs.cleaner.enable is true
    --- End diff --
    
    scalastyle would probably complain about this, but in any case: needs a space after `//`, should start with capital letter.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r17943630
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -100,6 +132,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         checkForLogs()
         logCheckingThread.setDaemon(true)
         logCheckingThread.start()
    +
    +    //start cleaner thread if spark.history.fs.cleaner.enable is true
    +    if(conf.getBoolean("spark.history.fs.cleaner.enable", false)) {
    --- End diff --
    
    nit: `if (`




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by mattf <gi...@git.apache.org>.
Github user mattf commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-59191797
  
    > Well, good luck with adding something like that to HDFS... that is not the responsibility of filesystems.
    
    just so we're on the same page, i'm not advocating adding this functionality to HDFS. i believe it should be separate functionality that doesn't live in a spark process, as it's an operational activity.
    
    
    
    > a directory that is completely managed within Spark (the event log directory) shouldn't also be cleaned up by Spark
    
    we can agree to disagree. you can view a trace log as allocating some amount of disk space that you then have to manage, similar to memory allocations in a program. however, doing that management is more involved than periodically rotating.
    
    if you're in nyc for strata we should grab a beer and debate the finer points of resource management.




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2471#discussion_r18782185
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
               }
             }
     
    -        val newIterator = logInfos.iterator.buffered
    -        val oldIterator = applications.values.iterator.buffered
    -        while (newIterator.hasNext && oldIterator.hasNext) {
    -          if (newIterator.head.endTime > oldIterator.head.endTime) {
    -            addIfAbsent(newIterator.next)
    -          } else {
    -            addIfAbsent(oldIterator.next)
    +        applications.synchronized {
    +          val newIterator = logInfos.iterator.buffered
    +          val oldIterator = applications.values.iterator.buffered
    +          while (newIterator.hasNext && oldIterator.hasNext) {
    +            if (newIterator.head.endTime > oldIterator.head.endTime) {
    +              addIfAbsent(newIterator.next)
    +            } else {
    +              addIfAbsent(oldIterator.next)
    +            }
               }
    +          newIterator.foreach(addIfAbsent)
    +          oldIterator.foreach(addIfAbsent)
    +
    +          applications = newApps
             }
    -        newIterator.foreach(addIfAbsent)
    -        oldIterator.foreach(addIfAbsent)
    +      }
    +    } catch {
    +      case t: Throwable => logError("Exception in checking for event log updates", t)
    +    }
    +  }
    +
    +  /**
    +   *  Deleting apps if setting cleaner.
    +   */
    +  private def cleanLogs() = {
    +    lastLogCleanTimeMs = getMonotonicTimeMs()
    +    logDebug("Cleaning logs. Time is now %d.".format(lastLogCleanTimeMs))
    +    try {
    +      val logStatus = fs.listStatus(new Path(resolvedLogDir))
    +      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
    +      val maxAge = conf.getLong("spark.history.fs.maxAge.seconds",
    +        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
    +
    +      val now = System.currentTimeMillis()
    +      fs.synchronized {
    +        // scan all logs from the log directory.
    +        // Only directories older than this many seconds will be deleted .
    +        logDirs.foreach { dir =>
    +          // history file older than this many seconds will be deleted 
    +          // when the history cleaner runs.
    +          if (now - getModificationTime(dir) > maxAge) {
    +            fs.delete(dir.getPath, true)
    --- End diff --
    
    Because the existing `try..catch` means that if you fail to delete a directory, you'll stop trying to delete others. So if a directory in the middle of the list has wrong permissions, you'll never clean up any directory that is more recent than it is (well, depending on the ordering HDFS returns the file list).




[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2471#issuecomment-57678742
  
    > i don't think logrotate works on HDFS either
    
    Then what's your argument? Which is the system tool that this code is replacing?

