Posted to issues@spark.apache.org by "Joseph K. Bradley (JIRA)" <ji...@apache.org> on 2019/02/21 19:44:00 UTC

[jira] [Updated] (SPARK-26960) Reduce flakiness of Spark ML Listener test suite by waiting for listener bus to clear

     [ https://issues.apache.org/jira/browse/SPARK-26960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joseph K. Bradley updated SPARK-26960:
--------------------------------------
    Description: 
[SPARK-23674] added SparkListeners for some spark.ml events, as well as a test suite.  I've observed flakiness in the test suite (though I unfortunately only have local logs for failures, not permalinks).  Looking at logs, here's my assessment, which I confirmed with [~zsxwing].

Test failure I saw:
{code}
[info] - pipeline read/write events *** FAILED *** (10 seconds, 253 milliseconds)
[info]   The code passed to eventually never returned normally. Attempted 20 times over 10.012222409 seconds. Last failure message: Unexpected event thrown: SaveInstanceEnd(org.apache.spark.ml.util.DefaultParamsWriter@6daf89c2,/home/jenkins/workspace/mllib/target/tmp/spark-572952d8-5d2d-4a6c-bd4f-940bb0bbc3d5/pipeline/stages/0_writableStage). (MLEventsSuite.scala:190)
[info]   org.scalatest.exceptions.TestFailedDueToTimeoutException:
[info]   at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:421)
[info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)
[info]   at org.apache.spark.ml.MLEventsSuite.eventually(MLEventsSuite.scala:39)
[info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:308)
[info]   at org.apache.spark.ml.MLEventsSuite.eventually(MLEventsSuite.scala:39)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(MLEventsSuite.scala:190)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(MLEventsSuite.scala:165)
[info]   at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:314)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply$mcV$sp(MLEventsSuite.scala:165)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply(MLEventsSuite.scala:161)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply(MLEventsSuite.scala:161)
[info]   at org.apache.spark.SparkFunSuite$$anonfun$test$1.apply(SparkFunSuite.scala:284)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:184)
[info]   at org.apache.spark.SparkFunSuite.runWithCredentials(SparkFunSuite.scala:301)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:165)
[info]   at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
[info]   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:64)
[info]   at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:64)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
[info]   at scala.collection.immutable.List.foreach(List.scala:392)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
[info]   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
[info]   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
[info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
[info]   at org.scalatest.Suite$class.run(Suite.scala:1147)
[info]   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
[info]   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:64)
[info]   at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:64)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:296)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:286)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:748)
[info]   Cause: org.scalatest.exceptions.TestFailedException: Unexpected event thrown: SaveInstanceEnd(org.apache.spark.ml.util.DefaultParamsWriter@6daf89c2,/home/jenkins/workspace/mllib/target/tmp/spark-572952d8-5d2d-4a6c-bd4f-940bb0bbc3d5/pipeline/stages/0_writableStage)
[info]   at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
[info]   at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
[info]   at org.scalatest.Assertions$class.fail(Assertions.scala:1089)
[info]   at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2$$anonfun$apply$mcV$sp$3.apply(MLEventsSuite.scala:202)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2$$anonfun$apply$mcV$sp$3.apply(MLEventsSuite.scala:191)
[info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply$mcV$sp(MLEventsSuite.scala:191)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(MLEventsSuite.scala:191)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(MLEventsSuite.scala:191)
[info]   at org.scalatest.concurrent.Eventually$class.makeAValiantAttempt$1(Eventually.scala:395)
[info]   at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:409)
[info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)
[info]   at org.apache.spark.ml.MLEventsSuite.eventually(MLEventsSuite.scala:39)
[info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:308)
[info]   at org.apache.spark.ml.MLEventsSuite.eventually(MLEventsSuite.scala:39)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(MLEventsSuite.scala:190)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(MLEventsSuite.scala:165)
[info]   at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:314)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply$mcV$sp(MLEventsSuite.scala:165)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply(MLEventsSuite.scala:161)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply(MLEventsSuite.scala:161)
[info]   at org.apache.spark.SparkFunSuite$$anonfun$test$1.apply(SparkFunSuite.scala:284)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:184)
[info]   at org.apache.spark.SparkFunSuite.runWithCredentials(SparkFunSuite.scala:301)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:165)
[info]   at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
[info]   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:64)
[info]   at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:64)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
[info]   at scala.collection.immutable.List.foreach(List.scala:392)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
[info]   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
[info]   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
[info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
[info]   at org.scalatest.Suite$class.run(Suite.scala:1147)
[info]   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
[info]   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:64)
[info]   at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:64)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:296)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:286)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:748)
{code}

The issue is in the "read/write events" tests, which work as follows:
* write
* wait until we see at least 1 write-related SparkListenerEvent
* read
* wait until we see at least 1 read-related SparkListenerEvent

The problem is that the last step does NOT allow any write-related SparkListenerEvents, but the listener bus delivers events asynchronously, so some write events may be delayed enough that they are only seen in this last step. We should add logic before "read" that waits until all pending listener events have been delivered. Other SparkListener tests do this by calling `sc.listenerBus.waitUntilEmpty(TIMEOUT)`, so we should do the same here.
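
To make the race concrete, here is a minimal sketch in plain Scala. It does not use Spark: `ToyListenerBus` is a hypothetical stand-in for an asynchronous listener bus, and the event names are plain strings chosen for illustration. Its `waitUntilEmpty` only mimics the idea of blocking until every posted event has been delivered; it is not how Spark implements it.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeoutException}
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy model of an asynchronous listener bus (NOT Spark's LiveListenerBus).
final class ToyListenerBus(deliveryDelayMs: Long) {
  private val queue = new LinkedBlockingQueue[String]()
  private val pending = new AtomicLong(0) // events posted but not yet delivered
  private val received = ArrayBuffer.empty[String]

  // A single background thread delivers events in FIFO order, with an artificial delay.
  private val dispatcher = new Thread(new Runnable {
    override def run(): Unit = {
      while (true) {
        val event = queue.take()
        Thread.sleep(deliveryDelayMs)
        received.synchronized { received += event }
        pending.decrementAndGet()
      }
    }
  })
  dispatcher.setDaemon(true)
  dispatcher.start()

  // Returns immediately; delivery happens later on the dispatcher thread.
  def post(event: String): Unit = {
    pending.incrementAndGet()
    queue.put(event)
  }

  // Blocks until every posted event has been delivered, or throws on timeout.
  def waitUntilEmpty(timeoutMs: Long): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (pending.get() > 0) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException(s"Bus not empty after $timeoutMs ms")
      }
      Thread.sleep(5)
    }
  }

  // Returns the events delivered so far and clears the buffer.
  def drain(): Seq[String] = received.synchronized {
    val out = received.toList
    received.clear()
    out
  }
}

object WaitUntilEmptyDemo {
  def main(args: Array[String]): Unit = {
    val bus = new ToyListenerBus(deliveryDelayMs = 50)

    // "write" step: two events are posted, but delivery lags behind.
    bus.post("SaveInstanceStart")
    bus.post("SaveInstanceEnd")

    // The fix: wait for the bus to empty BEFORE starting the "read" step.
    bus.waitUntilEmpty(10000)
    val writeEvents = bus.drain()
    assert(writeEvents.forall(_.startsWith("Save")))

    // "read" step: no late write events can leak into this check now.
    bus.post("LoadInstanceStart")
    bus.post("LoadInstanceEnd")
    bus.waitUntilEmpty(10000)
    val readEvents = bus.drain()
    assert(readEvents.forall(_.startsWith("Load")))

    println(s"write events: $writeEvents")
    println(s"read events: $readEvents")
  }
}
```

If the first `waitUntilEmpty` call is removed and the write events are checked with "at least one seen" logic instead, the delayed `SaveInstanceEnd` can land in the buffer after the read step has begun, which is the same shape as the failure in the log above.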

Relevant test code: https://github.com/apache/spark/blob/91caf0bfce4706a264fcfe222fa500354ce69ff1/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala#L247

I'll send a patch for this.

  was:
[SPARK-23674] added SparkListeners for some spark.ml events, as well as a test suite.  I've observed flakiness in the test suite (though I unfortunately only have local logs for failures, not permalinks).  Looking at logs, here's my assessment, which I confirmed with [~zsxwing].

Test failure I saw:
{code}
[info] - pipeline read/write events *** FAILED *** (10 seconds, 253 milliseconds)
[info]   The code passed to eventually never returned normally. Attempted 20 times over 10.012222409 seconds. Last failure message: Unexpected event thrown: SaveInstanceEnd(org.apache.spark.ml.util.DefaultParamsWriter@6daf89c2,/home/jenkins/workspace/mllib/target/tmp/spark-572952d8-5d2d-4a6c-bd4f-940bb0bbc3d5/pipeline/stages/0_writableStage). (MLEventsSuite.scala:190)
[info]   org.scalatest.exceptions.TestFailedDueToTimeoutException:
[info]   at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:421)
[info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)
[info]   at org.apache.spark.ml.MLEventsSuite.eventually(MLEventsSuite.scala:39)
[info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:308)
[info]   at org.apache.spark.ml.MLEventsSuite.eventually(MLEventsSuite.scala:39)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(MLEventsSuite.scala:190)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(MLEventsSuite.scala:165)
[info]   at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:314)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply$mcV$sp(MLEventsSuite.scala:165)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply(MLEventsSuite.scala:161)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply(MLEventsSuite.scala:161)
[info]   at org.apache.spark.SparkFunSuite$$anonfun$test$1.apply(SparkFunSuite.scala:284)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:184)
[info]   at org.apache.spark.SparkFunSuite.runWithCredentials(SparkFunSuite.scala:301)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:165)
[info]   at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
[info]   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:64)
[info]   at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:64)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
[info]   at scala.collection.immutable.List.foreach(List.scala:392)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
[info]   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
[info]   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
[info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
[info]   at org.scalatest.Suite$class.run(Suite.scala:1147)
[info]   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
[info]   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:64)
[info]   at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:64)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:296)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:286)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:748)
[info]   Cause: org.scalatest.exceptions.TestFailedException: Unexpected event thrown: SaveInstanceEnd(org.apache.spark.ml.util.DefaultParamsWriter@6daf89c2,/home/jenkins/workspace/mllib/target/tmp/spark-572952d8-5d2d-4a6c-bd4f-940bb0bbc3d5/pipeline/stages/0_writableStage)
[info]   at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
[info]   at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
[info]   at org.scalatest.Assertions$class.fail(Assertions.scala:1089)
[info]   at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2$$anonfun$apply$mcV$sp$3.apply(MLEventsSuite.scala:202)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2$$anonfun$apply$mcV$sp$3.apply(MLEventsSuite.scala:191)
[info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply$mcV$sp(MLEventsSuite.scala:191)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(MLEventsSuite.scala:191)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(MLEventsSuite.scala:191)
[info]   at org.scalatest.concurrent.Eventually$class.makeAValiantAttempt$1(Eventually.scala:395)
[info]   at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:409)
[info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)
[info]   at org.apache.spark.ml.MLEventsSuite.eventually(MLEventsSuite.scala:39)
[info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:308)
[info]   at org.apache.spark.ml.MLEventsSuite.eventually(MLEventsSuite.scala:39)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(MLEventsSuite.scala:190)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(MLEventsSuite.scala:165)
[info]   at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:314)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply$mcV$sp(MLEventsSuite.scala:165)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply(MLEventsSuite.scala:161)
[info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply(MLEventsSuite.scala:161)
[info]   at org.apache.spark.SparkFunSuite$$anonfun$test$1.apply(SparkFunSuite.scala:284)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:184)
[info]   at org.apache.spark.SparkFunSuite.runWithCredentials(SparkFunSuite.scala:301)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:165)
[info]   at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
[info]   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:64)
[info]   at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:64)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
[info]   at scala.collection.immutable.List.foreach(List.scala:392)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
[info]   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
[info]   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
[info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
[info]   at org.scalatest.Suite$class.run(Suite.scala:1147)
[info]   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
[info]   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:64)
[info]   at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:64)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:296)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:286)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:748)
{code}

The issue is in the "read/write events" tests, which work as follows:
* write
* wait until we see at least 1 write-related SparkListenerEvent
* read
* wait until we see at least 1 read-related SparkListenerEvent

The problem is that the last step does NOT allow any write-related SparkListenerEvents, but some of those events may be delayed enough that they are seen in this last step. We should ideally add logic before "read" to wait until the listener events are cleared/complete. Looking into other SparkListener tests, we need to use `sc.listenerBus.waitUntilEmpty(TIMEOUT)`.

I'll send a patch for this.


> Reduce flakiness of Spark ML Listener test suite by waiting for listener bus to clear
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-26960
>                 URL: https://issues.apache.org/jira/browse/SPARK-26960
>             Project: Spark
>          Issue Type: Improvement
>          Components: ML, Tests
>    Affects Versions: 3.0.0
>            Reporter: Joseph K. Bradley
>            Assignee: Joseph K. Bradley
>            Priority: Major
>
> [SPARK-23674] added SparkListeners for some spark.ml events, as well as a test suite.  I've observed flakiness in the test suite (though I unfortunately only have local logs for failures, not permalinks).  Looking at logs, here's my assessment, which I confirmed with [~zsxwing].
> Test failure I saw:
> {code}
> [info] - pipeline read/write events *** FAILED *** (10 seconds, 253 milliseconds)
> [info]   The code passed to eventually never returned normally. Attempted 20 times over 10.012222409 seconds. Last failure message: Unexpected event thrown: SaveInstanceEnd(org.apache.spark.ml.util.DefaultParamsWriter@6daf89c2,/home/jenkins/workspace/mllib/target/tmp/spark-572952d8-5d2d-4a6c-bd4f-940bb0bbc3d5/pipeline/stages/0_writableStage). (MLEventsSuite.scala:190)
> [info]   org.scalatest.exceptions.TestFailedDueToTimeoutException:
> [info]   at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:421)
> [info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)
> [info]   at org.apache.spark.ml.MLEventsSuite.eventually(MLEventsSuite.scala:39)
> [info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:308)
> [info]   at org.apache.spark.ml.MLEventsSuite.eventually(MLEventsSuite.scala:39)
> [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(MLEventsSuite.scala:190)
> [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(MLEventsSuite.scala:165)
> [info]   at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:314)
> [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply$mcV$sp(MLEventsSuite.scala:165)
> [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply(MLEventsSuite.scala:161)
> [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply(MLEventsSuite.scala:161)
> [info]   at org.apache.spark.SparkFunSuite$$anonfun$test$1.apply(SparkFunSuite.scala:284)
> [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
> [info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
> [info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:184)
> [info]   at org.apache.spark.SparkFunSuite.runWithCredentials(SparkFunSuite.scala:301)
> [info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:165)
> [info]   at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
> [info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
> [info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
> [info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
> [info]   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
> [info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:64)
> [info]   at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
> [info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:64)
> [info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
> [info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
> [info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
> [info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
> [info]   at scala.collection.immutable.List.foreach(List.scala:392)
> [info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
> [info]   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
> [info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
> [info]   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
> [info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
> [info]   at org.scalatest.Suite$class.run(Suite.scala:1147)
> [info]   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
> [info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
> [info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
> [info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
> [info]   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
> [info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:64)
> [info]   at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
> [info]   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
> [info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:64)
> [info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
> [info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
> [info]   at sbt.ForkMain$Run$2.call(ForkMain.java:296)
> [info]   at sbt.ForkMain$Run$2.call(ForkMain.java:286)
> [info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [info]   at java.lang.Thread.run(Thread.java:748)
> [info]   Cause: org.scalatest.exceptions.TestFailedException: Unexpected event thrown: SaveInstanceEnd(org.apache.spark.ml.util.DefaultParamsWriter@6daf89c2,/home/jenkins/workspace/mllib/target/tmp/spark-572952d8-5d2d-4a6c-bd4f-940bb0bbc3d5/pipeline/stages/0_writableStage)
> [info]   at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
> [info]   at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
> [info]   at org.scalatest.Assertions$class.fail(Assertions.scala:1089)
> [info]   at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
> [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2$$anonfun$apply$mcV$sp$3.apply(MLEventsSuite.scala:202)
> [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2$$anonfun$apply$mcV$sp$3.apply(MLEventsSuite.scala:191)
> [info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> [info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply$mcV$sp(MLEventsSuite.scala:191)
> [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(MLEventsSuite.scala:191)
> [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(MLEventsSuite.scala:191)
> [info]   at org.scalatest.concurrent.Eventually$class.makeAValiantAttempt$1(Eventually.scala:395)
> [info]   at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:409)
> [info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)
> [info]   at org.apache.spark.ml.MLEventsSuite.eventually(MLEventsSuite.scala:39)
> [info]   at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:308)
> [info]   at org.apache.spark.ml.MLEventsSuite.eventually(MLEventsSuite.scala:39)
> [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(MLEventsSuite.scala:190)
> [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(MLEventsSuite.scala:165)
> [info]   at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:314)
> [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply$mcV$sp(MLEventsSuite.scala:165)
> [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply(MLEventsSuite.scala:161)
> [info]   at org.apache.spark.ml.MLEventsSuite$$anonfun$1.apply(MLEventsSuite.scala:161)
> [info]   at org.apache.spark.SparkFunSuite$$anonfun$test$1.apply(SparkFunSuite.scala:284)
> [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
> [info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
> [info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:184)
> [info]   at org.apache.spark.SparkFunSuite.runWithCredentials(SparkFunSuite.scala:301)
> [info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:165)
> [info]   at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
> [info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
> [info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
> [info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
> [info]   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
> [info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:64)
> [info]   at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
> [info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:64)
> [info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
> [info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
> [info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
> [info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
> [info]   at scala.collection.immutable.List.foreach(List.scala:392)
> [info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
> [info]   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
> [info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
> [info]   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
> [info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
> [info]   at org.scalatest.Suite$class.run(Suite.scala:1147)
> [info]   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
> [info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
> [info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
> [info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
> [info]   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
> [info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:64)
> [info]   at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
> [info]   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
> [info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:64)
> [info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
> [info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
> [info]   at sbt.ForkMain$Run$2.call(ForkMain.java:296)
> [info]   at sbt.ForkMain$Run$2.call(ForkMain.java:286)
> [info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [info]   at java.lang.Thread.run(Thread.java:748)
> {code}
> The issue is in the "read/write events" tests, which work as follows:
> * write
> * wait until we see at least 1 write-related SparkListenerEvent
> * read
> * wait until we see at least 1 read-related SparkListenerEvent
> The problem is that the last step fails on any write-related SparkListenerEvent, but some of those events may be delayed enough that they only arrive during this last step. We should add logic before "read" that waits until all pending listener events have been delivered. Other SparkListener tests do this by calling `sc.listenerBus.waitUntilEmpty(TIMEOUT)`.
> Relevant test code: https://github.com/apache/spark/blob/91caf0bfce4706a264fcfe222fa500354ce69ff1/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala#L247
> I'll send a patch for this.
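
The race described above can be reproduced without Spark. The following is a minimal sketch, not the actual patch: `ToyListenerBus` and `ListenerBusDemo` are hypothetical names invented for illustration, and the toy bus only imitates the asynchronous delivery of a listener bus. The one idea taken from the description is the step of waiting for the bus to drain before starting the "read" phase, which the description says is done in Spark tests with `sc.listenerBus.waitUntilEmpty(TIMEOUT)`.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an asynchronous listener bus (NOT Spark's implementation).
// Events are delivered on a background thread, so they can arrive late.
final class ToyListenerBus {
  private val executor = Executors.newSingleThreadExecutor()
  private val pending = new AtomicInteger(0)
  private val seen = ArrayBuffer.empty[String]

  // Queue an event; deliveryDelayMs simulates a slow listener thread.
  def post(event: String, deliveryDelayMs: Long = 0L): Unit = {
    pending.incrementAndGet()
    executor.execute(new Runnable {
      override def run(): Unit = {
        Thread.sleep(deliveryDelayMs)
        seen.synchronized { seen += event }
        pending.decrementAndGet()
      }
    })
  }

  // Block until every posted event has been delivered, or the timeout expires.
  def waitUntilEmpty(timeoutMs: Long): Boolean = {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    while (pending.get() > 0) {
      if (System.nanoTime() > deadline) return false
      Thread.sleep(5)
    }
    true
  }

  // Return the events delivered so far and reset the record.
  def drainSeen(): List[String] = seen.synchronized {
    val out = seen.toList
    seen.clear()
    out
  }

  def stop(): Unit = executor.shutdown()
}

object ListenerBusDemo {
  // Returns the events observed during the "read" phase only.
  def run(): List[String] = {
    val bus = new ToyListenerBus
    try {
      // "write" phase: the end event is delivered late.
      bus.post("SaveInstanceStart")
      bus.post("SaveInstanceEnd", deliveryDelayMs = 50)

      // Wait for the bus to drain BEFORE reading, then discard the write events.
      // Without this wait, SaveInstanceEnd could be recorded during the read phase.
      require(bus.waitUntilEmpty(10000), "listener bus did not drain in time")
      bus.drainSeen()

      // "read" phase: only read-related events should be observed now.
      bus.post("LoadInstanceStart")
      bus.post("LoadInstanceEnd")
      require(bus.waitUntilEmpty(10000), "listener bus did not drain in time")
      bus.drainSeen()
    } finally {
      bus.stop()
    }
  }
}
```

Calling `ListenerBusDemo.run()` returns only the two load events, because the late `SaveInstanceEnd` is delivered and discarded before the read phase begins. In the real suite the equivalent step would presumably be a single `waitUntilEmpty` call between the write and the read, but where exactly it belongs depends on the test code linked above.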


