You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/12/01 00:18:29 UTC

spark git commit: [SPARK-18655][SS] Ignore Structured Streaming 2.0.2 logs in history server

Repository: spark
Updated Branches:
  refs/heads/master 93e9d880b -> c4979f6ea


[SPARK-18655][SS] Ignore Structured Streaming 2.0.2 logs in history server

## What changes were proposed in this pull request?

As `queryStatus` in StreamingQueryListener events was removed in #15954, parsing 2.0.2 structured streaming logs will throw the following errror:

```
[info]   com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "queryStatus" (class org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent), not marked as ignorable (2 known properties: "id", "exception"])
[info]  at [Source: {"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent","queryStatus":{"name":"query-1","id":1,"timestamp":1480491532753,"inputRate":0.0,"processingRate":0.0,"latency":null,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0","inputRate":0.0,"processingRate":0.0,"triggerDetails":{"latency.getOffset.source":"1","triggerId":"1"}}],"sinkStatus":{"description":"FileSink[/Users/zsx/stream2]","offsetDesc":"[#0]"},"triggerDetails":{}},"exception":null}; line: 1, column: 521] (through reference chain: org.apache.spark.sql.streaming.QueryTerminatedEvent["queryStatus"])
[info]   at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:51)
[info]   at com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:839)
[info]   at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1045)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1352)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1306)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:453)
[info]   at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1099)
...
```

This PR just ignores such errors and adds a test to make sure we can read 2.0.2 logs.

## How was this patch tested?

`query-event-logs-version-2.0.2.txt` has all types of events generated by Structured Streaming in Spark 2.0.2. `testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.2")` verified we can load them without any error.

Author: Shixiong Zhu <sh...@databricks.com>

Closes #16085 from zsxwing/SPARK-18655.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4979f6e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4979f6e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4979f6e

Branch: refs/heads/master
Commit: c4979f6ea8ed44fd87ded3133efa6df39d4842c3
Parents: 93e9d88
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Nov 30 16:18:53 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Nov 30 16:18:53 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/ReplayListenerBus.scala | 7 +++++++
 .../structured-streaming/query-event-logs-version-2.0.2.txt  | 5 +++++
 .../spark/sql/streaming/StreamingQueryListenerSuite.scala    | 8 ++++++++
 3 files changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c4979f6e/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 0bd5a6b..08e05ae 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -22,6 +22,7 @@ import java.io.{InputStream, IOException}
 import scala.io.Source
 
 import com.fasterxml.jackson.core.JsonParseException
+import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.internal.Logging
@@ -87,6 +88,12 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
             // Ignore events generated by Structured Streaming in Spark 2.0.0 and 2.0.1.
             // It's safe since no place uses them.
             logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
+          case e: UnrecognizedPropertyException if e.getMessage != null && e.getMessage.startsWith(
+            "Unrecognized field \"queryStatus\" " +
+              "(class org.apache.spark.sql.streaming.StreamingQueryListener$") =>
+            // Ignore events generated by Structured Streaming in Spark 2.0.2
+            // It's safe since no place uses them.
+            logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
           case jpe: JsonParseException =>
             // We can only ignore exception from last line of the file that might be truncated
             // the last entry may not be the very last line in the event log, but we treat it

http://git-wip-us.apache.org/repos/asf/spark/blob/c4979f6e/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.2.txt
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.2.txt b/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.2.txt
new file mode 100644
index 0000000..57c44c8
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.2.txt
@@ -0,0 +1,5 @@
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryStartedEvent","queryStatus":{"name":"query-1","id":1,"timestamp":1480491481350,"inputRate":0.0,"processingRate":0.0,"latency":null,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"-","inputRate":0.0,"processingRate":0.0,"triggerDetails":{}}],"sinkStatus":{"description":"FileSink[/Users/zsx/stream2]","offsetDesc":"[-]"},"triggerDetails":{}}}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","queryStatus":{"name":"query-1","id":1,"timestamp":1480491493386,"inputRate":83.33333333333333,"processingRate":0.5773672055427251,"latency":1738.0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0","inputRate":83.33333333333333,"processingRate":0.5773672055427251,"triggerDetails":{"latency.getBatch.source":"39","numRows.input.source":"1","latency.getOffset.source":"91","triggerId":"0"}}],"sinkStatus":{"description":"FileSink[/Users/zsx/stream2]","offsetDesc":"[#0]"},"triggerDetails":{"timestamp.afterGetBatch":"1480491491817","latency.offsetLogWrite":"26","timestamp.triggerStart":"1480491491653","triggerId":"0","timestamp.triggerFinish":"1480491493385","latency.fullTrigger":"1732","latency.getBatch.total":"44","timestamp.afterGetOffset":"1480491491772","numRows.input.total":"1","isTriggerActive":"false","latency.optimizer":"406","latency.getOffset.total":"91
 ","isDataPresentInTrigger":"true"}}}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent","queryStatus":{"name":"query-1","id":1,"timestamp":1480491532753,"inputRate":0.0,"processingRate":0.0,"latency":null,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0","inputRate":0.0,"processingRate":0.0,"triggerDetails":{"latency.getOffset.source":"1","triggerId":"1"}}],"sinkStatus":{"description":"FileSink[/Users/zsx/stream2]","offsetDesc":"[#0]"},"triggerDetails":{}},"exception":null}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent","queryStatus":{"name":"query-0","id":0,"timestamp":1480491812530,"inputRate":0.0,"processingRate":0.0,"latency":null,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0","inputRate":0.0,"processingRate":0.0,"triggerDetails":{"latency.getBatch.source":"25","latency.getOffset.source":"65","triggerId":"0"}}],"sinkStatus":{"description":"FileSink[/Users/zsx/stream2]","offsetDesc":"[-]"},"triggerDetails":{}},"exception":"org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.apache.spark.SparkException: Task failed while writing rows.\n\tat org.apache.spark.sql.execution.streaming.FileStreamSinkWriter.writePartitionToSingleFile(FileStreamSink.scala:183)\n\tat org.apache.spark.sql.execution.streaming.FileStreamSinkWriter$$anonfun$write$1.apply(Fi
 leStreamSink.scala:155)\n\tat org.apache.spark.sql.execution.streaming.FileStreamSinkWriter$$anonfun$write$1.apply(FileStreamSink.scala:153)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: java.lang.ArithmeticException: / by zero\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.ap
 ache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)\n\tat org.apache.spark.sql.execution.streaming.FileStreamSinkWriter.writePartitionToSingleFile(FileStreamSink.scala:172)\n\t... 8 more\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGScheduler$$a
 nonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1906)\n\tat org.apache.spark.sql.execution.streaming.FileStreamSinkWriter.write(FileStreamSink.scala:151)\n\tat org.apache.spark.sql.execution.streaming.FileStreamSi
 nk.addBatch(FileStreamSink.scala:70)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:437)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcZ$sp(StreamExecution.scala:225)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun
 $org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:212)\n\tat org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:208)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:142)\nCaused by: org.apache.spark.SparkException: Task failed while writing rows.\n\tat org.apache.spark.sql.execution.streaming.FileStreamSinkWriter.writePartitionToSingleFile(FileStreamSink.scala:183)\n\tat org.apache.spark.sql.execution.streaming.FileStreamSinkWriter$$anonfun$write$1.apply(FileStreamSink.scala:155)\n\tat org.apache.spark.sql.execution.streaming.FileStreamSinkWriter$$anonfun$write$1.apply(FileStreamSink.scala:153)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apach
 e.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: java.lang.ArithmeticException: / by zero\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)\n\tat org.apache.spark.sql.execution.streaming.FileStreamSinkWriter.writePartitionToSingleFile(FileStreamSink.scala:172)\n
 \t... 8 more\n"}
+{"Event":"SparkListenerApplicationEnd","Timestamp":1480491541552}

http://git-wip-us.apache.org/repos/asf/spark/blob/c4979f6e/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 08b93e7..07a13a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -207,6 +207,14 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt")
   }
 
+  testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.2") {
+    // query-event-logs-version-2.0.2.txt has all types of events generated by
+    // Structured Streaming in Spark 2.0.2.
+    // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
+    // to verify that we can skip broken jsons generated by Structured Streaming.
+    testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt")
+  }
+
   private def testReplayListenerBusWithBorkenEventJsons(fileName: String): Unit = {
     val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName")
     val events = mutable.ArrayBuffer[SparkListenerEvent]()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org