Posted to issues@spark.apache.org by "Yang Jie (Jira)" <ji...@apache.org> on 2022/10/19 09:54:00 UTC
[jira] [Commented] (SPARK-40734) KafkaMicroBatchV2SourceWithAdminSuite failed
[ https://issues.apache.org/jira/browse/SPARK-40734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17620167#comment-17620167 ]
Yang Jie commented on SPARK-40734:
----------------------------------
This looks like a flaky test: a re-run succeeds, but the cause of the failure has not been found yet.
{code:java}
- ensure stream-stream self-join generates only one offset in log and correct metrics *** FAILED ***
Timed out waiting for stream: The code passed to failAfter did not complete within 30 seconds.
java.base/java.lang.Thread.getStackTrace(Thread.java:2550)
org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:277)
org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
org.apache.spark.sql.kafka010.KafkaSourceTest.failAfter(KafkaMicroBatchSourceSuite.scala:53)
org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7(StreamTest.scala:479)
org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7$adapted(StreamTest.scala:478)
scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
Caused by: null
java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1766)
org.apache.spark.sql.execution.streaming.StreamExecution.awaitOffset(StreamExecution.scala:465)
org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$8(StreamTest.scala:480)
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
org.apache.spark.sql.kafka010.KafkaSourceTest.failAfter(KafkaMicroBatchSourceSuite.scala:53)
org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7(StreamTest.scala:479)
== Progress ==
AssertOnQuery(<condition>, )
AddKafkaData(topics = Set(topic-51), data = WrappedArray(1, 2), message = )
=> CheckAnswer: [1,1,1],[2,2,2]
AddKafkaData(topics = Set(topic-51), data = WrappedArray(6, 3), message = )
CheckAnswer: [1,1,1],[2,2,2],[3,3,3],[1,6,1],[1,1,6],[1,6,6]
AssertOnQuery(<condition>, )
== Stream ==
Output Mode: Append
Stream state: {KafkaV2[Subscribe[topic-51]]: {"topic-51":{"1":0,"0":1}}}
Thread state: alive
Thread stack trace: java.base/java.lang.ProcessImpl.forkAndExec(Native Method)
java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:319)
java.base/java.lang.ProcessImpl.start(ProcessImpl.java:249)
java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1110)
java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1073)
org.apache.hadoop.util.Shell.runCommand(Shell.java:937)
org.apache.hadoop.util.Shell.run(Shell.java:900)
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212)
org.apache.hadoop.util.Shell.execCommand(Shell.java:1306)
org.apache.hadoop.util.Shell.execCommand(Shell.java:1288)
org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:324)
org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:294)
org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:439)
org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:428)
org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:459)
org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1305)
org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:102)
org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:360)
org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:400)
org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626)
org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701)
org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697)
org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
org.apache.hadoop.fs.FileContext.create(FileContext.java:703)
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:359)
org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:140)
org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:143)
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:365)
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:173)
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
scala.Option.getOrElse(Option.scala:189)
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171)
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:116)
org.apache.spark.sql.execution.streaming.OffsetSeqLog.add(OffsetSeqLog.scala:53)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$13(MicroBatchExecution.scala:509)
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384)
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382)
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:507)
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:726)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:447)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:252)
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384)
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382)
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:233)
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:227)
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
== Sink ==
0:
1: [1,1,1]
== Plan ==
== Parsed Logical Plan ==
WriteToMicroBatchDataSource MemorySink, 9d3b23d7-bb0b-4005-9e5e-f73b25f4ea37, Append, 1
+- Project [key#71341, value#71340, value#71344]
+- Join Inner, (key#71341 = key#71345)
:- Project [cast(cast(value#71327 as string) as int) AS value#71340, (cast(cast(value#71327 as string) as int) % 5) AS key#71341]
: +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}}
+- Project [cast(cast(value#71327 as string) as int) AS value#71344, (cast(cast(value#71327 as string) as int) % 5) AS key#71345]
+- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}}
== Analyzed Logical Plan ==
WriteToMicroBatchDataSource MemorySink, 9d3b23d7-bb0b-4005-9e5e-f73b25f4ea37, Append, 1
+- Project [key#71341, value#71340, value#71344]
+- Join Inner, (key#71341 = key#71345)
:- Project [cast(cast(value#71327 as string) as int) AS value#71340, (cast(cast(value#71327 as string) as int) % 5) AS key#71341]
: +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}}
+- Project [cast(cast(value#71327 as string) as int) AS value#71344, (cast(cast(value#71327 as string) as int) % 5) AS key#71345]
+- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}}
== Optimized Logical Plan ==
WriteToDataSourceV2 MicroBathWrite[epoch: 1, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@76374379]
+- Project [key#71341, value#71340, value#71344]
+- Join Inner, (key#71341 = key#71345)
:- Project [cast(cast(value#71327 as string) as int) AS value#71340, (cast(cast(value#71327 as string) as int) % 5) AS key#71341]
: +- Filter isnotnull((cast(cast(value#71327 as string) as int) % 5))
: +- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}}
+- Project [cast(cast(value#71327 as string) as int) AS value#71344, (cast(cast(value#71327 as string) as int) % 5) AS key#71345]
+- Filter isnotnull((cast(cast(value#71327 as string) as int) % 5))
+- StreamingDataSourceV2Relation [key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@685bf743, KafkaV2[Subscribe[topic-51]], {"topic-51":{"1":0,"0":0}}, {"topic-51":{"1":0,"0":1}}
== Physical Plan ==
WriteToDataSourceV2 MicroBathWrite[epoch: 1, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@76374379], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$5924/0x00000008023e0000@3079e1be
+- *(3) Project [key#71341, value#71340, value#71344]
+- StreamingSymmetricHashJoin [key#71341], [key#71345], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = file:/Users/yangjie01/SourceCode/git/spark-mine-12/connector/kafka-0-10-sql/target/tmp/streaming.metadata-35e5abc4-7986-478a-a06d-a5029055402f/state, runId = 8f8665b7-883b-4dae-9f2d-972f08219a59, opId = 0, ver = 1, numPartitions = 5], 0, state cleanup [ left = null, right = null ], 2
:- Exchange hashpartitioning(key#71341, 5), ENSURE_REQUIREMENTS, [plan_id=108033]
: +- *(1) Project [cast(cast(value#71327 as string) as int) AS value#71340, (cast(cast(value#71327 as string) as int) % 5) AS key#71341]
: +- *(1) Filter isnotnull((cast(cast(value#71327 as string) as int) % 5))
: +- MicroBatchScan[key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan
+- Exchange hashpartitioning(key#71345, 5), ENSURE_REQUIREMENTS, [plan_id=108038]
+- *(2) Project [cast(cast(value#71327 as string) as int) AS value#71344, (cast(cast(value#71327 as string) as int) % 5) AS key#71345]
+- *(2) Filter isnotnull((cast(cast(value#71327 as string) as int) % 5))
+- MicroBatchScan[key#71326, value#71327, topic#71328, partition#71329, offset#71330L, timestamp#71331, timestampType#71332] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan (StreamTest.scala:460)
- read Kafka transactional messages: read_committed *** FAILED ***
Assert on query failed: Execute: The code passed to eventually never returned normally. Attempted 1694 times over 30.010655291000003 seconds. Last failure message: clock.isStreamWaitingAt(clock.getTimeMillis()) was false.
org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:219)
org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:226)
org.scalatest.concurrent.Eventually.eventually(Eventually.scala:348)
org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:347)
org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:53)
org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$88(KafkaMicroBatchSourceSuite.scala:1063)
org.apache.spark.sql.streaming.StreamTest$Execute$.$anonfun$apply$4(StreamTest.scala:303)
org.apache.spark.sql.streaming.StreamTest$Execute$.$anonfun$apply$4$adapted(StreamTest.scala:303)
org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$41(StreamTest.scala:661)
org.apache.spark.sql.streaming.StreamTest.verify$1(StreamTest.scala:430)
Caused by: clock.isStreamWaitingAt(clock.getTimeMillis()) was false
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$89(KafkaMicroBatchSourceSuite.scala:1065)
org.scalatest.enablers.Retrying$$anon$4.makeAValiantAttempt$1(Retrying.scala:184)
org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:196)
org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:226)
org.scalatest.concurrent.Eventually.eventually(Eventually.scala:348)
org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:347)
== Progress ==
StartStream(ProcessingTimeTrigger(100),org.apache.spark.sql.streaming.util.StreamManualClock@35e37a97,Map(),null)
AssertOnQuery(<condition>, Execute)
CheckAnswer:
AssertOnQuery(<condition>, Run Kafka Producer)
AdvanceManualClock(100)
AssertOnQuery(<condition>, Execute)
CheckNewAnswer:
AssertOnQuery(<condition>, Run Kafka Producer)
AdvanceManualClock(100)
=> AssertOnQuery(<condition>, Execute)
CheckNewAnswer: [0],[1],[2]
AdvanceManualClock(100)
AssertOnQuery(<condition>, Execute)
CheckNewAnswer: [3],[4]
AssertOnQuery(<condition>, Run Kafka Producer)
AdvanceManualClock(100)
AssertOnQuery(<condition>, Execute)
CheckNewAnswer:
AdvanceManualClock(100)
AssertOnQuery(<condition>, Execute)
CheckNewAnswer:
AssertOnQuery(<condition>, Run Kafka Producer)
AdvanceManualClock(100)
AssertOnQuery(<condition>, Execute)
CheckNewAnswer: [12],[13],[14]
AdvanceManualClock(100)
AssertOnQuery(<condition>, Execute)
CheckNewAnswer: [15],[16]
AssertOnQuery(<condition>, Run Kafka Producer)
AdvanceManualClock(100)
AssertOnQuery(<condition>, Execute)
CheckNewAnswer: [18],[20]
AdvanceManualClock(100)
AssertOnQuery(<condition>, Execute)
CheckNewAnswer: [22],[23]
AdvanceManualClock(100)
AssertOnQuery(<condition>, Execute)
CheckNewAnswer:
== Stream ==
Output Mode: Append
Stream state: {KafkaV2[Subscribe[topic-52]]: {"topic-52":{"0":0}}}
Thread state: alive
Thread stack trace: java.base/java.io.FileInputStream.readBytes(Native Method)
java.base/java.io.FileInputStream.read(FileInputStream.java:293)
java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:308)
java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:382)
java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:367)
java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:333)
java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:376)
java.base/sun.nio.cs.StreamDecoder.lockedRead(StreamDecoder.java:219)
java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:173)
java.base/java.io.InputStreamReader.read(InputStreamReader.java:189)
java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
java.base/java.io.BufferedReader.read1(BufferedReader.java:226)
java.base/java.io.BufferedReader.implRead(BufferedReader.java:315)
java.base/java.io.BufferedReader.read(BufferedReader.java:297)
org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:1225)
org.apache.hadoop.util.Shell.runCommand(Shell.java:993)
org.apache.hadoop.util.Shell.run(Shell.java:900)
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212)
org.apache.hadoop.util.Shell.execCommand(Shell.java:1306)
org.apache.hadoop.util.Shell.execCommand(Shell.java:1288)
org.apache.hadoop.fs.FileUtil.readLink(FileUtil.java:212)
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileLinkStatusInternal(RawLocalFileSystem.java:1113)
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1102)
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatus(RawLocalFileSystem.java:1073)
org.apache.hadoop.fs.FileSystem.rename(FileSystem.java:1590)
org.apache.hadoop.fs.DelegateToFileSystem.renameInternal(DelegateToFileSystem.java:206)
org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:790)
org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720)
org.apache.hadoop.fs.ChecksumFs.renameInternal(ChecksumFs.java:496)
org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720)
org.apache.hadoop.fs.FileContext.rename(FileContext.java:1036)
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:370)
org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:154)
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:176)
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
scala.Option.getOrElse(Option.scala:189)
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171)
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:116)
org.apache.spark.sql.execution.streaming.OffsetSeqLog.add(OffsetSeqLog.scala:53)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$13(MicroBatchExecution.scala:509)
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384)
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382)
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:507)
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:726)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:447)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:252)
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384)
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382)
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:233)
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:227)
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
== Sink ==
0:
== Plan ==
== Parsed Logical Plan ==
WriteToMicroBatchDataSource MemorySink, 153f4917-cee4-4d8d-bf51-019ec25b6cd2, Append, 0
+- SerializeFromObject [input[0, int, false] AS value#71573]
+- MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$7385/0x000000080263d938@461d76ed, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#71572: int
+- DeserializeToObject newInstance(class scala.Tuple2), obj#71571: scala.Tuple2
+- Project [cast(key#71547 as string) AS key#71561, cast(value#71548 as string) AS value#71562]
+- StreamingDataSourceV2Relation [key#71547, value#71548, topic#71549, partition#71550, offset#71551L, timestamp#71552, timestampType#71553], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@676d619f, KafkaV2[Subscribe[topic-52]], {"topic-52":{"0":0}}, {"topic-52":{"0":0}}
== Analyzed Logical Plan ==
WriteToMicroBatchDataSource MemorySink, 153f4917-cee4-4d8d-bf51-019ec25b6cd2, Append, 0
+- SerializeFromObject [input[0, int, false] AS value#71573]
+- MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$7385/0x000000080263d938@461d76ed, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#71572: int
+- DeserializeToObject newInstance(class scala.Tuple2), obj#71571: scala.Tuple2
+- Project [cast(key#71547 as string) AS key#71561, cast(value#71548 as string) AS value#71562]
+- StreamingDataSourceV2Relation [key#71547, value#71548, topic#71549, partition#71550, offset#71551L, timestamp#71552, timestampType#71553], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@676d619f, KafkaV2[Subscribe[topic-52]], {"topic-52":{"0":0}}, {"topic-52":{"0":0}}
== Optimized Logical Plan ==
WriteToDataSourceV2 MicroBathWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@51b1ac59]
+- SerializeFromObject [input[0, int, false] AS value#71573]
+- MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$7385/0x000000080263d938@461d76ed, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#71572: int
+- DeserializeToObject newInstance(class scala.Tuple2), obj#71571: scala.Tuple2
+- Project [cast(key#71547 as string) AS key#71561, cast(value#71548 as string) AS value#71562]
+- StreamingDataSourceV2Relation [key#71547, value#71548, topic#71549, partition#71550, offset#71551L, timestamp#71552, timestampType#71553], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@676d619f, KafkaV2[Subscribe[topic-52]], {"topic-52":{"0":0}}, {"topic-52":{"0":0}}
== Physical Plan ==
WriteToDataSourceV2 MicroBathWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@51b1ac59], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$5924/0x00000008023e0000@670a07bf
+- *(1) SerializeFromObject [input[0, int, false] AS value#71573]
+- *(1) MapElements org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$Lambda$7385/0x000000080263d938@461d76ed, obj#71572: int
+- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#71571: scala.Tuple2
+- *(1) Project [cast(key#71547 as string) AS key#71561, cast(value#71548 as string) AS value#71562]
+- MicroBatchScan[key#71547, value#71548, topic#71549, partition#71550, offset#71551L, timestamp#71552, timestampType#71553] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan (StreamTest.scala:460) {code}
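The second failure above reports that the block passed to `eventually` was attempted 1694 times over roughly 30 seconds before giving up. As a rough illustration of that retry-until-timeout pattern, here is a minimal Java sketch. It is not ScalaTest's implementation; the names `EventuallySketch`, `Result` and `pollUntil` are hypothetical and exist only for this example.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

public class EventuallySketch {

    /** Outcome of a polling run: whether the condition held, and after how many attempts. */
    public static final class Result {
        public final boolean succeeded;
        public final int attempts;

        Result(boolean succeeded, int attempts) {
            this.succeeded = succeeded;
            this.attempts = attempts;
        }
    }

    /**
     * Re-evaluates the condition until it returns true or the timeout elapses,
     * sleeping for the given interval between attempts.
     * The condition is always evaluated at least once.
     */
    public static Result pollUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        int attempts = 0;
        while (true) {
            attempts++;
            if (condition.getAsBoolean()) {
                return new Result(true, attempts);
            }
            // Overflow-safe comparison against the deadline.
            if (System.nanoTime() - deadline >= 0) {
                return new Result(false, attempts);
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and report the run as failed.
                Thread.currentThread().interrupt();
                return new Result(false, attempts);
            }
        }
    }

    public static void main(String[] args) {
        // A condition that never holds, comparable to a stream that never
        // reaches the waiting state the test asserts on.
        Result r = pollUntil(() -> false, Duration.ofMillis(200), Duration.ofMillis(10));
        System.out.println("succeeded=" + r.succeeded + ", attempts=" + r.attempts);
    }
}
```

With a helper like this, a condition that never becomes true produces a high attempt count and a timeout, not an immediate error. That matches the shape of the `eventually` failure message in the log, though it says nothing about why the stream thread was still busy in the checkpoint file operations shown in the stack trace.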
> KafkaMicroBatchV2SourceWithAdminSuite failed
> --------------------------------------------
>
> Key: SPARK-40734
> URL: https://issues.apache.org/jira/browse/SPARK-40734
> Project: Spark
> Issue Type: Sub-task
> Components: Structured Streaming
> Affects Versions: 3.4.0
> Reporter: Yang Jie
> Priority: Minor
>
> - ensure stream-stream self-join generates only one offset in log and correct metrics *** FAILED ***
> - read Kafka transactional messages: read_committed *** FAILED ***
> Failure reason to be added.