Posted to issues@spark.apache.org by "Shivu Sondur (JIRA)" <ji...@apache.org> on 2019/04/09 03:54:00 UTC

[jira] [Commented] (SPARK-27409) Micro-batch support for Kafka Source in Spark 2.3

    [ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812991#comment-16812991 ] 

Shivu Sondur commented on SPARK-27409:
--------------------------------------

I am checking this.

> Micro-batch support for Kafka Source in Spark 2.3
> -------------------------------------------------
>
>                 Key: SPARK-27409
>                 URL: https://issues.apache.org/jira/browse/SPARK-27409
>             Project: Spark
>          Issue Type: Question
>          Components: Structured Streaming
>    Affects Versions: 2.3.2
>            Reporter: Prabhjot Singh Bharaj
>            Priority: Major
>
> It seems that with this change - [https://github.com/apache/spark/commit/0a441d2edb0a3f6c6c7c370db8917e1c07f211e7#diff-eeac5bdf3a1ecd7b9f8aaf10fff37f05R50] in Spark 2.3 for the Kafka source provider, a Kafka source cannot be run in micro-batch mode, only in continuous mode. Is that understanding correct?
> {code:java}
> E Py4JJavaError: An error occurred while calling o217.load.
> E : org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:549)
> E at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
> E at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
> E at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
> E at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
> E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> E at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> E at java.lang.reflect.Method.invoke(Method.java:498)
> E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> E at py4j.Gateway.invoke(Gateway.java:282)
> E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> E at py4j.commands.CallCommand.execute(CallCommand.java:79)
> E at py4j.GatewayConnection.run(GatewayConnection.java:238)
> E at java.lang.Thread.run(Thread.java:748)
> E Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
> E at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:44)
> E at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
> E at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
> E at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
> E ... 19 more
> E Caused by: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
> E at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:121)
> E at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:41)
> E ... 23 more
> E Caused by: java.io.FileNotFoundException: non-existent (No such file or directory)
> E at java.io.FileInputStream.open0(Native Method)
> E at java.io.FileInputStream.open(FileInputStream.java:195)
> E at java.io.FileInputStream.<init>(FileInputStream.java:138)
> E at java.io.FileInputStream.<init>(FileInputStream.java:93)
> E at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:216)
> E at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.access$000(SslFactory.java:201)
> E at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:137)
> E at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:119)
> E ... 24 more{code}
>  When running a simple data stream loader for Kafka without an SSL cert, the call goes through this code path:
>  
> {code:java}
> ...
> ...
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
> E at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
> ...
> ...{code}
>  
> Note that I haven't selected `trigger=continuous...` when creating the dataframe, yet the code still goes through the continuous path. My understanding was that `continuous` is optional and not the default.
>  
> Please clarify.
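
The innermost cause in the quoted trace is a java.io.FileNotFoundException for the path "non-existent", raised while Kafka's SslFactory loads a security store inside DataStreamReader.load. As a rough way to narrow this down, the sketch below checks that any SSL store paths passed as Kafka options exist before load() is called. It is only an illustration: the report does not show the job's options, so the option values, the topic name, and the helper name missing_ssl_stores are assumptions. The two option names are the standard Kafka client settings ssl.truststore.location and ssl.keystore.location with the "kafka." prefix that Spark's Kafka source uses for pass-through settings.

```python
import os

# Kafka client settings that point at files on disk, written with the
# "kafka." prefix used for options passed through to the Kafka consumer.
# Assumption: the failing job sets at least one of these; the report does
# not show its options.
SSL_STORE_OPTIONS = (
    "kafka.ssl.truststore.location",
    "kafka.ssl.keystore.location",
)


def missing_ssl_stores(options):
    """Return {option_name: path} for SSL store options whose file is absent.

    Hypothetical helper for illustration; it is not part of Spark or Kafka.
    """
    missing = {}
    for name in SSL_STORE_OPTIONS:
        path = options.get(name)
        # Only report options that are set and do not point at a regular file.
        if path is not None and not os.path.isfile(path):
            missing[name] = path
    return missing


# Hypothetical options that mirror the "non-existent" path seen in the trace.
example_options = {
    "kafka.bootstrap.servers": "localhost:9093",
    "kafka.security.protocol": "SSL",
    "kafka.ssl.truststore.location": "non-existent",
    "subscribe": "some-topic",
}

# Prints the store options whose files cannot be found from this process.
print(missing_ssl_stores(example_options))
```

The check only sees the filesystem of the process that runs it. In the quoted trace the consumer is constructed during load(), so the check would need to run on the same machine as that call.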



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)