Posted to commits@spark.apache.org by we...@apache.org on 2017/11/24 14:07:48 UTC

spark git commit: [SPARK-22559][CORE] history server: handle exception on opening corrupted listing.ldb

Repository: spark
Updated Branches:
  refs/heads/master 554adc77d -> 449e26ecd


[SPARK-22559][CORE] history server: handle exception on opening corrupted listing.ldb

## What changes were proposed in this pull request?
Currently the history server v2 fails to start if `listing.ldb` is corrupted.
This patch gets rid of the corrupted `listing.ldb` and re-creates it.
The exception handling follows the existing handling for [opening the disk store for an app](https://github.com/apache/spark/blob/0ffa7c488fa8156e2a1aa282e60b7c36b86d8af8/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L307)
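
In outline, the recovery path added here is: catch the LevelDB open failure, delete the corrupted store, and open a fresh one. A minimal sketch of the new `catch` clause (`open`, `dbPath` and `metadata` are the existing names in `FsHistoryProvider`; the full change is in the diff below):

```scala
try {
  open(dbPath, metadata)
} catch {
  case dbExc: NativeDB.DBException =>
    // The on-disk listing DB is corrupted: log it, delete the directory and
    // re-create an empty store; it will be repopulated from the event logs.
    logWarning(s"Failed to load disk store $dbPath :", dbExc)
    Utils.deleteRecursively(dbPath)
    open(dbPath, metadata)
}
```
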
## How was this patch tested?
manual test
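
One way to exercise this by hand (a hypothetical sketch, not from the patch: the store directory below and the choice of corrupting LevelDB's `CURRENT` file are assumptions) is to garble the listing DB under the history server's local store path, restart the server, and check that it logs the warning and re-creates `listing.ldb` instead of failing to start:

```scala
import java.nio.file.{Files, Paths}

// Assumes the history server was started with spark.history.store.path pointing
// at this directory; adjust to the actual configuration.
val storeDir = Paths.get("/tmp/spark-history-store")

// Overwrite LevelDB's CURRENT file inside listing.ldb with garbage so the next
// open() is expected to throw NativeDB.DBException.
val current = storeDir.resolve("listing.ldb").resolve("CURRENT")
Files.write(current, "not a valid manifest pointer".getBytes("UTF-8"))
```
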

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Wang Gengliang <lt...@gmail.com>

Closes #19786 from gengliangwang/listingException.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/449e26ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/449e26ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/449e26ec

Branch: refs/heads/master
Commit: 449e26ecdc891039198c26ece99454a2e76d5455
Parents: 554adc7
Author: Wang Gengliang <lt...@gmail.com>
Authored: Fri Nov 24 15:07:43 2017 +0100
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Nov 24 15:07:43 2017 +0100

----------------------------------------------------------------------
 .../apache/spark/deploy/history/FsHistoryProvider.scala | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/449e26ec/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 25f82b5..69ccde3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -34,10 +34,10 @@ import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
 import org.apache.hadoop.security.AccessControlException
+import org.fusesource.leveldbjni.internal.NativeDB
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.history.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ReplayListenerBus._
@@ -132,7 +132,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       AppStatusStore.CURRENT_VERSION, logDir.toString())
 
     try {
-      open(new File(path, "listing.ldb"), metadata)
+      open(dbPath, metadata)
     } catch {
       // If there's an error, remove the listing database and any existing UI database
       // from the store directory, since it's extremely likely that they'll all contain
@@ -140,7 +140,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       case _: UnsupportedStoreVersionException | _: MetadataMismatchException =>
         logInfo("Detected incompatible DB versions, deleting...")
         path.listFiles().foreach(Utils.deleteRecursively)
-        open(new File(path, "listing.ldb"), metadata)
+        open(dbPath, metadata)
+      case dbExc: NativeDB.DBException =>
+        // Get rid of the corrupted listing.ldb and re-create it.
+        logWarning(s"Failed to load disk store $dbPath :", dbExc)
+        Utils.deleteRecursively(dbPath)
+        open(dbPath, metadata)
     }
   }.getOrElse(new InMemoryStore())
 
@@ -568,7 +573,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
 
     val logPath = fileStatus.getPath()
-    logInfo(s"Replaying log path: $logPath")
 
     val bus = new ReplayListenerBus()
     val listener = new AppListingListener(fileStatus, clock)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org