You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/05/16 13:50:22 UTC
spark git commit: [SPARK-6197][CORE] handle json exception when
history file not finished writing
Repository: spark
Updated Branches:
refs/heads/branch-1.3 d618df273 -> 91442fdfc
[SPARK-6197][CORE] handle json exception when history file not finished writing
For details, please refer to [SPARK-6197](https://issues.apache.org/jira/browse/SPARK-6197)
Author: Zhang, Liye <li...@intel.com>
Closes #4927 from liyezhang556520/jsonParseError and squashes the following commits:
5cbdc82 [Zhang, Liye] without unnecessary wrap
2b48831 [Zhang, Liye] small changes with sean owen's comments
2973024 [Zhang, Liye] handle json exception when file not finished writing
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/91442fdf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/91442fdf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/91442fdf
Branch: refs/heads/branch-1.3
Commit: 91442fdfc76b192e9f4347ef3125be3461e49048
Parents: d618df2
Author: Zhang, Liye <li...@intel.com>
Authored: Fri Mar 13 13:59:54 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Sat May 16 12:49:55 2015 +0100
----------------------------------------------------------------------
.../org/apache/spark/deploy/master/Master.scala | 3 ++-
.../spark/scheduler/ReplayListenerBus.scala | 25 ++++++++++++++++----
2 files changed, 23 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/91442fdf/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index ef6181e..9e508f6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -760,8 +760,9 @@ private[spark] class Master(
val replayBus = new ReplayListenerBus()
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
+ val maybeTruncated = eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS)
try {
- replayBus.replay(logInput, eventLogFile)
+ replayBus.replay(logInput, eventLogFile, maybeTruncated)
} finally {
logInput.close()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/91442fdf/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 95273c7..86f357a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -21,6 +21,7 @@ import java.io.{InputStream, IOException}
import scala.io.Source
+import com.fasterxml.jackson.core.JsonParseException
import org.json4s.jackson.JsonMethods._
import org.apache.spark.Logging
@@ -40,15 +41,31 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
*
* @param logData Stream containing event log data.
* @param sourceName Filename (or other source identifier) from whence @logData is being read
+ * @param maybeTruncated Indicate whether log file might be truncated (some abnormal situations
+ * encountered, log file might not finished writing) or not
*/
- def replay(logData: InputStream, sourceName: String): Unit = {
+ def replay(
+ logData: InputStream,
+ sourceName: String,
+ maybeTruncated: Boolean = false): Unit = {
var currentLine: String = null
var lineNumber: Int = 1
try {
val lines = Source.fromInputStream(logData).getLines()
- lines.foreach { line =>
- currentLine = line
- postToAll(JsonProtocol.sparkEventFromJson(parse(line)))
+ while (lines.hasNext) {
+ currentLine = lines.next()
+ try {
+ postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
+ } catch {
+ case jpe: JsonParseException =>
+ // We can only ignore exception from last line of the file that might be truncated
+ if (!maybeTruncated || lines.hasNext) {
+ throw jpe
+ } else {
+ logWarning(s"Got JsonParseException from log file $sourceName" +
+ s" at line $lineNumber, the file might not have finished writing cleanly.")
+ }
+ }
lineNumber += 1
}
} catch {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org