Posted to issues@spark.apache.org by "Prabhjot Singh Bharaj (JIRA)" <ji...@apache.org> on 2019/04/08 18:51:00 UTC

[jira] [Created] (SPARK-27409) Micro-batch support for Kafka Source in Spark 2.3

Prabhjot Singh Bharaj created SPARK-27409:
---------------------------------------------

             Summary: Micro-batch support for Kafka Source in Spark 2.3
                 Key: SPARK-27409
                 URL: https://issues.apache.org/jira/browse/SPARK-27409
             Project: Spark
          Issue Type: Question
          Components: Structured Streaming
    Affects Versions: 2.3.2
            Reporter: Prabhjot Singh Bharaj


It seems that, with this change to the Kafka Source Provider in Spark 2.3 ([https://github.com/apache/spark/commit/0a441d2edb0a3f6c6c7c370db8917e1c07f211e7#diff-eeac5bdf3a1ecd7b9f8aaf10fff37f05R50]), a Kafka source cannot be run in micro-batch mode, only in continuous mode. Is that understanding correct?
{code:java}
E Py4JJavaError: An error occurred while calling o217.load.
E : org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:549)
E at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
E at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
E at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
E at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E at java.lang.reflect.Method.invoke(Method.java:498)
E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E at py4j.Gateway.invoke(Gateway.java:282)
E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E at py4j.commands.CallCommand.execute(CallCommand.java:79)
E at py4j.GatewayConnection.run(GatewayConnection.java:238)
E at java.lang.Thread.run(Thread.java:748)
E Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
E at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:44)
E at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
E at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
E at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
E ... 19 more
E Caused by: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
E at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:121)
E at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:41)
E ... 23 more
E Caused by: java.io.FileNotFoundException: non-existent (No such file or directory)
E at java.io.FileInputStream.open0(Native Method)
E at java.io.FileInputStream.open(FileInputStream.java:195)
E at java.io.FileInputStream.<init>(FileInputStream.java:138)
E at java.io.FileInputStream.<init>(FileInputStream.java:93)
E at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:216)
E at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.access$000(SslFactory.java:201)
E at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:137)
E at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:119)
E ... 24 more{code}
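The innermost cause in the trace is a `FileNotFoundException` for a store file named `non-existent`, raised while the Kafka consumer builds its SSL context. A minimal sketch of a pre-flight check on the Python side, assuming the path was supplied through the Kafka client settings `ssl.truststore.location` or `ssl.keystore.location` (passed to Spark with the `kafka.` prefix) and that the driver runs on the same machine as this Python process. The helper name `missing_store_files` is hypothetical and not part of Spark or Kafka:

```python
import os

# Kafka client settings that point at files on disk. Spark forwards
# source options prefixed with "kafka." to the Kafka consumer.
STORE_LOCATION_KEYS = (
    "kafka.ssl.truststore.location",
    "kafka.ssl.keystore.location",
)


def missing_store_files(options):
    """Return {option name: path} for SSL store options whose file is absent.

    Hypothetical helper: it only checks the local filesystem, so it is
    meaningful only where the driver sees the same paths.
    """
    missing = {}
    for key in STORE_LOCATION_KEYS:
        path = options.get(key)
        if path is not None and not os.path.isfile(path):
            missing[key] = path
    return missing
```

Calling this on the options dictionary before `load()` would report the bad path from Python, without waiting for the consumer constructor to fail inside the JVM.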
When running a simple data stream loader for Kafka without an SSL cert, execution goes through this code path:
 
{code:java}
...
...
org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
E at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
...
...{code}
 
Note that I did not select `trigger=continuous...` when creating the DataFrame, yet the code still goes through the continuous path. My understanding was that `continuous` is optional and not the default.
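For reference, a minimal PySpark sketch of where the trigger is usually chosen: on the writer via `DataStreamWriter.trigger(...)`, not on the reader that calls `load()`. The helper names (`kafka_source_options`, `trigger_kwargs`, `start_console_query`), the console sink, and the one-second interval are assumptions for illustration; this is not the exact code that produced the trace.

```python
def kafka_source_options(bootstrap_servers, topic):
    """Options for the Kafka source (hypothetical helper)."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
    }


def trigger_kwargs(mode, interval="1 second"):
    """Map a mode name to keyword arguments for DataStreamWriter.trigger()."""
    if mode == "micro-batch":
        return {"processingTime": interval}
    if mode == "continuous":
        return {"continuous": interval}
    raise ValueError("unknown mode: %r" % (mode,))


def start_console_query(spark, bootstrap_servers, topic, mode="micro-batch"):
    """Start a streaming query that prints Kafka values to the console.

    Needs a SparkSession with the Kafka source package on the classpath.
    """
    df = (
        spark.readStream
        .format("kafka")
        .options(**kafka_source_options(bootstrap_servers, topic))
        .load()  # the Py4JJavaError in the report is raised from this call
    )
    return (
        df.selectExpr("CAST(value AS STRING)")
        .writeStream
        .format("console")
        .trigger(**trigger_kwargs(mode))  # trigger is set here, after load()
        .start()
    )
```

In this sketch no trigger has been specified at the point where `load()` runs, which matches the situation described: the error occurs before any trigger could have been selected.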
 
Please clarify.


