You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Archit jain (Jira)" <ji...@apache.org> on 2021/08/16 05:47:00 UTC

[jira] [Commented] (SPARK-31460) spark-sql-kafka source in spark 2.4.4 causes reading stream failure frequently

    [ https://issues.apache.org/jira/browse/SPARK-31460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17399530#comment-17399530 ] 

Archit jain commented on SPARK-31460:
-------------------------------------

[~akanksha_goel1] Did you get the resolution of the above issue?

I am also getting the same issue.

> spark-sql-kafka source in spark 2.4.4 causes reading stream failure frequently
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-31460
>                 URL: https://issues.apache.org/jira/browse/SPARK-31460
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.4
>            Reporter: vinay
>            Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> In spark 2.4.4 , it provides a source "spark-sql-kafka-0-10_2.11".
>  
> When I wanted to read from my kafka-0.10.2.11 cluster, it throws out an error "*java.util.concurrent.TimeoutException: Cannot fetch record for offset xxxxx in 1000 milliseconds*"  frequently, and the job thus failed.
>  
> I see this issue was seen before in 2.3 according to ticket 23829 and an upgrade to spark 2.4 was supposed to solve this.
>  
> {code:java}
> compile group: 'org.apache.spark', name: 'spark-sql-kafka-0-10_2.11', version: '2.4.4'{code}
> Here is the error stack.
> {code:java}
> org.apache.spark.SparkException: Writing job aborted.
>  org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
>  org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>  org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>  org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>  org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>  org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>  org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>  org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
>  org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
>  org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
>  org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2788)
>  org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2788)
>  org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
>  org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>  org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>  org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>  org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
>  org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:540)
>  org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>  org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>  org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
>  org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>  org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>  org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>  org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
>  org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>  org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
>  org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 44426.0 failed 4 times, most recent failure: Lost task 0.3 in stage 44426.0 (TID 209753, 10.244.3.161, executor 5): java.util.concurrent.TimeoutException: Cannot fetch record for offset 7700744 in 1000 milliseconds
>  org.apache.spark.sql.kafka010.InternalKafkaConsumer.fetchData(KafkaDataConsumer.scala:488)
> org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchRecord(KafkaDataConsumer.scala:371)
>  org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:251)
>  org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:234)
>  org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
>  org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:209)
>  org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:234)
>  org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:64)
>  org.apache.spark.sql.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.get(KafkaDataConsumer.scala:506)
>  org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.next(KafkaMicroBatchReader.scala:357)
>  org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
>  org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>  org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>  org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
>  org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
>  org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
>  org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
>  org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
>  org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
>  org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
>  org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>  org.apache.spark.scheduler.Task.run(Task.scala:123)
>  org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>  org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  java.lang.Thread.run(Thread.java:748)
> Driver stacktrace:
>  org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
>  org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
>  org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
>  scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
>  org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
>  org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
>  scala.Option.foreach(Option.scala:257)
>  org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
>  org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
>  org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
>  org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
>  org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>  org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
>  org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
>  org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:64)... 35 more
> Caused by: java.util.concurrent.TimeoutException: Cannot fetch record for offset 7700744 in 1000 milliseconds
>  org.apache.spark.sql.kafka010.InternalKafkaConsumer.fetchData(KafkaDataConsumer.scala:488)
> org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchRecord(KafkaDataConsumer.scala:371)
>  org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:251)
>  org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:234)
>  org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
>  org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:209)
>  org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:234)
>  org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:64)
>  org.apache.spark.sql.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.get(KafkaDataConsumer.scala:506)
>  org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.next(KafkaMicroBatchReader.scala:357)
>  org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
>  org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>  org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>  org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
>  org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
>  org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
>  org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
>  org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
>  org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
>  org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
>  org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>  org.apache.spark.scheduler.Task.run(Task.scala:123)
>  org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>  org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org