You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2020/08/03 13:16:00 UTC

[jira] [Updated] (SPARK-32520) Flaky Test: KafkaSourceStressSuite.stress test with multiple topics and partitions

     [ https://issues.apache.org/jira/browse/SPARK-32520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-32520:
---------------------------------
    Description: 
{{KafkaSourceStressSuite.stress test with multiple topics and partitions}} seems flaky in the GitHub Actions build:
https://github.com/apache/spark/pull/29335/checks?check_run_id=940205463

{code}
KafkaSourceStressSuite:
- stress test with multiple topics and partitions *** FAILED *** (2 minutes, 7 seconds)
  Timed out waiting for stream: The code passed to failAfter did not complete within 30 seconds.
  java.lang.Thread.getStackTrace(Thread.java:1559)
  	org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:234)
  	org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233)
  	org.apache.spark.sql.kafka010.KafkaSourceTest.failAfterImpl(KafkaMicroBatchSourceSuite.scala:53)
  	org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230)
  	org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229)
  	org.apache.spark.sql.kafka010.KafkaSourceTest.failAfter(KafkaMicroBatchSourceSuite.scala:53)
  	org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7(StreamTest.scala:471)
  	org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7$adapted(StreamTest.scala:470)
  	scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)

  	Caused by: 	null
  	java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2156)
  		org.apache.spark.sql.execution.streaming.StreamExecution.awaitOffset(StreamExecution.scala:483)
  		org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$8(StreamTest.scala:472)
  		scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  		org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
  		org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:239)
  		org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233)
  		org.apache.spark.sql.kafka010.KafkaSourceTest.failAfterImpl(KafkaMicroBatchSourceSuite.scala:53)
  		org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230)
  		org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229)

  == Progress ==
     AssertOnQuery(<condition>, )
     AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), data = empty Range 0 until 0, message = )
     CheckAnswer:
     StopStream
     AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), data = Range 0 until 8, message = )
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7dce5824,Map(),null)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8]
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8]
     StopStream
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7255955e,Map(),null)
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, stress3), data = Range 8 until 9, message = Add topic stress7)
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, stress3), data = Range 9 until 10, message = )
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, stress3), data = Range 10 until 15, message = Add partition)
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress5, stress3), data = empty Range 15 until 15, message = Add topic stress9)
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress3), data = Range 15 until 16, message = Delete topic stress5)
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress3, stress10), data = Range 16 until 23, message = Add topic stress11)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23]
     StopStream
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress3, stress10), data = Range 23 until 32, message = Add partition)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress8, stress1, stress3, stress10), data = Range 32 until 37, message = Add topic stress13)
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@73a476e3,Map(),null)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress8, stress1, stress3, stress10), data = Range 37 until 38, message = )
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress3, stress10), data = Range 38 until 41, message = Delete topic stress8)
  => CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41]
     StopStream
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress3, stress10), data = empty Range 41 until 41, message = Add topic stress15)
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress3, stress10), data = Range 41 until 43, message = )
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress3, stress10, stress16), data = Range 43 until 48, message = Add topic stress17)
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@46dd79a7,Map(),null)
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10, stress16), data = Range 48 until 52, message = Add topic stress19)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52]
     StopStream
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@6b1792a,Map(),null)
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10, stress16), data = Range 52 until 55, message = )
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10, stress16), data = Range 55 until 57, message = )
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = empty Range 57 until 57, message = Delete topic stress16)
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 57 until 66, message = )
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66]
     StopStream
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@6c98dafc,Map(),null)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66]
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 66 until 71, message = Delete topic stress14)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71]
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 71 until 75, message = Delete topic stress1)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 75 until 77, message = )
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 77 until 86, message = Add partition)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86]
     StopStream
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress3, stress10), data = Range 86 until 93, message = Add topic stress21)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 93 until 102, message = Add topic stress23)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 102 until 103, message = Add partition)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 103 until 104, message = )
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 104 until 113, message = )
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7d64fb6b,Map(),null)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 113 until 116, message = Add partition)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116]
     StopStream
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 116 until 123, message = )
     AddKafkaData(topics = Set(stress24, stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 123 until 125, message = Add topic stress25)
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@50d9b521,Map(),null)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125]

  == Stream ==
  Output Mode: Append
  Stream state: {KafkaV2[SubscribePattern[stress.*]]: {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}}
  Thread state: alive
  Thread stack trace: scala.runtime.Statics.anyHash(Statics.java:122)
  scala.collection.immutable.HashMap.elemHashCode(HashMap.scala:87)
  scala.collection.immutable.HashMap.computeHash(HashMap.scala:96)
  scala.collection.immutable.HashMap.$plus(HashMap.scala:65)
  scala.collection.immutable.HashMap.$plus(HashMap.scala:39)
  scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:32)
  scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28)
  scala.collection.generic.Growable.$anonfun$$plus$plus$eq$1(Growable.scala:62)
  scala.collection.generic.Growable$$Lambda$8/681384962.apply(Unknown Source)
  scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
  scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
  scala.collection.mutable.MapBuilder.$plus$plus$eq(MapBuilder.scala:28)
  scala.collection.generic.GenMapFactory.apply(GenMapFactory.scala:51)
  scala.sys.package$.env(package.scala:64)
  org.apache.spark.util.Utils$.isTesting(Utils.scala:1908)
  org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule(AnalysisHelper.scala:134)
  org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule$(AnalysisHelper.scala:133)
  org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.assertNotAnalysisRule(LogicalPlan.scala:29)
  org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUp(AnalysisHelper.scala:157)
  org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUp$(AnalysisHelper.scala:156)
  org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
  org.apache.spark.sql.catalyst.optimizer.OptimizeLimitZero$.apply(Optimizer.scala:1802)
  org.apache.spark.sql.catalyst.optimizer.OptimizeLimitZero$.apply(Optimizer.scala:1797)
  org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149)
  org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1616/668661558.apply(Unknown Source)
  scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
  scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
  scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
  org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146)
  org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138)
  org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1615/260639590.apply(Unknown Source)
  scala.collection.immutable.List.foreach(List.scala:392)
  org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
  org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116)
  org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1704/154914836.apply(Unknown Source)
  org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
  org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116)
  org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81)
  org.apache.spark.sql.execution.streaming.IncrementalExecution$$Lambda$4150/1689084884.apply(Unknown Source)
  org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
  org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:135)
  org.apache.spark.sql.execution.QueryExecution$$Lambda$1693/650691670.apply(Unknown Source)
  org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:135)
  org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:81)
  org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79)
  org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:87)
  org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:105)
  org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:102)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:563)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4148/1474636575.apply(Unknown Source)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
  org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:553)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4069/138532033.apply$mcV$sp(Unknown Source)
  scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
  org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4067/940382466.apply$mcZ$sp(Unknown Source)
  org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
  org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
  org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)


  == Sink ==
  0:
  1: [1] [3] [5] [7] [2] [4] [6] [8]
  2: [10]
  3: [9]
  4:
  5: [11] [14] [12] [13] [15]
  6: [16]
  7: [19] [21] [17] [20] [22] [18] [23]
  8: [30] [33] [25] [27] [32] [37] [35] [29] [34] [36] [28] [31] [24] [26]
  9: [38]


  == Plan ==
  == Parsed Logical Plan ==
  WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
  +- SerializeFromObject [input[0, int, false] AS value#26274]
     +- MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#26273: int
        +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2
           +- Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263]
              +- StreamingDataSourceV2Relation [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, KafkaV2[SubscribePattern[stress.*]], {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}

  == Analyzed Logical Plan ==

  WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
  +- SerializeFromObject [input[0, int, false] AS value#26274]
     +- MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#26273: int
        +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2
           +- Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263]
              +- StreamingDataSourceV2Relation [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, KafkaV2[SubscribePattern[stress.*]], {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}

  == Optimized Logical Plan ==
  WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
  +- SerializeFromObject [input[0, int, false] AS value#26274]
     +- MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#26273: int
        +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2
           +- Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263]
              +- StreamingDataSourceV2Relation [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, KafkaV2[SubscribePattern[stress.*]], {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}

  == Physical Plan ==
  WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
  +- *(1) SerializeFromObject [input[0, int, false] AS value#26274]
     +- *(1) MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, obj#26273: int
        +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2
           +- *(1) Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263]
              +- *(1) Project [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254]
                 +- MicroBatchScan[key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan (StreamTest.scala:452)
  org.scalatest.exceptions.TestFailedException:
  at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
  at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
  at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1562)
  at org.scalatest.Assertions.fail(Assertions.scala:933)
  at org.scalatest.Assertions.fail$(Assertions.scala:929)
  at org.scalatest.funsuite.AnyFunSuite.fail(AnyFunSuite.scala:1562)
  at org.apache.spark.sql.streaming.StreamTest.failTest$1(StreamTest.scala:452)
  at org.apache.spark.sql.streaming.StreamTest.liftedTree1$1(StreamTest.scala:788)
  at org.apache.spark.sql.streaming.StreamTest.testStream(StreamTest.scala:764)
  at org.apache.spark.sql.streaming.StreamTest.testStream$(StreamTest.scala:334)
  at org.apache.spark.sql.kafka010.KafkaSourceTest.testStream(KafkaMicroBatchSourceSuite.scala:53)
  at org.apache.spark.sql.streaming.StreamTest.runStressTest(StreamTest.scala:876)
  at org.apache.spark.sql.streaming.StreamTest.runStressTest$(StreamTest.scala:828)
  at org.apache.spark.sql.kafka010.KafkaSourceTest.runStressTest(KafkaMicroBatchSourceSuite.scala:53)
  at org.apache.spark.sql.kafka010.KafkaSourceStressSuite.$anonfun$new$157(KafkaMicroBatchSourceSuite.scala:1906)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  at org.scalatest.Transformer.apply(Transformer.scala:22)
  at org.scalatest.Transformer.apply(Transformer.scala:20)
  at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189)
  at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:158)
  at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187)
  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:199)
  at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:199)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:181)
  at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:60)
  at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
  at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
  at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:60)
  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:232)
  at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
  at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
  at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:232)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:231)
  at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562)
  at org.scalatest.Suite.run(Suite.scala:1112)
  at org.scalatest.Suite.run$(Suite.scala:1094)
  at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562)
  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:236)
  at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
  at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:236)
  at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:235)
  at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:60)
  at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
  at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
  at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
  at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:60)
  at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:318)
  at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:513)
  at sbt.ForkMain$Run$2.call(ForkMain.java:296)
  at sbt.ForkMain$Run$2.call(ForkMain.java:286)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
{code}

This test failure seems to occur fairly frequently.

  was:
{{KafkaSourceStressSuite.stress test with multiple topics and partitions}} seems flaky in GitHub Actions build.

{code}
KafkaSourceStressSuite:
- stress test with multiple topics and partitions *** FAILED *** (2 minutes, 7 seconds)
  Timed out waiting for stream: The code passed to failAfter did not complete within 30 seconds.
  java.lang.Thread.getStackTrace(Thread.java:1559)
  	org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:234)
  	org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233)
  	org.apache.spark.sql.kafka010.KafkaSourceTest.failAfterImpl(KafkaMicroBatchSourceSuite.scala:53)
  	org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230)
  	org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229)
  	org.apache.spark.sql.kafka010.KafkaSourceTest.failAfter(KafkaMicroBatchSourceSuite.scala:53)
  	org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7(StreamTest.scala:471)
  	org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7$adapted(StreamTest.scala:470)
  	scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)

  	Caused by: 	null
  	java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2156)
  		org.apache.spark.sql.execution.streaming.StreamExecution.awaitOffset(StreamExecution.scala:483)
  		org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$8(StreamTest.scala:472)
  		scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  		org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
  		org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:239)
  		org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233)
  		org.apache.spark.sql.kafka010.KafkaSourceTest.failAfterImpl(KafkaMicroBatchSourceSuite.scala:53)
  		org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230)
  		org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229)

  == Progress ==
     AssertOnQuery(<condition>, )
     AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), data = empty Range 0 until 0, message = )
     CheckAnswer:
     StopStream
     AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), data = Range 0 until 8, message = )
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7dce5824,Map(),null)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8]
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8]
     StopStream
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7255955e,Map(),null)
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, stress3), data = Range 8 until 9, message = Add topic stress7)
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, stress3), data = Range 9 until 10, message = )
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, stress3), data = Range 10 until 15, message = Add partition)
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress5, stress3), data = empty Range 15 until 15, message = Add topic stress9)
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress3), data = Range 15 until 16, message = Delete topic stress5)
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress3, stress10), data = Range 16 until 23, message = Add topic stress11)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23]
     StopStream
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress3, stress10), data = Range 23 until 32, message = Add partition)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress8, stress1, stress3, stress10), data = Range 32 until 37, message = Add topic stress13)
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@73a476e3,Map(),null)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress8, stress1, stress3, stress10), data = Range 37 until 38, message = )
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress3, stress10), data = Range 38 until 41, message = Delete topic stress8)
  => CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41]
     StopStream
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress3, stress10), data = empty Range 41 until 41, message = Add topic stress15)
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress3, stress10), data = Range 41 until 43, message = )
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress3, stress10, stress16), data = Range 43 until 48, message = Add topic stress17)
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@46dd79a7,Map(),null)
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10, stress16), data = Range 48 until 52, message = Add topic stress19)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52]
     StopStream
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@6b1792a,Map(),null)
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10, stress16), data = Range 52 until 55, message = )
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10, stress16), data = Range 55 until 57, message = )
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = empty Range 57 until 57, message = Delete topic stress16)
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 57 until 66, message = )
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66]
     StopStream
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@6c98dafc,Map(),null)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66]
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 66 until 71, message = Delete topic stress14)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71]
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 71 until 75, message = Delete topic stress1)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 75 until 77, message = )
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 77 until 86, message = Add partition)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86]
     StopStream
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress3, stress10), data = Range 86 until 93, message = Add topic stress21)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 93 until 102, message = Add topic stress23)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 102 until 103, message = Add partition)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 103 until 104, message = )
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 104 until 113, message = )
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7d64fb6b,Map(),null)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 113 until 116, message = Add partition)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116]
     StopStream
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 116 until 123, message = )
     AddKafkaData(topics = Set(stress24, stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 123 until 125, message = Add topic stress25)
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@50d9b521,Map(),null)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125]

  == Stream ==
  Output Mode: Append
  Stream state: {KafkaV2[SubscribePattern[stress.*]]: {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}}
  Thread state: alive
  Thread stack trace: scala.runtime.Statics.anyHash(Statics.java:122)
  scala.collection.immutable.HashMap.elemHashCode(HashMap.scala:87)
  scala.collection.immutable.HashMap.computeHash(HashMap.scala:96)
  scala.collection.immutable.HashMap.$plus(HashMap.scala:65)
  scala.collection.immutable.HashMap.$plus(HashMap.scala:39)
  scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:32)
  scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28)
  scala.collection.generic.Growable.$anonfun$$plus$plus$eq$1(Growable.scala:62)
  scala.collection.generic.Growable$$Lambda$8/681384962.apply(Unknown Source)
  scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
  scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
  scala.collection.mutable.MapBuilder.$plus$plus$eq(MapBuilder.scala:28)
  scala.collection.generic.GenMapFactory.apply(GenMapFactory.scala:51)
  scala.sys.package$.env(package.scala:64)
  org.apache.spark.util.Utils$.isTesting(Utils.scala:1908)
  org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule(AnalysisHelper.scala:134)
  org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule$(AnalysisHelper.scala:133)
  org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.assertNotAnalysisRule(LogicalPlan.scala:29)
  org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUp(AnalysisHelper.scala:157)
  org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUp$(AnalysisHelper.scala:156)
  org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
  org.apache.spark.sql.catalyst.optimizer.OptimizeLimitZero$.apply(Optimizer.scala:1802)
  org.apache.spark.sql.catalyst.optimizer.OptimizeLimitZero$.apply(Optimizer.scala:1797)
  org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149)
  org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1616/668661558.apply(Unknown Source)
  scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
  scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
  scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
  org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146)
  org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138)
  org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1615/260639590.apply(Unknown Source)
  scala.collection.immutable.List.foreach(List.scala:392)
  org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
  org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116)
  org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1704/154914836.apply(Unknown Source)
  org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
  org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116)
  org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81)
  org.apache.spark.sql.execution.streaming.IncrementalExecution$$Lambda$4150/1689084884.apply(Unknown Source)
  org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
  org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:135)
  org.apache.spark.sql.execution.QueryExecution$$Lambda$1693/650691670.apply(Unknown Source)
  org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:135)
  org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:81)
  org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79)
  org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:87)
  org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:105)
  org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:102)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:563)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4148/1474636575.apply(Unknown Source)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
  org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:553)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4069/138532033.apply$mcV$sp(Unknown Source)
  scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
  org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4067/940382466.apply$mcZ$sp(Unknown Source)
  org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
  org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
  org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)


  == Sink ==
  0:
  1: [1] [3] [5] [7] [2] [4] [6] [8]
  2: [10]
  3: [9]
  4:
  5: [11] [14] [12] [13] [15]
  6: [16]
  7: [19] [21] [17] [20] [22] [18] [23]
  8: [30] [33] [25] [27] [32] [37] [35] [29] [34] [36] [28] [31] [24] [26]
  9: [38]


  == Plan ==
  == Parsed Logical Plan ==
  WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
  +- SerializeFromObject [input[0, int, false] AS value#26274]
     +- MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#26273: int
        +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2
           +- Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263]
              +- StreamingDataSourceV2Relation [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, KafkaV2[SubscribePattern[stress.*]], {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}

  == Analyzed Logical Plan ==

  WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
  +- SerializeFromObject [input[0, int, false] AS value#26274]
     +- MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#26273: int
        +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2
           +- Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263]
              +- StreamingDataSourceV2Relation [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, KafkaV2[SubscribePattern[stress.*]], {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}

  == Optimized Logical Plan ==
  WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
  +- SerializeFromObject [input[0, int, false] AS value#26274]
     +- MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#26273: int
        +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2
           +- Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263]
              +- StreamingDataSourceV2Relation [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, KafkaV2[SubscribePattern[stress.*]], {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}

  == Physical Plan ==
  WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
  +- *(1) SerializeFromObject [input[0, int, false] AS value#26274]
     +- *(1) MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, obj#26273: int
        +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2
           +- *(1) Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263]
              +- *(1) Project [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254]
                 +- MicroBatchScan[key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan (StreamTest.scala:452)
  org.scalatest.exceptions.TestFailedException:
  at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
  at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
  at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1562)
  at org.scalatest.Assertions.fail(Assertions.scala:933)
  at org.scalatest.Assertions.fail$(Assertions.scala:929)
  at org.scalatest.funsuite.AnyFunSuite.fail(AnyFunSuite.scala:1562)
  at org.apache.spark.sql.streaming.StreamTest.failTest$1(StreamTest.scala:452)
  at org.apache.spark.sql.streaming.StreamTest.liftedTree1$1(StreamTest.scala:788)
  at org.apache.spark.sql.streaming.StreamTest.testStream(StreamTest.scala:764)
  at org.apache.spark.sql.streaming.StreamTest.testStream$(StreamTest.scala:334)
  at org.apache.spark.sql.kafka010.KafkaSourceTest.testStream(KafkaMicroBatchSourceSuite.scala:53)
  at org.apache.spark.sql.streaming.StreamTest.runStressTest(StreamTest.scala:876)
  at org.apache.spark.sql.streaming.StreamTest.runStressTest$(StreamTest.scala:828)
  at org.apache.spark.sql.kafka010.KafkaSourceTest.runStressTest(KafkaMicroBatchSourceSuite.scala:53)
  at org.apache.spark.sql.kafka010.KafkaSourceStressSuite.$anonfun$new$157(KafkaMicroBatchSourceSuite.scala:1906)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  at org.scalatest.Transformer.apply(Transformer.scala:22)
  at org.scalatest.Transformer.apply(Transformer.scala:20)
  at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189)
  at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:158)
  at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187)
  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:199)
  at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:199)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:181)
  at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:60)
  at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
  at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
  at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:60)
  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:232)
  at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
  at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
  at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:232)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:231)
  at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562)
  at org.scalatest.Suite.run(Suite.scala:1112)
  at org.scalatest.Suite.run$(Suite.scala:1094)
  at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562)
  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:236)
  at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
  at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:236)
  at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:235)
  at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:60)
  at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
  at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
  at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
  at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:60)
  at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:318)
  at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:513)
  at sbt.ForkMain$Run$2.call(ForkMain.java:296)
  at sbt.ForkMain$Run$2.call(ForkMain.java:286)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
{code}


> Flaky Test: KafkaSourceStressSuite.stress test with multiple topics and partitions
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-32520
>                 URL: https://issues.apache.org/jira/browse/SPARK-32520
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Structured Streaming, Tests
>    Affects Versions: 3.1.0
>            Reporter: Hyukjin Kwon
>            Priority: Major
>
> {{KafkaSourceStressSuite.stress test with multiple topics and partitions}} seems flaky in GitHub Actions build:
> https://github.com/apache/spark/pull/29335/checks?check_run_id=940205463
> {code}
> KafkaSourceStressSuite:
> - stress test with multiple topics and partitions *** FAILED *** (2 minutes, 7 seconds)
>   Timed out waiting for stream: The code passed to failAfter did not complete within 30 seconds.
>   java.lang.Thread.getStackTrace(Thread.java:1559)
>   	org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:234)
>   	org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233)
>   	org.apache.spark.sql.kafka010.KafkaSourceTest.failAfterImpl(KafkaMicroBatchSourceSuite.scala:53)
>   	org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230)
>   	org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229)
>   	org.apache.spark.sql.kafka010.KafkaSourceTest.failAfter(KafkaMicroBatchSourceSuite.scala:53)
>   	org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7(StreamTest.scala:471)
>   	org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7$adapted(StreamTest.scala:470)
>   	scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
>   	Caused by: 	null
>   	java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2156)
>   		org.apache.spark.sql.execution.streaming.StreamExecution.awaitOffset(StreamExecution.scala:483)
>   		org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$8(StreamTest.scala:472)
>   		scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   		org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
>   		org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:239)
>   		org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233)
>   		org.apache.spark.sql.kafka010.KafkaSourceTest.failAfterImpl(KafkaMicroBatchSourceSuite.scala:53)
>   		org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230)
>   		org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229)
>   == Progress ==
>      AssertOnQuery(<condition>, )
>      AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), data = empty Range 0 until 0, message = )
>      CheckAnswer:
>      StopStream
>      AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), data = Range 0 until 8, message = )
>      StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7dce5824,Map(),null)
>      CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8]
>      CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8]
>      StopStream
>      StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7255955e,Map(),null)
>      AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, stress3), data = Range 8 until 9, message = Add topic stress7)
>      AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, stress3), data = Range 9 until 10, message = )
>      AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, stress3), data = Range 10 until 15, message = Add partition)
>      AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress5, stress3), data = empty Range 15 until 15, message = Add topic stress9)
>      AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress3), data = Range 15 until 16, message = Delete topic stress5)
>      AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress3, stress10), data = Range 16 until 23, message = Add topic stress11)
>      CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23]
>      StopStream
>      AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress3, stress10), data = Range 23 until 32, message = Add partition)
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress8, stress1, stress3, stress10), data = Range 32 until 37, message = Add topic stress13)
>      StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@73a476e3,Map(),null)
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress8, stress1, stress3, stress10), data = Range 37 until 38, message = )
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress3, stress10), data = Range 38 until 41, message = Delete topic stress8)
>   => CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41]
>      StopStream
>      AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress3, stress10), data = empty Range 41 until 41, message = Add topic stress15)
>      AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress3, stress10), data = Range 41 until 43, message = )
>      AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress3, stress10, stress16), data = Range 43 until 48, message = Add topic stress17)
>      StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@46dd79a7,Map(),null)
>      AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10, stress16), data = Range 48 until 52, message = Add topic stress19)
>      CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52]
>      StopStream
>      StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@6b1792a,Map(),null)
>      AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10, stress16), data = Range 52 until 55, message = )
>      AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10, stress16), data = Range 55 until 57, message = )
>      AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = empty Range 57 until 57, message = Delete topic stress16)
>      AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 57 until 66, message = )
>      CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66]
>      StopStream
>      StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@6c98dafc,Map(),null)
>      CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66]
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 66 until 71, message = Delete topic stress14)
>      CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71]
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 71 until 75, message = Delete topic stress1)
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 75 until 77, message = )
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 77 until 86, message = Add partition)
>      CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86]
>      StopStream
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress3, stress10), data = Range 86 until 93, message = Add topic stress21)
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 93 until 102, message = Add topic stress23)
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 102 until 103, message = Add partition)
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 103 until 104, message = )
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 104 until 113, message = )
>      StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7d64fb6b,Map(),null)
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 113 until 116, message = Add partition)
>      CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116]
>      StopStream
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 116 until 123, message = )
>      AddKafkaData(topics = Set(stress24, stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 123 until 125, message = Add topic stress25)
>      StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@50d9b521,Map(),null)
>      CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125]
>   == Stream ==
>   Output Mode: Append
>   Stream state: {KafkaV2[SubscribePattern[stress.*]]: {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}}
>   Thread state: alive
>   Thread stack trace: scala.runtime.Statics.anyHash(Statics.java:122)
>   scala.collection.immutable.HashMap.elemHashCode(HashMap.scala:87)
>   scala.collection.immutable.HashMap.computeHash(HashMap.scala:96)
>   scala.collection.immutable.HashMap.$plus(HashMap.scala:65)
>   scala.collection.immutable.HashMap.$plus(HashMap.scala:39)
>   scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:32)
>   scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28)
>   scala.collection.generic.Growable.$anonfun$$plus$plus$eq$1(Growable.scala:62)
>   scala.collection.generic.Growable$$Lambda$8/681384962.apply(Unknown Source)
>   scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>   scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>   scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>   scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
>   scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
>   scala.collection.mutable.MapBuilder.$plus$plus$eq(MapBuilder.scala:28)
>   scala.collection.generic.GenMapFactory.apply(GenMapFactory.scala:51)
>   scala.sys.package$.env(package.scala:64)
>   org.apache.spark.util.Utils$.isTesting(Utils.scala:1908)
>   org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule(AnalysisHelper.scala:134)
>   org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule$(AnalysisHelper.scala:133)
>   org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.assertNotAnalysisRule(LogicalPlan.scala:29)
>   org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUp(AnalysisHelper.scala:157)
>   org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUp$(AnalysisHelper.scala:156)
>   org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
>   org.apache.spark.sql.catalyst.optimizer.OptimizeLimitZero$.apply(Optimizer.scala:1802)
>   org.apache.spark.sql.catalyst.optimizer.OptimizeLimitZero$.apply(Optimizer.scala:1797)
>   org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149)
>   org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1616/668661558.apply(Unknown Source)
>   scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
>   scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
>   scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
>   org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146)
>   org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138)
>   org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1615/260639590.apply(Unknown Source)
>   scala.collection.immutable.List.foreach(List.scala:392)
>   org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
>   org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116)
>   org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1704/154914836.apply(Unknown Source)
>   org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
>   org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116)
>   org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81)
>   org.apache.spark.sql.execution.streaming.IncrementalExecution$$Lambda$4150/1689084884.apply(Unknown Source)
>   org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
>   org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:135)
>   org.apache.spark.sql.execution.QueryExecution$$Lambda$1693/650691670.apply(Unknown Source)
>   org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
>   org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:135)
>   org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:81)
>   org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79)
>   org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:87)
>   org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:105)
>   org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:102)
>   org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:563)
>   org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4148/1474636575.apply(Unknown Source)
>   org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
>   org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
>   org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>   org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:553)
>   org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
>   org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4069/138532033.apply$mcV$sp(Unknown Source)
>   scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
>   org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
>   org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>   org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
>   org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4067/940382466.apply$mcZ$sp(Unknown Source)
>   org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>   org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
>   org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
>   org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
>   == Sink ==
>   0:
>   1: [1] [3] [5] [7] [2] [4] [6] [8]
>   2: [10]
>   3: [9]
>   4:
>   5: [11] [14] [12] [13] [15]
>   6: [16]
>   7: [19] [21] [17] [20] [22] [18] [23]
>   8: [30] [33] [25] [27] [32] [37] [35] [29] [34] [36] [28] [31] [24] [26]
>   9: [38]
>   == Plan ==
>   == Parsed Logical Plan ==
>   WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
>   +- SerializeFromObject [input[0, int, false] AS value#26274]
>      +- MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#26273: int
>         +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2
>            +- Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263]
>               +- StreamingDataSourceV2Relation [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, KafkaV2[SubscribePattern[stress.*]], {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}
>   == Analyzed Logical Plan ==
>   WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
>   +- SerializeFromObject [input[0, int, false] AS value#26274]
>      +- MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#26273: int
>         +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2
>            +- Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263]
>               +- StreamingDataSourceV2Relation [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, KafkaV2[SubscribePattern[stress.*]], {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}
>   == Optimized Logical Plan ==
>   WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
>   +- SerializeFromObject [input[0, int, false] AS value#26274]
>      +- MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#26273: int
>         +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2
>            +- Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263]
>               +- StreamingDataSourceV2Relation [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, KafkaV2[SubscribePattern[stress.*]], {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}
>   == Physical Plan ==
>   WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
>   +- *(1) SerializeFromObject [input[0, int, false] AS value#26274]
>      +- *(1) MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, obj#26273: int
>         +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2
>            +- *(1) Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263]
>               +- *(1) Project [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254]
>                  +- MicroBatchScan[key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan (StreamTest.scala:452)
>   org.scalatest.exceptions.TestFailedException:
>   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
>   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
>   at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1562)
>   at org.scalatest.Assertions.fail(Assertions.scala:933)
>   at org.scalatest.Assertions.fail$(Assertions.scala:929)
>   at org.scalatest.funsuite.AnyFunSuite.fail(AnyFunSuite.scala:1562)
>   at org.apache.spark.sql.streaming.StreamTest.failTest$1(StreamTest.scala:452)
>   at org.apache.spark.sql.streaming.StreamTest.liftedTree1$1(StreamTest.scala:788)
>   at org.apache.spark.sql.streaming.StreamTest.testStream(StreamTest.scala:764)
>   at org.apache.spark.sql.streaming.StreamTest.testStream$(StreamTest.scala:334)
>   at org.apache.spark.sql.kafka010.KafkaSourceTest.testStream(KafkaMicroBatchSourceSuite.scala:53)
>   at org.apache.spark.sql.streaming.StreamTest.runStressTest(StreamTest.scala:876)
>   at org.apache.spark.sql.streaming.StreamTest.runStressTest$(StreamTest.scala:828)
>   at org.apache.spark.sql.kafka010.KafkaSourceTest.runStressTest(KafkaMicroBatchSourceSuite.scala:53)
>   at org.apache.spark.sql.kafka010.KafkaSourceStressSuite.$anonfun$new$157(KafkaMicroBatchSourceSuite.scala:1906)
>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
>   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
>   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>   at org.scalatest.Transformer.apply(Transformer.scala:22)
>   at org.scalatest.Transformer.apply(Transformer.scala:20)
>   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189)
>   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:158)
>   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187)
>   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:199)
>   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
>   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:199)
>   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:181)
>   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:60)
>   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
>   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
>   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:60)
>   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:232)
>   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
>   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
>   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
>   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:232)
>   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:231)
>   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562)
>   at org.scalatest.Suite.run(Suite.scala:1112)
>   at org.scalatest.Suite.run$(Suite.scala:1094)
>   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562)
>   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:236)
>   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
>   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:236)
>   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:235)
>   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:60)
>   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
>   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
>   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
>   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:60)
>   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:318)
>   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:513)
>   at sbt.ForkMain$Run$2.call(ForkMain.java:296)
>   at sbt.ForkMain$Run$2.call(ForkMain.java:286)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> This test failure seems to occur fairly frequently.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org