Posted to issues@spark.apache.org by "Binzi Cao (JIRA)" <ji...@apache.org> on 2018/09/04 10:50:00 UTC

[jira] [Comment Edited] (SPARK-24189) Spark Structured Streaming not working with the Kafka Transactions

    [ https://issues.apache.org/jira/browse/SPARK-24189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16602889#comment-16602889 ] 

Binzi Cao edited comment on SPARK-24189 at 9/4/18 10:49 AM:
------------------------------------------------------------

It seems I'm hitting a similar issue. I managed to set the Kafka isolation level with
{code:java}
.option("kafka.isolation.level", "read_committed")
{code}
and using
{code:java}
kafka-client: 1.0.0
Spark version: 2.3.1
{code}
and I'm seeing this issue:
{code:java}
[error] org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 17 in stage 0.0 failed 1 times, most recent failure: Lost task 17.0 in stage 0.0 (TID 17, localhost, executor driver): java.util.concurrent.TimeoutException: Cannot fetch record for offset 145 in 2000 milliseconds
[error]         at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchData(KafkaDataConsumer.scala:271)
[error]         at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:163)
[error]         at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:147)
[error]         at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
[error]         at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:109)
[error]         at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:147)
[error]         at org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:54)
[error]         at org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:362)
[error]         at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:152)
[error]         at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:143)
[error]         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
[error]         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error]         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error]         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:205)
[error]         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
[error]         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
[error]         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error]         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(generated.java:230)


{code}
So it looks like Structured Streaming does not work at all with a topic that uses Kafka transactions.

The exception was thrown here:
 [https://github.com/apache/spark/blob/v2.3.1/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala#L271-L272]

Setting
{code:java}
 failOnDataLoss=false
{code}
does not fix the issue, because the exception is never caught in KafkaDataConsumer.scala.
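
For reference, this is roughly the full reader setup I'm describing (topic name and broker address are just placeholders):
{code:java}
// Rough sketch of the reader described above; topic and broker are placeholders.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "transactional-topic")
  // Kafka consumer configs are passed through with the "kafka." prefix.
  .option("kafka.isolation.level", "read_committed")
  // Does not help here, since the TimeoutException above is never caught.
  .option("failOnDataLoss", "false")
  .load()
{code}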



> Spark Structured Streaming not working with the Kafka Transactions
> ------------------------------------------------------------------
>
>                 Key: SPARK-24189
>                 URL: https://issues.apache.org/jira/browse/SPARK-24189
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: bharath kumar avusherla
>            Priority: Major
>
> I was trying to read a Kafka transactional topic using Spark Structured Streaming 2.3.0 with the Kafka option isolation-level = "read_committed", but Spark reads the data immediately, without waiting for the data in the topic to be committed. The Spark documentation mentions that Structured Streaming supports Kafka version 0.10 or higher. I am using the command below to test the scenario.
> val df = spark
>  .readStream
>  .format("kafka")
>  .option("kafka.bootstrap.servers", "localhost:9092")
>  .option("subscribe", "test-topic")
>  .option("isolation-level","read_committed")
>  .load()
> Can you please let me know if transactional reads are supported in Spark 2.3.0 Structured Streaming, or am I missing anything?
>  
> Thank you.


