Posted to issues@spark.apache.org by "Jungtaek Lim (Jira)" <ji...@apache.org> on 2022/02/07 09:24:00 UTC

[jira] [Assigned] (SPARK-38046) Fix KafkaSource/KafkaMicroBatch flaky test due to non-deterministic timing

     [ https://issues.apache.org/jira/browse/SPARK-38046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jungtaek Lim reassigned SPARK-38046:
------------------------------------

    Assignee: Boyang Jerry Peng

> Fix KafkaSource/KafkaMicroBatch flaky test due to non-deterministic timing
> --------------------------------------------------------------------------
>
>                 Key: SPARK-38046
>                 URL: https://issues.apache.org/jira/browse/SPARK-38046
>             Project: Spark
>          Issue Type: Improvement
>          Components: Tests
>    Affects Versions: 3.2.0
>            Reporter: Boyang Jerry Peng
>            Assignee: Boyang Jerry Peng
>            Priority: Major
>
> There is a test called "compositeReadLimit"
>  
> [https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala#L460]
>  
> that is flaky.  The problem is that the Kafka connector always reads the actual system time instead of advancing it manually, which leaves room for non-deterministic behavior, especially since the source decides whether "maxTriggerDelayMs" is satisfied by comparing the last trigger time against the current system time.  Simply sleeping at various points in the test can produce different outcomes.
>  
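> Below is a minimal, self-contained sketch of the pattern described above (hypothetical names such as Clock, ManualTestClock, and DelayLimit; this is not the actual connector code): a delay-based read limit that reads the system clock directly depends on how much real time the test run happens to take, while the same check with an injected clock can be advanced deterministically, which is how StreamManualClock/AdvanceManualClock already drive the trigger in the test output further below.
>  
> {code:scala}
> // Sketch only: illustrates the timing issue, not the real KafkaSource code.
> 
> // A tiny clock abstraction: production code uses system time,
> // tests advance time manually.
> trait Clock { def nowMs(): Long }
> object SystemClock extends Clock { def nowMs(): Long = System.currentTimeMillis() }
> final class ManualTestClock(start: Long = 0L) extends Clock {
>   private var t = start
>   def nowMs(): Long = t
>   def advance(ms: Long): Unit = { t += ms }
> }
> 
> // Delay-based check analogous to the maxTriggerDelayMs behavior described
> // above: report "exceeded" once enough time has passed since the last trigger.
> final class DelayLimit(maxTriggerDelayMs: Long, clock: Clock) {
>   private var lastTriggerMs = clock.nowMs()
>   def delayExceeded(): Boolean = {
>     val now = clock.nowMs()
>     if (now - lastTriggerMs >= maxTriggerDelayMs) { lastTriggerMs = now; true }
>     else false
>   }
> }
> 
> object DelayLimitDemo extends App {
>   // With the real system clock, the result depends on how long the test run
>   // takes (slow CI machines, GC pauses, explicit sleeps), which is the
>   // flakiness described in this ticket.
>   val wallClockLimit = new DelayLimit(maxTriggerDelayMs = 100, clock = SystemClock)
>   Thread.sleep(120) // uncontrolled real time elapses
>   println(s"system clock: ${wallClockLimit.delayExceeded()}") // likely true, but timing-dependent
> 
>   // With an injected manual clock, the test decides exactly when the delay elapses.
>   val clock = new ManualTestClock()
>   val manualLimit = new DelayLimit(maxTriggerDelayMs = 100, clock = clock)
>   println(s"manual clock, t=0ms:   ${manualLimit.delayExceeded()}") // false
>   clock.advance(50)
>   println(s"manual clock, t=50ms:  ${manualLimit.delayExceeded()}") // false
>   clock.advance(50)
>   println(s"manual clock, t=100ms: ${manualLimit.delayExceeded()}") // true
> }
> {code}
>  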
> Example output when test fails:
>  
> {code:java}
> - compositeReadLimit *** FAILED *** (7 seconds, 862 milliseconds)
>   == Results ==
>   !== Correct Answer - 0 ==   == Spark Answer - 14 ==
>   !struct<>                   struct<value:int>
>   !                           [112]
>   !                           [113]
>   !                           [114]
>   !                           [115]
>   !                           [116]
>   !                           [117]
>   !                           [118]
>   !                           [119]
>   !                           [120]
>   !                           [16]
>   !                           [17]
>   !                           [18]
>   !                           [19]
>   !                           [20]
>       
>   
>   == Progress ==
>      StartStream(ProcessingTimeTrigger(100),org.apache.spark.sql.streaming.util.StreamManualClock@30075210,Map(),null)
>      AssertOnQuery(<condition>, )
>      CheckAnswer: [1],[10],[100],[101],[102],[103],[104],[105],[106],[107],[11],[108],[109],[110],[111],[12],[13],[14],[15]
>      AdvanceManualClock(100)
>      AssertOnQuery(<condition>, )
>   => CheckNewAnswer: 
>      Assert(<condition>, )
>      AdvanceManualClock(100)
>      AssertOnQuery(<condition>, )
>      CheckAnswer: [1],[10],[100],[101],[102],[103],[104],[105],[106],[107],[11],[108],[109],[110],[111],[112],[113],[114],[115],[116],[12],[117],[118],[119],[120],[121],[13],[14],[15],[16],[17],[18],[19],[2],[20],[21],[22],[23],[24]
>      AdvanceManualClock(100)
>      AssertOnQuery(<condition>, )
>      CheckNewAnswer: 
>      Assert(<condition>, )
>      AdvanceManualClock(100)
>      AssertOnQuery(<condition>, )
>      CheckAnswer: [1],[10],[100],[101],[102],[103],[104],[105],[106],[107],[11],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[12],[120],[121],[122],[123],[124],[125],[126],[127],[128],[13],[14],[15],[16],[17],[18],[19],[2],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30]
>   
>   == Stream ==
>   Output Mode: Append
>   Stream state: {KafkaSourceV1[Subscribe[topic-41]]: {"topic-41":{"2":1,"1":11,"0":21}}}
>   Thread state: alive
>   Thread stack trace: java.lang.Object.wait(Native Method)
>   org.apache.spark.util.ManualClock.waitTillTime(ManualClock.scala:67)
>   org.apache.spark.sql.streaming.util.StreamManualClock.waitTillTime(StreamManualClock.scala:34)
>   org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:76)
>   org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:222)
>   org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:350)
>   org.apache.spark.sql.execution.streaming.StreamExecution$$Lambda$3081/1859014229.apply$mcV$sp(Unknown Source)
>   scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:857)
>   org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:325)
>   org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:252)
>   
>   
>   == Sink ==
>   0: [100] [101] [102] [103] [104] [105] [106] [107] [108] [109] [110] [111] [1] [10] [11] [12] [13] [14] [15]
>   1: [112] [113] [114] [115] [116] [117] [118] [119] [120] [16] [17] [18] [19] [20]
>   
>   
>   == Plan ==
>   == Parsed Logical Plan ==
>   WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@48d73c40
>   +- SerializeFromObject [input[0, int, false] AS value#7455]
>      +- MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$6114/1322446561@4df69693, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#7454: int
>         +- DeserializeToObject newInstance(class scala.Tuple2), obj#7453: scala.Tuple2
>            +- Project [cast(key#7429 as string) AS key#7443, cast(value#7430 as string) AS value#7444]
>               +- Project [key#7501 AS key#7429, value#7502 AS value#7430, topic#7503 AS topic#7431, partition#7504 AS partition#7432, offset#7505L AS offset#7433L, timestamp#7506 AS timestamp#7434, timestampType#7507 AS timestampType#7435]
>                  +- LogicalRDD [key#7501, value#7502, topic#7503, partition#7504, offset#7505L, timestamp#7506, timestampType#7507], true
>   
>   == Analyzed Logical Plan ==
>   WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@48d73c40
>   +- SerializeFromObject [input[0, int, false] AS value#7455]
>      +- MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$6114/1322446561@4df69693, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#7454: int
>         +- DeserializeToObject newInstance(class scala.Tuple2), obj#7453: scala.Tuple2
>            +- Project [cast(key#7429 as string) AS key#7443, cast(value#7430 as string) AS value#7444]
>               +- Project [key#7501 AS key#7429, value#7502 AS value#7430, topic#7503 AS topic#7431, partition#7504 AS partition#7432, offset#7505L AS offset#7433L, timestamp#7506 AS timestamp#7434, timestampType#7507 AS timestampType#7435]
>                  +- LogicalRDD [key#7501, value#7502, topic#7503, partition#7504, offset#7505L, timestamp#7506, timestampType#7507], true
>   
>   == Optimized Logical Plan ==
>   WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@48d73c40
>   +- SerializeFromObject [input[0, int, false] AS value#7455]
>      +- MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$6114/1322446561@4df69693, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#7454: int
>         +- DeserializeToObject newInstance(class scala.Tuple2), obj#7453: scala.Tuple2
>            +- Project [cast(key#7501 as string) AS key#7443, cast(value#7502 as string) AS value#7444]
>               +- LogicalRDD [key#7501, value#7502, topic#7503, partition#7504, offset#7505L, timestamp#7506, timestampType#7507], true
>   
>   == Physical Plan ==
>   WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@48d73c40, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3704/659759030@5d8e6616
>   +- *(1) SerializeFromObject [input[0, int, false] AS value#7455]
>      +- *(1) MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$6114/1322446561@4df69693, obj#7454: int
>         +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#7453: scala.Tuple2
>            +- *(1) Project [cast(key#7501 as string) AS key#7443, cast(value#7502 as string) AS value#7444]
>               +- *(1) Scan ExistingRDD kafka[key#7501,value#7502,topic#7503,partition#7504,offset#7505L,timestamp#7506,timestampType#7507] (StreamTest.scala:466)
>   org.scalatest.exceptions.TestFailedException:
>   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
>   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
>   at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
>   at org.scalatest.Assertions.fail(Assertions.scala:1091)
>   at org.scalatest.Assertions.fail$(Assertions.scala:1087)
>   at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
>   at org.apache.spark.sql.streaming.StreamTest.failTest$1(StreamTest.scala:466)
>   at org.apache.spark.sql.streamin
>   2022-01-07 10:25:45,899 INFO  logger - [Controller id=0 epoch=1] Changed partition topic-42-0 from NewPartition to OnlinePartition with state LeaderAndIsr(leader=0, leaderEpoch=0, isr=List(0), zkVersion=0){code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org