You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/05/16 13:50:22 UTC

spark git commit: [SPARK-6197][CORE] handle json exception when history file not finished writing

Repository: spark
Updated Branches:
  refs/heads/branch-1.3 d618df273 -> 91442fdfc


[SPARK-6197][CORE] handle json exception when history file not finished writing

For details, please refer to [SPARK-6197](https://issues.apache.org/jira/browse/SPARK-6197)

Author: Zhang, Liye <li...@intel.com>

Closes #4927 from liyezhang556520/jsonParseError and squashes the following commits:

5cbdc82 [Zhang, Liye] without unnecessary wrap
2b48831 [Zhang, Liye] small changes with sean owen's comments
2973024 [Zhang, Liye] handle json exception when file not finished writing


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/91442fdf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/91442fdf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/91442fdf

Branch: refs/heads/branch-1.3
Commit: 91442fdfc76b192e9f4347ef3125be3461e49048
Parents: d618df2
Author: Zhang, Liye <li...@intel.com>
Authored: Fri Mar 13 13:59:54 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Sat May 16 12:49:55 2015 +0100

----------------------------------------------------------------------
 .../org/apache/spark/deploy/master/Master.scala |  3 ++-
 .../spark/scheduler/ReplayListenerBus.scala     | 25 ++++++++++++++++----
 2 files changed, 23 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/91442fdf/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index ef6181e..9e508f6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -760,8 +760,9 @@ private[spark] class Master(
       val replayBus = new ReplayListenerBus()
       val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
         appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
+      val maybeTruncated = eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS)
       try {
-        replayBus.replay(logInput, eventLogFile)
+        replayBus.replay(logInput, eventLogFile, maybeTruncated)
       } finally {
         logInput.close()
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/91442fdf/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 95273c7..86f357a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -21,6 +21,7 @@ import java.io.{InputStream, IOException}
 
 import scala.io.Source
 
+import com.fasterxml.jackson.core.JsonParseException
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.Logging
@@ -40,15 +41,31 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
    *
    * @param logData Stream containing event log data.
    * @param sourceName Filename (or other source identifier) from whence @logData is being read
+   * @param maybeTruncated Indicates whether the log file might be truncated (in some abnormal
+   *        situations the log file might not have finished being written)
    */
-  def replay(logData: InputStream, sourceName: String): Unit = {
+  def replay(
+      logData: InputStream,
+      sourceName: String,
+      maybeTruncated: Boolean = false): Unit = {
     var currentLine: String = null
     var lineNumber: Int = 1
     try {
       val lines = Source.fromInputStream(logData).getLines()
-      lines.foreach { line =>
-        currentLine = line
-        postToAll(JsonProtocol.sparkEventFromJson(parse(line)))
+      while (lines.hasNext) {
+        currentLine = lines.next()
+        try {
+          postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
+        } catch {
+          case jpe: JsonParseException =>
+            // We can only ignore exception from last line of the file that might be truncated
+            if (!maybeTruncated || lines.hasNext) {
+              throw jpe
+            } else {
+              logWarning(s"Got JsonParseException from log file $sourceName" + 
+                s" at line $lineNumber, the file might not have finished writing cleanly.")
+            }
+        }
         lineNumber += 1
       }
     } catch {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org