You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ir...@apache.org on 2019/09/05 20:55:41 UTC

[spark] branch master updated: [SPARK-28770][CORE][TEST] Fix ReplayListenerSuite tests that sometimes fail

This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 151b954  [SPARK-28770][CORE][TEST] Fix ReplayListenerSuite tests that sometimes fail
151b954 is described below

commit 151b954e52c66f78a72530ab69f38100101f6cb7
Author: Wing Yew Poon <wy...@cloudera.com>
AuthorDate: Thu Sep 5 15:55:22 2019 -0500

    [SPARK-28770][CORE][TEST] Fix ReplayListenerSuite tests that sometimes fail
    
    ### What changes were proposed in this pull request?
    
    `ReplayListenerSuite` depends on a listener class to listen for replayed events. This class was implemented by extending `EventLoggingListener`. `EventLoggingListener` does not log executor metrics update events, but uses them to update internal state; on a stage completion event, it then logs stage executor metrics events using this internal state. As executor metrics update events do not get written to the event log, they do not get replayed. The internal state of the replay listene [...]
    
    We reimplement the replay listener to simply buffer each and every event it receives. This makes it a simpler yet better tool for verifying the events that get sent through the ReplayListenerBus.
    
    ### Why are the changes needed?
    
    As explained above, tests sometimes fail because the `EventLoggingListener` receives events that are not logged (and thus are not replayed) but that influence other events that are logged.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests.
    
    Closes #25673 from wypoon/SPARK-28770.
    
    Authored-by: Wing Yew Poon <wy...@cloudera.com>
    Signed-off-by: Imran Rashid <ir...@cloudera.com>
---
 .../spark/scheduler/ReplayListenerSuite.scala      | 36 ++++++++++------------
 1 file changed, 16 insertions(+), 20 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index e796137..d65b5cb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -21,12 +21,14 @@ import java.io._
 import java.net.URI
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.hadoop.fs.Path
 import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec}
 import org.apache.spark.util.{JsonProtocol, JsonProtocolSuite, Utils}
@@ -62,7 +64,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
 
     val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
     val logData = fileSystem.open(logFilePath)
-    val eventMonster = new EventMonster(conf)
+    val eventMonster = new EventBufferingListener
     try {
       val replayer = new ReplayListenerBus()
       replayer.addListener(eventMonster)
@@ -108,7 +110,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
     val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
     val replayer = new ReplayListenerBus()
 
-    val eventMonster = new EventMonster(conf)
+    val eventMonster = new EventBufferingListener
     replayer.addListener(eventMonster)
 
     // Verify the replay returns the events given the input maybe truncated.
@@ -145,7 +147,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
 
     val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
     val logData = fileSystem.open(logFilePath)
-    val eventMonster = new EventMonster(conf)
+    val eventMonster = new EventBufferingListener
     try {
       val replayer = new ReplayListenerBus()
       replayer.addListener(eventMonster)
@@ -207,7 +209,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
 
     // Replay events
     val logData = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem)
-    val eventMonster = new EventMonster(conf)
+    val eventMonster = new EventBufferingListener
     try {
       val replayer = new ReplayListenerBus()
       replayer.addListener(eventMonster)
@@ -219,11 +221,11 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
     // Verify the same events are replayed in the same order
     assert(sc.eventLogger.isDefined)
     val originalEvents = sc.eventLogger.get.loggedEvents
+      .map(JsonProtocol.sparkEventFromJson(_))
     val replayedEvents = eventMonster.loggedEvents
+      .map(JsonProtocol.sparkEventFromJson(_))
     originalEvents.zip(replayedEvents).foreach { case (e1, e2) =>
-      // Don't compare the JSON here because accumulators in StageInfo may be out of order
-      JsonProtocolSuite.assertEquals(
-        JsonProtocol.sparkEventFromJson(e1), JsonProtocol.sparkEventFromJson(e2))
+      JsonProtocolSuite.assertEquals(e1, e1)
     }
   }
 
@@ -235,21 +237,15 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
 
   /**
    * A simple listener that buffers all the events it receives.
-   *
-   * The event buffering functionality must be implemented within EventLoggingListener itself.
-   * This is because of the following race condition: the event may be mutated between being
-   * processed by one listener and being processed by another. Thus, in order to establish
-   * a fair comparison between the original events and the replayed events, both functionalities
-   * must be implemented within one listener (i.e. the EventLoggingListener).
-   *
-   * This child listener inherits only the event buffering functionality, but does not actually
-   * log the events.
    */
-  private class EventMonster(conf: SparkConf)
-    extends EventLoggingListener("test", None, new URI("testdir"), conf) {
+  private class EventBufferingListener extends SparkFirehoseListener {
 
-    override def start() { }
+    private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
 
+    override def onEvent(event: SparkListenerEvent) {
+      val eventJson = JsonProtocol.sparkEventToJson(event)
+      loggedEvents += eventJson
+    }
   }
 
   /*


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org