Posted to commits@spark.apache.org by rx...@apache.org on 2016/10/31 23:03:47 UTC
spark git commit: [SPARK-18143][SQL] Ignore Structured Streaming event logs to avoid breaking history server (branch 2.0)
Repository: spark
Updated Branches:
refs/heads/branch-2.0 9f924747d -> 300d596a5
[SPARK-18143][SQL] Ignore Structured Streaming event logs to avoid breaking history server (branch 2.0)
## What changes were proposed in this pull request?
Backports #15663 to branch-2.0, fixing conflicts in `ReplayListenerBus`.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <sh...@databricks.com>
Closes #15695 from zsxwing/fix-event-log-2.0.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/300d596a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/300d596a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/300d596a
Branch: refs/heads/branch-2.0
Commit: 300d596a5177ae372194f73f717174b7ff7acd36
Parents: 9f92474
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Mon Oct 31 16:03:44 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Oct 31 16:03:44 2016 -0700
----------------------------------------------------------------------
.../spark/scheduler/ReplayListenerBus.scala | 17 ++++++++
.../query-event-logs-version-2.0.0.txt | 4 ++
.../query-event-logs-version-2.0.1.txt | 4 ++
.../streaming/StreamingQueryListenerSuite.scala | 42 ++++++++++++++++++++
4 files changed, 67 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/300d596a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index d32f5eb..c65e7a2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.core.JsonParseException
import org.json4s.jackson.JsonMethods._
import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ReplayListenerBus._
import org.apache.spark.util.JsonProtocol
/**
@@ -57,6 +58,10 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
try {
postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
} catch {
+ case e: ClassNotFoundException if KNOWN_REMOVED_CLASSES.contains(e.getMessage) =>
+ // Ignore events generated by Structured Streaming in Spark 2.0.0 and 2.0.1.
+ // This is safe because nothing reads these events anymore.
+ logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
case jpe: JsonParseException =>
// We can only ignore exception from last line of the file that might be truncated
if (!maybeTruncated || lines.hasNext) {
@@ -78,3 +83,15 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
}
}
+
+private[spark] object ReplayListenerBus {
+
+ /**
+ * Classes that were removed. Structured Streaming no longer uses them. However, parsing
+ * old JSON may still fail to load these classes, and such failures can safely be ignored.
+ */
+ val KNOWN_REMOVED_CLASSES = Set(
+ "org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress",
+ "org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"
+ )
+}
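
For context, the guard above hinges on a JVM detail: when class loading fails for one of these removed event classes, the resulting ClassNotFoundException carries the fully qualified class name as its message, which is why a plain Set lookup on e.getMessage suffices. A minimal standalone sketch (not part of the patch; the object name is illustrative):

object RemovedClassCheckSketch {
  // Mirrors KNOWN_REMOVED_CLASSES from the patch above.
  val knownRemovedClasses = Set(
    "org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress",
    "org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated")

  def main(args: Array[String]): Unit = {
    try {
      // On a Spark 2.0.2+ classpath this class no longer exists.
      Class.forName("org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress")
    } catch {
      case e: ClassNotFoundException =>
        // The exception message is exactly the missing class name,
        // so the same Set lookup used in ReplayListenerBus matches.
        assert(knownRemovedClasses.contains(e.getMessage))
        println(s"Would drop event from removed class: ${e.getMessage}")
    }
  }
}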
http://git-wip-us.apache.org/repos/asf/spark/blob/300d596a/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.0.txt
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.0.txt b/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.0.txt
new file mode 100644
index 0000000..aa7e9a8
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.0.txt
@@ -0,0 +1,4 @@
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@2b85b3a5","offsetDesc":"[#0]"}}}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@2b85b3a5","offsetDesc":"[#0]"}},"exception":null,"stackTrace":[]}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@514502dc","offsetDesc":"[-]"}},"exception":"Query hello terminated with exception: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ArithmeticException: / by zero\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$an
on$1.hasNext(WholeStageCodegenExec.scala:370)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:85)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.jav
a:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:","stackTrace":[{"methodName":"org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches","fileName":"StreamExecution.scala","lineNumber":208,"className":"org.apache.spark.sql.execution.streaming.StreamExecution","nativeMethod":false},{"methodName":"run","fileName":"StreamExecution.scala","lineNumber":120,"className":"org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1","nativeMethod":false}]}
+{"Event":"SparkListenerApplicationEnd","Timestamp":1477593059313}
http://git-wip-us.apache.org/repos/asf/spark/blob/300d596a/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.1.txt
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.1.txt b/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.1.txt
new file mode 100644
index 0000000..646cf10
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.1.txt
@@ -0,0 +1,4 @@
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@10e5ec94","offsetDesc":"[#0]"}}}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@10e5ec94","offsetDesc":"[#0]"}},"exception":null}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@70c61dc8","offsetDesc":"[-]"}},"exception":"org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ArithmeticException: / by zero\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.h
asNext(WholeStageCodegenExec.scala:370)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)\n\t
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.sc
ala:358)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:911)\n\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)\n\tat org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)\n\tat org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)\n\tat org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)\n\tat org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2197)\n\tat org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2197)\n\tat org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559)\n\tat org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2197)\n\tat org.apache.spark.sql.Dataset.collect(Dataset.scala:2173)\n\tat org.apache.spark.sql.execut
ion.streaming.MemorySink.addBatch(memory.scala:154)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:366)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:197)\n\tat org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:187)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:124)\nCaused by: java.lang.ArithmeticException: / by zero\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat org.apache.spar
k.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(T
ask.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n"}
+{"Event":"SparkListenerApplicationEnd","Timestamp":1477701734609}
http://git-wip-us.apache.org/repos/asf/spark/blob/300d596a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index ff84386..fad24bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -17,11 +17,14 @@
package org.apache.spark.sql.streaming
+import scala.collection.mutable
+
import org.scalactic.TolerantNumerics
import org.scalatest.BeforeAndAfter
import org.scalatest.PrivateMethodTester._
import org.apache.spark.SparkException
+import org.apache.spark.scheduler._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions._
@@ -206,6 +209,45 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(queryQueryTerminated.exception === newQueryTerminated.exception)
}
+ test("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
+ // query-event-logs-version-2.0.0.txt has all types of events generated by
+ // Structured Streaming in Spark 2.0.0.
+ // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
+ // to verify that we can skip broken jsons generated by Structured Streaming.
+ testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.0.txt")
+ }
+
+ test("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") {
+ // query-event-logs-version-2.0.1.txt has all types of events generated by
+ // Structured Streaming in Spark 2.0.1.
+ // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
+ // to verify that we can skip broken jsons generated by Structured Streaming.
+ testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.1.txt")
+ }
+
+ private def testReplayListenerBusWithBrokenEventJsons(fileName: String): Unit = {
+ val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName")
+ val events = mutable.ArrayBuffer[SparkListenerEvent]()
+ try {
+ val replayer = new ReplayListenerBus() {
+ // Redirect all parsed events to `events`
+ override def doPostEvent(
+ listener: SparkListenerInterface,
+ event: SparkListenerEvent): Unit = {
+ events += event
+ }
+ }
+ // Add a dummy listener so that "doPostEvent" will be called.
+ replayer.addListener(new SparkListener {})
+ replayer.replay(input, fileName)
+ // SparkListenerApplicationEnd is the only valid event
+ assert(events.size === 1)
+ assert(events(0).isInstanceOf[SparkListenerApplicationEnd])
+ } finally {
+ input.close()
+ }
+ }
+
private def assertStreamingQueryInfoEquals(
expected: StreamingQueryStatus,
actual: StreamingQueryStatus): Unit = {
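
Beyond the suite above, the same behavior can be exercised directly against one of the checked-in logs. A rough sketch under two stated assumptions: it runs from a Spark source checkout (the relative path below), and it is compiled inside the org.apache.spark.scheduler package, since ReplayListenerBus is private[spark]:

package org.apache.spark.scheduler

import java.io.FileInputStream

object ReplayBrokenLogSketch {
  def main(args: Array[String]): Unit = {
    val path = "sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.0.txt"
    val in = new FileInputStream(path)
    try {
      val bus = new ReplayListenerBus()
      bus.addListener(new SparkListener {
        override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit =
          println(s"Replayed SparkListenerApplicationEnd at ${end.time}")
      })
      // With this patch, the broken Structured Streaming events are dropped
      // with a warning; only the final SparkListenerApplicationEnd is posted.
      bus.replay(in, path)
    } finally {
      in.close()
    }
  }
}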