You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Yang Jie (Jira)" <ji...@apache.org> on 2022/10/19 09:54:00 UTC

[jira] [Commented] (SPARK-40734) KafkaMicroBatchV2SourceWithAdminSuite failed

    [ https://issues.apache.org/jira/browse/SPARK-40734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17620167#comment-17620167 ] 

Yang Jie commented on SPARK-40734:
----------------------------------

It looks like flaky test, 

re-run will succeed, but the inducement for failure has not been found yet

 
{code:java}
- ensure stream-stream self-join generates only one offset in log and correct metrics *** FAILED ***
  Timed out waiting for stream: The code passed to failAfter did not complete within 30 seconds.
  java.base/java.lang.Thread.getStackTrace(Thread.java:2550)
  	org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:277)
  	org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
  	org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
  	org.apache.spark.sql.kafka010.KafkaSourceTest.failAfter(KafkaMicroBatchSourceSuite.scala:53)
  	org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7(StreamTest.scala:479)
  	org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7$adapted(StreamTest.scala:478)
  	scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
  	scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
  	scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
  
  	Caused by: 	null
  	java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1766)
  		org.apache.spark.sql.execution.streaming.StreamExecution.awaitOffset(StreamExecution.scala:465)
  		org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$8(StreamTest.scala:480)
  		scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  		org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
  		org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
  		org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
  		org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
  		org.apache.spark.sql.kafka010.KafkaSourceTest.failAfter(KafkaMicroBatchSourceSuite.scala:53)
  		org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7(StreamTest.scala:479)
  
  
  == Progress ==
     AssertOnQuery(<condition>, )
     AddKafkaData(topics = Set(topic-51), data = WrappedArray(1, 2), message = )
  => CheckAnswer: [1,1,1],[2,2,2]
     AddKafkaData(topics = Set(topic-51), data = WrappedArray(6, 3), message = )
     CheckAnswer: [1,1,1],[2,2,2],[3,3,3],[1,6,1],[1,1,6],[1,6,6]
     AssertOnQuery(<condition>, )
  
  == Stream ==
  Output Mode: Append
  Stream state: {KafkaV2[Subscribe[topic-51]]: {"topic-51":{"1":0,"0":1}}}
  Thread state: alive
  Thread stack trace: java.base/java.lang.ProcessImpl.forkAndExec(Native Method)
  java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:319)
  java.base/java.lang.ProcessImpl.start(ProcessImpl.java:249)
  java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1110)
  java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1073)
  org.apache.hadoop.util.Shell.runCommand(Shell.java:937)
  org.apache.hadoop.util.Shell.run(Shell.java:900)
  org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212)
  org.apache.hadoop.util.Shell.execCommand(Shell.java:1306)
  org.apache.hadoop.util.Shell.execCommand(Shell.java:1288)
  org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
  org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:324)
  org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:294)
  org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:439)
  org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:428)
  org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:459)
  org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1305)
  org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:102)
  org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:360)
  org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:400)
  org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626)
  org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701)
  org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697)
  org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
  org.apache.hadoop.fs.FileContext.create(FileContext.java:703)
  org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:359)
  org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:140)
  org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:143)
  org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:365)
  org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:173)
  scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
  scala.Option.getOrElse(Option.scala:189)
  org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171)
  org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:116)
  org.apache.spark.sql.execution.streaming.OffsetSeqLog.add(OffsetSeqLog.scala:53)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$13(MicroBatchExecution.scala:509)
  scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382)
  org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:507)
  scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:726)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:447)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:252)
  scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382)
  org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:233)
  org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:227)
  org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
  scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
  org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
  
  
  == Sink ==
  0: 
  1: [1,1,1]
  
  
  == Plan ==
  == Parsed Logical Plan ==
  WriteToMicroBatchDataSource MemorySink, 9d3b23d7-bb0b-4005-9e5e-f73b25f4ea37, Append, 1
  +- Project [key#71341, value#71340, value#71344]
     +- Join Inner, (key#71341 = key#71345)
        :- Project [cast(cast(value#71327 as string) as int) AS value#71340, (cast(cast(value#71327 as string) as int) % 5) AS key#71341]
        :  +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}}
        +- Project [cast(cast(value#71327 as string) as int) AS value#71344, (cast(cast(value#71327 as string) as int) % 5) AS key#71345]
           +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}}
  
  == Analyzed Logical Plan ==
  WriteToMicroBatchDataSource MemorySink, 9d3b23d7-bb0b-4005-9e5e-f73b25f4ea37, Append, 1
  +- Project [key#71341, value#71340, value#71344]
     +- Join Inner, (key#71341 = key#71345)
        :- Project [cast(cast(value#71327 as string) as int) AS value#71340, (cast(cast(value#71327 as string) as int) % 5) AS key#71341]
        :  +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}}
        +- Project [cast(cast(value#71327 as string) as int) AS value#71344, (cast(cast(value#71327 as string) as int) % 5) AS key#71345]
           +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}}
  
  == Optimized Logical Plan ==
  WriteToDataSourceV2 MicroBathWrite[epoch: 1, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@76374379]
  +- Project [key#71341, value#71340, value#71344]
     +- Join Inner, (key#71341 = key#71345)
        :- Project [cast(cast(value#71327 as string) as int) AS value#71340, (cast(cast(value#71327 as string) as int) % 5) AS key#71341]
        :  +- Filter isnotnull((cast(cast(value#71327 as string) as int) % 5))
        :     +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}}
        +- Project [cast(cast(value#71327 as string) as int) AS value#71344, (cast(cast(value#71327 as string) as int) % 5) AS key#71345]
           +- Filter isnotnull((cast(cast(value#71327 as string) as int) % 5))
              +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}}
  
  == Physical Plan ==
  WriteToDataSourceV2 MicroBathWrite[epoch: 1, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@76374379], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$5924/0x00000008023e0000@3079e1be
  +- *(3) Project [key#71341, value#71340, value#71344]
     +- StreamingSymmetricHashJoin [key#71341], [key#71345], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = file:/Users/yangjie01/SourceCode/git/spark-mine-12/connector/kafka-0-10-sql/target/tmp/streaming.metadata-35e5abc4-7986-478a-a06d-a5029055402f/state, runId = 8f8665b7-883b-4dae-9f2d-972f08219a59, opId = 0, ver = 1, numPartitions = 5], 0, state cleanup [ left = null, right = null ], 2
        :- Exchange hashpartitioning(key#71341, 5), ENSURE_REQUIREMENTS, [plan_id=108033]
        :  +- *(1) Project [cast(cast(value#71327 as string) as int) AS value#71340, (cast(cast(value#71327 as string) as int) % 5) AS key#71341]
        :     +- *(1) Filter isnotnull((cast(cast(value#71327 as string) as int) % 5))
        :        +- MicroBatchScan[key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan
        +- Exchange hashpartitioning(key#71345, 5), ENSURE_REQUIREMENTS, [plan_id=108038]
           +- *(2) Project [cast(cast(value#71327 as string) as int) AS value#71344, (cast(cast(value#71327 as string) as int) % 5) AS key#71345]
              +- *(2) Filter isnotnull((cast(cast(value#71327 as string) as int) % 5))
                 +- MicroBatchScan[key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan (StreamTest.scala:460)
- read Kafka transactional messages: read_committed *** FAILED ***
  Assert on query failed: Execute: The code passed to eventually never returned normally. Attempted 1694 times over 30.010655291000003 seconds. Last failure message: clock.isStreamWaitingAt(clock.getTimeMillis()) was false.
  org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:219)
  	org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:226)
  	org.scalatest.concurrent.Eventually.eventually(Eventually.scala:348)
  	org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:347)
  	org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:53)
  	org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$88(KafkaMicroBatchSourceSuite.scala:1063)
  	org.apache.spark.sql.streaming.StreamTest$Execute$.$anonfun$apply$4(StreamTest.scala:303)
  	org.apache.spark.sql.streaming.StreamTest$Execute$.$anonfun$apply$4$adapted(StreamTest.scala:303)
  	org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$41(StreamTest.scala:661)
  	org.apache.spark.sql.streaming.StreamTest.verify$1(StreamTest.scala:430)
  
  	Caused by: 	clock.isStreamWaitingAt(clock.getTimeMillis()) was false
  	org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
  		org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
  		org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
  		org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
  		org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$89(KafkaMicroBatchSourceSuite.scala:1065)
  		org.scalatest.enablers.Retrying$$anon$4.makeAValiantAttempt$1(Retrying.scala:184)
  		org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:196)
  		org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:226)
  		org.scalatest.concurrent.Eventually.eventually(Eventually.scala:348)
  		org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:347)
  
  
  == Progress ==
     StartStream(ProcessingTimeTrigger(100),org.apache.spark.sql.streaming.util.StreamManualClock@35e37a97,Map(),null)
     AssertOnQuery(<condition>, Execute)
     CheckAnswer: 
     AssertOnQuery(<condition>, Run Kafka Producer)
     AdvanceManualClock(100)
     AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: 
     AssertOnQuery(<condition>, Run Kafka Producer)
     AdvanceManualClock(100)
  => AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: [0],[1],[2]
     AdvanceManualClock(100)
     AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: [3],[4]
     AssertOnQuery(<condition>, Run Kafka Producer)
     AdvanceManualClock(100)
     AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: 
     AdvanceManualClock(100)
     AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: 
     AssertOnQuery(<condition>, Run Kafka Producer)
     AdvanceManualClock(100)
     AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: [12],[13],[14]
     AdvanceManualClock(100)
     AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: [15],[16]
     AssertOnQuery(<condition>, Run Kafka Producer)
     AdvanceManualClock(100)
     AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: [18],[20]
     AdvanceManualClock(100)
     AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: [22],[23]
     AdvanceManualClock(100)
     AssertOnQuery(<condition>, Execute)
     CheckNewAnswer: 
  
  == Stream ==
  Output Mode: Append
  Stream state: {KafkaV2[Subscribe[topic-52]]: {"topic-52":{"0":0}}}
  Thread state: alive
  Thread stack trace: java.base/java.io.FileInputStream.readBytes(Native Method)
  java.base/java.io.FileInputStream.read(FileInputStream.java:293)
  java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:308)
  java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:382)
  java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:367)
  java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:333)
  java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:376)
  java.base/sun.nio.cs.StreamDecoder.lockedRead(StreamDecoder.java:219)
  java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:173)
  java.base/java.io.InputStreamReader.read(InputStreamReader.java:189)
  java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
  java.base/java.io.BufferedReader.read1(BufferedReader.java:226)
  java.base/java.io.BufferedReader.implRead(BufferedReader.java:315)
  java.base/java.io.BufferedReader.read(BufferedReader.java:297)
  org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:1225)
  org.apache.hadoop.util.Shell.runCommand(Shell.java:993)
  org.apache.hadoop.util.Shell.run(Shell.java:900)
  org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212)
  org.apache.hadoop.util.Shell.execCommand(Shell.java:1306)
  org.apache.hadoop.util.Shell.execCommand(Shell.java:1288)
  org.apache.hadoop.fs.FileUtil.readLink(FileUtil.java:212)
  org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileLinkStatusInternal(RawLocalFileSystem.java:1113)
  org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1102)
  org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatus(RawLocalFileSystem.java:1073)
  org.apache.hadoop.fs.FileSystem.rename(FileSystem.java:1590)
  org.apache.hadoop.fs.DelegateToFileSystem.renameInternal(DelegateToFileSystem.java:206)
  org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:790)
  org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720)
  org.apache.hadoop.fs.ChecksumFs.renameInternal(ChecksumFs.java:496)
  org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720)
  org.apache.hadoop.fs.FileContext.rename(FileContext.java:1036)
  org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:370)
  org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:154)
  org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:176)
  scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
  scala.Option.getOrElse(Option.scala:189)
  org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171)
  org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:116)
  org.apache.spark.sql.execution.streaming.OffsetSeqLog.add(OffsetSeqLog.scala:53)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$13(MicroBatchExecution.scala:509)
  scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382)
  org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:507)
  scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:726)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:447)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:252)
  scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382)
  org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:233)
  org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:227)
  org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
  scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
  org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
  
  
  == Sink ==
  0: 
  
  
  == Plan ==
  == Parsed Logical Plan ==
  WriteToMicroBatchDataSource MemorySink, 153f4917-cee4-4d8d-bf51-019ec25b6cd2, Append, 0
  +- SerializeFromObject [input[0, int, false] AS value#71573]
     +- MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$7385/0x000000080263d938@461d76ed, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#71572: int
        +- DeserializeToObject newInstance(class scala.Tuple2), obj#71571: scala.Tuple2
           +- Project [cast(key#71547 as string) AS key#71561, cast(value#71548 as string) AS value#71562]
              +- StreamingDataSourceV2Relation [key#71547, value#71548, topic#71549, partition#71550, offset#71551L, timestamp#71552, timestampType#71553], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@676d619f, KafkaV2[Subscribe[topic-52]], {"topic-52":{"0":0}}, {"topic-52":{"0":0}}
  
  == Analyzed Logical Plan ==
  WriteToMicroBatchDataSource MemorySink, 153f4917-cee4-4d8d-bf51-019ec25b6cd2, Append, 0
  +- SerializeFromObject [input[0, int, false] AS value#71573]
     +- MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$7385/0x000000080263d938@461d76ed, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#71572: int
        +- DeserializeToObject newInstance(class scala.Tuple2), obj#71571: scala.Tuple2
           +- Project [cast(key#71547 as string) AS key#71561, cast(value#71548 as string) AS value#71562]
              +- StreamingDataSourceV2Relation [key#71547, value#71548, topic#71549, partition#71550, offset#71551L, timestamp#71552, timestampType#71553], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@676d619f, KafkaV2[Subscribe[topic-52]], {"topic-52":{"0":0}}, {"topic-52":{"0":0}}
  
  == Optimized Logical Plan ==
  WriteToDataSourceV2 MicroBathWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@51b1ac59]
  +- SerializeFromObject [input[0, int, false] AS value#71573]
     +- MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$7385/0x000000080263d938@461d76ed, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#71572: int
        +- DeserializeToObject newInstance(class scala.Tuple2), obj#71571: scala.Tuple2
           +- Project [cast(key#71547 as string) AS key#71561, cast(value#71548 as string) AS value#71562]
              +- StreamingDataSourceV2Relation [key#71547, value#71548, topic#71549, partition#71550, offset#71551L, timestamp#71552, timestampType#71553], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@676d619f, KafkaV2[Subscribe[topic-52]], {"topic-52":{"0":0}}, {"topic-52":{"0":0}}
  
  == Physical Plan ==
  WriteToDataSourceV2 MicroBathWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@51b1ac59], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$5924/0x00000008023e0000@670a07bf
  +- *(1) SerializeFromObject [input[0, int, false] AS value#71573]
     +- *(1) MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$7385/0x000000080263d938@461d76ed, obj#71572: int
        +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#71571: scala.Tuple2
           +- *(1) Project [cast(key#71547 as string) AS key#71561, cast(value#71548 as string) AS value#71562]
              +- MicroBatchScan[key#71547, value#71548, topic#71549, partition#71550, offset#71551L, timestamp#71552, timestampType#71553] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan (StreamTest.scala:460) {code}
 

 

> KafkaMicroBatchV2SourceWithAdminSuite failed
> --------------------------------------------
>
>                 Key: SPARK-40734
>                 URL: https://issues.apache.org/jira/browse/SPARK-40734
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Structured Streaming
>    Affects Versions: 3.4.0
>            Reporter: Yang Jie
>            Priority: Minor
>
> - ensure stream-stream self-join generates only one offset in log and correct metrics *** FAILED ***
> - read Kafka transactional messages: read_committed *** FAILED ***
> Failure reason to be supplemented



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org