Posted to issues@spark.apache.org by "Prabhjot Singh Bharaj (JIRA)" <ji...@apache.org> on 2019/04/11 15:11:00 UTC

[jira] [Comment Edited] (SPARK-27409) Micro-batch support for Kafka Source in Spark 2.3

    [ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815491#comment-16815491 ] 

Prabhjot Singh Bharaj edited comment on SPARK-27409 at 4/11/19 3:10 PM:
------------------------------------------------------------------------

{code:java}
In [5]: df = sc.sql.readStream.format('kafka').option('kafka.bootstrap.servers', 'localhost:9093').option("kafka.security.protocol", "SSL",)\
 ...: .option("kafka.ssl.keystore.password", "").option("kafka.ssl.keystore.type", "PKCS12").option("kafka.ssl.keystore.location", 'non-existent').option('subscribe', 'no existing topic').load()


2019-04-11 15:04:49.903 INFO ConsumerConfig: ConsumerConfig values:
 auto.commit.interval.ms = 5000
 auto.offset.reset = earliest
 bootstrap.servers = [localhost:9093]
 check.crcs = true
 client.id =
 connections.max.idle.ms = 540000
 enable.auto.commit = false
 exclude.internal.topics = true
 fetch.max.bytes = 52428800
 fetch.max.wait.ms = 500
 fetch.min.bytes = 1
 group.id = spark-kafka-source-a944d4a9-0257-492a-88b4-bba9decebb28-876181411-driver-0
 heartbeat.interval.ms = 3000
 interceptor.classes = null
 key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 max.partition.fetch.bytes = 1048576
 max.poll.interval.ms = 300000
 max.poll.records = 1
 metadata.max.age.ms = 300000
 metric.reporters = []
 metrics.num.samples = 2
 metrics.recording.level = INFO
 metrics.sample.window.ms = 30000
 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
 receive.buffer.bytes = 65536
 reconnect.backoff.ms = 50
 request.timeout.ms = 305000
 retry.backoff.ms = 100
 sasl.jaas.config = null
 sasl.kerberos.kinit.cmd = /usr/bin/kinit
 sasl.kerberos.min.time.before.relogin = 60000
 sasl.kerberos.service.name = null
 sasl.kerberos.ticket.renew.jitter = 0.05
 sasl.kerberos.ticket.renew.window.factor = 0.8
 sasl.mechanism = GSSAPI
 security.protocol = SSL
 send.buffer.bytes = 131072
 session.timeout.ms = 10000
 ssl.cipher.suites = null
 ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
 ssl.endpoint.identification.algorithm = null
 ssl.key.password = null
 ssl.keymanager.algorithm = SunX509
 ssl.keystore.location = non-existent
 ssl.keystore.password = [hidden]
 ssl.keystore.type = PKCS12
 ssl.protocol = TLS
 ssl.provider = null
 ssl.secure.random.implementation = null
 ssl.trustmanager.algorithm = PKIX
 ssl.truststore.location = null
 ssl.truststore.password = null
 ssl.truststore.type = JKS
 value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
/<redacted>/ in <module>()
----> 1 df = sc.sql.readStream.format('kafka').option('kafka.bootstrap.servers', 'localhost:9093').option("kafka.security.protocol", "SSL",).option("kafka.ssl.keystore.password", "").option("kafka.ssl.keystore.type", "PKCS12").option("kafka.ssl.keystore.location", 'non-existent').option('subscribe', 'no existing topic').load()

/<redacted>/python/pyspark/sql/streaming.pyc in load(self, path, format, schema, **options)
 401 return self._df(self._jreader.load(path))
 402 else:
--> 403 return self._df(self._jreader.load())
 404
 405 @since(2.0)

/<redacted>/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
 1255 answer = self.gateway_client.send_command(command)
 1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
 1258
 1259 for temp_arg in temp_args:

/<redacted>/python/pyspark/sql/utils.pyc in deco(*a, **kw)
 61 def deco(*a, **kw):
 62 try:
---> 63 return f(*a, **kw)
 64 except py4j.protocol.Py4JJavaError as e:
 65 s = e.java_exception.toString()

/<redacted>/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
 326 raise Py4JJavaError(
 327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
 329 else:
 330 raise Py4JError(

Py4JJavaError: An error occurred while calling o147.load.
: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:549)
 at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
 at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
 at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
 at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
 at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
 at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at py4j.Gateway.invoke(Gateway.java:282)
 at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:238)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
 at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:44)
 at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
 at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
 at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
 ... 19 more
Caused by: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
 at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:121)
 at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:41)
 ... 23 more
Caused by: java.io.FileNotFoundException: non-existent (No such file or directory)
 at java.io.FileInputStream.open0(Native Method)
 at java.io.FileInputStream.open(FileInputStream.java:195)
 at java.io.FileInputStream.<init>(FileInputStream.java:138)
 at java.io.FileInputStream.<init>(FileInputStream.java:93)
 at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:216)
 at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.access$000(SslFactory.java:201)
 at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:137)
 at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:119)
 ... 24 more{code}
 

Going through these logs, notice the calls to `createContinuousReader`. This looks like either a bug or undocumented behavior. My understanding was that the call should go to `createMicroBatchReader`, but it does not.
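
To make that check repeatable, here is a minimal sketch in plain Python (standard library only) that scans a pasted JVM stack trace, lists which `KafkaSourceProvider` methods appear in it, and reports the innermost `Caused by:` line. The names `summarize_trace`, `FRAME_RE` and `CAUSE_RE` are hypothetical helpers written for this illustration; they are not part of Spark, Kafka or py4j.

```python
import re

# Hypothetical helper patterns (not part of Spark or py4j).
# FRAME_RE matches JVM frames such as
#   at a.b.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
# and tolerates the leading "E " that pytest adds to quoted traces.
FRAME_RE = re.compile(r"^\s*(?:E\s+)?at\s+([\w.$<>]+)\.([\w$<>]+)\(([^)]*)\)\s*$")
CAUSE_RE = re.compile(r"^\s*(?:E\s+)?Caused by:\s+(.*)$")


def summarize_trace(trace, class_suffix="KafkaSourceProvider"):
    """Return (methods, root_cause) for a JVM stack trace given as text.

    methods    -- distinct method names called on classes whose fully
                  qualified name ends with class_suffix, in order of appearance
    root_cause -- text of the last "Caused by:" line, or None if there is none
    """
    methods = []
    root_cause = None
    for line in trace.splitlines():
        frame = FRAME_RE.match(line)
        if frame:
            cls, method = frame.group(1), frame.group(2)
            if cls.endswith(class_suffix) and method not in methods:
                methods.append(method)
            continue
        cause = CAUSE_RE.match(line)
        if cause:
            # Later "Caused by:" lines are nested deeper, so keep the last one.
            root_cause = cause.group(1)
    return methods, root_cause


if __name__ == "__main__":
    # A shortened excerpt of the trace above, used only as sample input.
    excerpt = """\
: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
 at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
 at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
Caused by: java.io.FileNotFoundException: non-existent (No such file or directory)
 at java.io.FileInputStream.open0(Native Method)
"""
    methods, root_cause = summarize_trace(excerpt)
    print("KafkaSourceProvider methods:", methods)
    print("Root cause:", root_cause)
```

Run against the full trace above, this should list only `createContinuousReader` and no `createMicroBatchReader` frame, which is the observation this comment is about. The root cause it reports is the missing keystore file, which I supplied on purpose to trigger the error.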





> Micro-batch support for Kafka Source in Spark 2.3
> -------------------------------------------------
>
>                 Key: SPARK-27409
>                 URL: https://issues.apache.org/jira/browse/SPARK-27409
>             Project: Spark
>          Issue Type: Question
>          Components: Structured Streaming
>    Affects Versions: 2.3.2
>            Reporter: Prabhjot Singh Bharaj
>            Priority: Major
>
> It seems that with this change - [https://github.com/apache/spark/commit/0a441d2edb0a3f6c6c7c370db8917e1c07f211e7#diff-eeac5bdf3a1ecd7b9f8aaf10fff37f05R50] in Spark 2.3 for the Kafka Source Provider, a Kafka source cannot be run in micro-batch mode, only in continuous mode. Is that understanding correct?
> {code:java}
> E Py4JJavaError: An error occurred while calling o217.load.
> E : org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:549)
> E at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
> E at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
> E at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
> E at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
> E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> E at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> E at java.lang.reflect.Method.invoke(Method.java:498)
> E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> E at py4j.Gateway.invoke(Gateway.java:282)
> E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> E at py4j.commands.CallCommand.execute(CallCommand.java:79)
> E at py4j.GatewayConnection.run(GatewayConnection.java:238)
> E at java.lang.Thread.run(Thread.java:748)
> E Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
> E at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:44)
> E at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
> E at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
> E at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
> E ... 19 more
> E Caused by: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
> E at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:121)
> E at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:41)
> E ... 23 more
> E Caused by: java.io.FileNotFoundException: non-existent (No such file or directory)
> E at java.io.FileInputStream.open0(Native Method)
> E at java.io.FileInputStream.open(FileInputStream.java:195)
> E at java.io.FileInputStream.<init>(FileInputStream.java:138)
> E at java.io.FileInputStream.<init>(FileInputStream.java:93)
> E at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:216)
> E at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.access$000(SslFactory.java:201)
> E at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:137)
> E at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:119)
> E ... 24 more{code}
>  When a simple data stream loader for Kafka is run without an SSL cert, execution goes through this code path:
>  
> {code:java}
> ...
> ...
> org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
> E at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
> ...
> ...{code}
>  
> Note that I haven't selected `trigger=continuous...` when creating the dataframe, yet the code still goes through the continuous path. My understanding was that `continuous` is optional and not the default.
>  
> Please clarify.


