You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/11/24 14:07:48 UTC
spark git commit: [SPARK-22559][CORE] history server: handle
exception on opening corrupted listing.ldb
Repository: spark
Updated Branches:
refs/heads/master 554adc77d -> 449e26ecd
[SPARK-22559][CORE] history server: handle exception on opening corrupted listing.ldb
## What changes were proposed in this pull request?
Currently, history server v2 fails to start if `listing.ldb` is corrupted.
This patch gets rid of the corrupted `listing.ldb` and re-creates it.
The exception handling follows the same approach used when [opening disk store for app](https://github.com/apache/spark/blob/0ffa7c488fa8156e2a1aa282e60b7c36b86d8af8/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L307).
## How was this patch tested?
manual test
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Wang Gengliang <lt...@gmail.com>
Closes #19786 from gengliangwang/listingException.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/449e26ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/449e26ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/449e26ec
Branch: refs/heads/master
Commit: 449e26ecdc891039198c26ece99454a2e76d5455
Parents: 554adc7
Author: Wang Gengliang <lt...@gmail.com>
Authored: Fri Nov 24 15:07:43 2017 +0100
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Nov 24 15:07:43 2017 +0100
----------------------------------------------------------------------
.../apache/spark/deploy/history/FsHistoryProvider.scala | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/449e26ec/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 25f82b5..69ccde3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -34,10 +34,10 @@ import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.hdfs.protocol.HdfsConstants
import org.apache.hadoop.security.AccessControlException
+import org.fusesource.leveldbjni.internal.NativeDB
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.history.config._
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.ReplayListenerBus._
@@ -132,7 +132,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
AppStatusStore.CURRENT_VERSION, logDir.toString())
try {
- open(new File(path, "listing.ldb"), metadata)
+ open(dbPath, metadata)
} catch {
// If there's an error, remove the listing database and any existing UI database
// from the store directory, since it's extremely likely that they'll all contain
@@ -140,7 +140,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
case _: UnsupportedStoreVersionException | _: MetadataMismatchException =>
logInfo("Detected incompatible DB versions, deleting...")
path.listFiles().foreach(Utils.deleteRecursively)
- open(new File(path, "listing.ldb"), metadata)
+ open(dbPath, metadata)
+ case dbExc: NativeDB.DBException =>
+ // Get rid of the corrupted listing.ldb and re-create it.
+ logWarning(s"Failed to load disk store $dbPath :", dbExc)
+ Utils.deleteRecursively(dbPath)
+ open(dbPath, metadata)
}
}.getOrElse(new InMemoryStore())
@@ -568,7 +573,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
val logPath = fileStatus.getPath()
- logInfo(s"Replaying log path: $logPath")
val bus = new ReplayListenerBus()
val listener = new AppListingListener(fileStatus, clock)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org