Posted to issues@spark.apache.org by "Archit jain (Jira)" <ji...@apache.org> on 2021/08/16 05:56:00 UTC

[jira] [Created] (SPARK-36517) spark-sql-kafka source in spark 2.4.2 causes reading stream failure frequently

Archit jain created SPARK-36517:
-----------------------------------

             Summary: spark-sql-kafka source in spark 2.4.2 causes reading stream failure frequently
                 Key: SPARK-36517
                 URL: https://issues.apache.org/jira/browse/SPARK-36517
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.4.2
            Reporter: Archit jain


 

Hi Team,

I am intermittently getting the error below, and because of it I am facing a data loss issue in the application: the number of records consumed does not match the number of records produced.


*[ERROR] o.a.s.e.Executor:91 [Executor task launch worker for task 6503931] - Exception in task 17.0 in stage 89080.0 (TID 6503931) java.util.concurrent.TimeoutException: Cannot fetch record for offset 43751946 in 7168 milliseconds*

*Stack Trace:*
{code:java}
[ERROR] o.a.s.e.Executor:91 [Executor task launch worker for task 6503931] - Exception in task 17.0 in stage 89080.0 (TID 6503931)
java.util.concurrent.TimeoutException: Cannot fetch record for offset 43751946 in 7168 milliseconds
	at org.apache.spark.sql.kafka010.InternalKafkaConsumer.fetchData(KafkaDataConsumer.scala:489)
	at org.apache.spark.sql.kafka010.InternalKafkaConsumer.fetchRecord(KafkaDataConsumer.scala:361)
	at org.apache.spark.sql.kafka010.InternalKafkaConsumer.$anonfun$get$1(KafkaDataConsumer.scala:251)
	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
	at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:209)
	at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:234)
	at org.apache.spark.sql.kafka010.KafkaDataConsumer.get(KafkaDataConsumer.scala:64)
	at org.apache.spark.sql.kafka010.KafkaDataConsumer.get$(KafkaDataConsumer.scala:59)
	at org.apache.spark.sql.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.get(KafkaDataConsumer.scala:506)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.next(KafkaMicroBatchReader.scala:357)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:117)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116)
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{code}
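For context, the timeout is raised from the executor-side consumer poll in InternalKafkaConsumer.fetchData. Below is a minimal sketch of the kind of read/write path that hits this code. The broker, topic, and checkpoint locations are placeholders rather than my actual values, and the kafkaConsumer.pollTimeoutMs option shown is just the knob that, as I understand from the Kafka integration guide, controls this fetch timeout:
{code:scala}
// Minimal sketch only: broker, topic, and checkpoint location are placeholders,
// not the values from my actual job.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kafka-timeout-repro-sketch")
  .getOrCreate()

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder broker
  .option("subscribe", "input-topic")                 // placeholder topic
  .option("startingOffsets", "latest")
  // Executor-side poll timeout used when fetching records for an offset;
  // the TimeoutException above is thrown when this is exceeded.
  .option("kafkaConsumer.pollTimeoutMs", "120000")
  .load()

val query = stream
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")      // placeholder broker
  .option("topic", "output-topic")                        // placeholder topic
  .option("checkpointLocation", "/tmp/checkpoints/repro") // placeholder path
  .start()

query.awaitTermination()
{code}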
I am using the following Spark and Kafka versions:
{code:java}
<spark.version>2.4.2</spark.version>
<kafka.version>2.2.1</kafka.version>
<scala.tools.version>2.12</scala.tools.version>
{code}
My pom.xml dependencies look like this:
{code:java}
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_${scala.tools.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.tools.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>
{code}
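In case it helps narrow this down, these are the source options I plan to experiment with on my side. This is only my reading of the Structured Streaming Kafka integration guide (broker and topic names are again placeholders), not a confirmed diagnosis or fix:
{code:scala}
// Reusing the same SparkSession as in the sketch above.
// Options I plan to experiment with; my reading of the docs, not a confirmed fix.
val tunedStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder broker
  .option("subscribe", "input-topic")                 // placeholder topic
  // With failOnDataLoss=true the query should fail instead of skipping
  // offsets it cannot fetch, so missing data surfaces as an error.
  .option("failOnDataLoss", "true")
  // Bound how many offsets each micro-batch processes.
  .option("maxOffsetsPerTrigger", "100000")
  // Give the executor-side consumer more time before the TimeoutException above.
  .option("kafkaConsumer.pollTimeoutMs", "120000")
  .load()
{code}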
 

 

Can you please help here?

 

Thanks

Archit Jain

 



